Re-RxJS

React bindings for RxJS
Main features
- 🌀 Truly Reactive
- ⚡ Highly performant and free of memory leaks
- 🔀 First-class support for React Suspense and ready for Concurrent Mode
- ✂️ Decentralized and composable, thus enabling optimal code-splitting
- 🔬 Tiny and tree-shakeable
- 💪 Supports TypeScript
Installation
npm install re-rxjs
API
connectObservable
const [useCounter, sharedCounter$] = connectObservable(
clicks$.pipe(
scan(prev => prev + 1, 0),
startWith(0),
)
)
Returns a tuple with two items: a hook that yields the latest value emitted by the given observable, and the underlying enhanced observable. The enhanced observable shares a single subscription among all of its subscribers and always emits the latest value to a new subscriber.
The shared subscription is closed as soon as the observable has no subscribers left.
connectFactoryObservable
const [useStory, getStory$] = connectFactoryObservable(
(storyId: number) => getStoryWithUpdates$(storyId)
)
const Story: React.FC<{id: number}> = ({id}) => {
const story = useStory(id);
return (
<article>
<h1>{story.title}</h1>
<p>{story.description}</p>
</article>
)
}
Accepts: A factory function that returns an Observable.
Returns [1, 2]:
1. A React Hook function with the same parameters as the factory function. This hook yields the latest update from the observable returned by the factory function.
2. A sharedLatest version of the observable returned by the factory function that does not complete. It can be used for composing other streams that depend on it.
shareLatest
const activePlanetName$ = planet$.pipe(
filter(planet => planet.isActive),
map(planet => planet.name),
shareLatest()
)
An RxJS pipeable operator that shares and replays the latest emitted value. It is equivalent to:
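// A pipeable operator is a function from a source observable to a new observable.
const shareLatest = <T>() => (source$: Observable<T>): Observable<T> =>
  source$.pipe(
    multicast(() => new ReplaySubject<T>(1)),
    refCount(),
  )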
The enhanced observables returned from connectObservable and connectFactoryObservable
have been enhanced with this operator, but they do not complete. This means that the latest
emitted value remains available until the refCount drops to zero.
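The replay and ref-counting behaviour described above can be observed with plain RxJS. The following is a minimal sketch, assuming RxJS 6 or 7 (where multicast and refCount are available). The subject and the array names are illustrative and are not part of this library.

```typescript
import { Observable, ReplaySubject, Subject } from "rxjs"
import { multicast, refCount } from "rxjs/operators"

// Same definition as the equivalent shown above.
const shareLatest = <T>() => (source$: Observable<T>): Observable<T> =>
  source$.pipe(
    multicast(() => new ReplaySubject<T>(1)),
    refCount(),
  )

// Illustrative source: a plain Subject that we push values into by hand.
const source$ = new Subject<number>()
const shared$ = source$.pipe(shareLatest())

const early: number[] = []
const late: number[] = []
const fresh: number[] = []

const earlySub = shared$.subscribe(x => early.push(x))
source$.next(1)
source$.next(2)

// A late subscriber immediately receives the latest value (2).
const lateSub = shared$.subscribe(x => late.push(x))
source$.next(3)

// Once the refCount drops to zero, the cached value is discarded.
earlySub.unsubscribe()
lateSub.unsubscribe()

// A new subscriber starts from scratch and receives nothing until the next emission.
const freshSub = shared$.subscribe(x => fresh.push(x))
freshSub.unsubscribe()

console.log(early, late, fresh) // [ 1, 2, 3 ] [ 2, 3 ] []
```

Because the ReplaySubject is created by a factory, every new connection gets its own buffer, which is why the last subscriber does not see the stale value 3.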
SUSPENSE
const story$ = selectedStoryId$.pipe(
switchMap(id => concat(
of(SUSPENSE),
getStory$(id)
))
)
This is a special symbol that our observables can emit to let the React hook know that a value is on its way, and that we want to leverage the React Suspense API while we wait for that value.
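To make the hand-off to Suspense more concrete, here is a minimal sketch of how a hook could react to such a marker. It assumes the common Suspense convention in which a component throws a promise while its data is pending. SUSPENSE_TOKEN and readOrSuspend are hypothetical names introduced for illustration; this is not the library's actual implementation.

```typescript
// Hypothetical stand-in for the SUSPENSE symbol exported by the library.
const SUSPENSE_TOKEN: unique symbol = Symbol("SUSPENSE")

// Hypothetical helper: returns the latest value, or throws the pending
// promise so that the nearest <Suspense> boundary can render its fallback.
function readOrSuspend<T>(
  latest: T | typeof SUSPENSE_TOKEN,
  pending: Promise<unknown>,
): T {
  if (latest === SUSPENSE_TOKEN) {
    throw pending
  }
  return latest as T
}

// Stands for "the next real value"; it never settles in this sketch.
const nextValue = new Promise<void>(() => {})

// A regular value is returned as-is.
console.log(readOrSuspend(42, nextValue)) // logs 42

// The marker makes the helper throw the pending promise instead.
let thrown: unknown
try {
  readOrSuspend(SUSPENSE_TOKEN, nextValue)
} catch (e) {
  thrown = e
}
console.log(thrown === nextValue) // logs true
```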
suspend
const story$ = selectedStoryId$.pipe(
switchMap(id => suspend(getStory$(id)))
)
An RxJS creation operator that prepends a SUSPENSE to the source observable.
suspended
const story$ = selectedStoryId$.pipe(
switchMap(id => getStory$(id).pipe(
suspended()
))
)
The pipeable version of suspend.
switchMapSuspended
const story$ = selectedStoryId$.pipe(
switchMapSuspended(getStory$)
)
Like switchMap, but it applies a startWith(SUSPENSE) to the inner observable.
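The relationship with switchMap can be sketched with plain RxJS. This is an illustrative re-implementation under stated assumptions, not the library's source: SUSPENSE_TOKEN stands in for the exported SUSPENSE symbol, switchMapSuspendedSketch is a hypothetical name, and the marker is prepended with concat, which has the same effect as startWith for a single value.

```typescript
import { Observable, ObservableInput, Subject, concat, of } from "rxjs"
import { switchMap } from "rxjs/operators"

// Hypothetical stand-in for the SUSPENSE symbol exported by the library.
const SUSPENSE_TOKEN = Symbol("SUSPENSE")

// Hypothetical sketch: every inner observable first emits the marker,
// then whatever the projected observable emits.
const switchMapSuspendedSketch = <T, R>(
  project: (value: T) => ObservableInput<R>,
) => (source$: Observable<T>): Observable<R | symbol> =>
  source$.pipe(
    switchMap(value => concat(of(SUSPENSE_TOKEN), project(value))),
  )

// Illustrative input: story ids pushed by hand.
const ids$ = new Subject<number>()
const emissions: Array<string | symbol> = []

const subscription = ids$
  .pipe(switchMapSuspendedSketch((id: number) => of(`story ${id}`)))
  .subscribe(x => emissions.push(x))

ids$.next(1)
ids$.next(2)
subscription.unsubscribe()

// emissions now holds: SUSPENSE_TOKEN, "story 1", SUSPENSE_TOKEN, "story 2"
```

Each new id restarts the inner stream, so a consumer sees the marker again before the next story arrives.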
createInput
A couple of examples are worth a thousand words:
const [getCounter$, setCounter] = createInput(0)
// connectFactoryObservable returns a tuple; the hook is its first item.
const [useCounter] = connectFactoryObservable((id: string) => getCounter$(id))
const Counter: React.FC<{id: string}> = ({id}) => {
  const counter = useCounter(id)
  return (
    <>
      <button onClick={() => setCounter(id, x => x - 1)}>-</button>
      {counter}
      <button onClick={() => setCounter(id, x => x + 1)}>+</button>
    </>
  )
}
or:
const [getUpClicks$, onUpClick] = createInput()
const [getDownClicks$, onDownClick] = createInput()
const [useCounter] = connectFactoryObservable((id: string) =>
merge(
getUpClicks$(id).pipe(mapTo(1)),
getDownClicks$(id).pipe(mapTo(-1)),
).pipe(
scan((a, b) => a + b, 0),
startWith(0)
)
)
const Counter: React.FC<{id: string}> = ({id}) => {
  const counter = useCounter(id)
  return (
    <>
      <button onClick={() => onDownClick(id)}>-</button>
      {counter}
      <button onClick={() => onUpClick(id)}>+</button>
    </>
  )
}
Examples
- This is a contrived example based on this example from the React docs.
- A search for GitHub repos that highlights the most recently updated one:
import React, { Suspense } from "react"
import { Subject } from "rxjs"
import { startWith, map } from "rxjs/operators"
import { connectObservable, switchMapSuspended } from "re-rxjs"
import { Header, Search, LoadingResults, Repo } from "./components"
interface Repo {
id: number
name: string
description: string
author: string
stars: number
lastUpdate: number
}
const searchInput$ = new Subject<string>()
const onSubmit = (value: string) => searchInput$.next(value)
const findRepos = (query: string): Promise<Repo[]> =>
fetch(`https://api.github.com/search/repositories?q=${encodeURIComponent(query)}`)
.then(response => response.json())
.then(rawData =>
(rawData.items ?? []).map((repo: any) => ({
id: repo.id,
name: repo.name,
description: repo.description,
author: repo.owner.login,
stars: repo.stargazers_count,
lastUpdate: Date.parse(repo.updated_at),
})),
)
const [useRepos, repos$] = connectObservable(
searchInput$.pipe(
switchMapSuspended(findRepos),
startWith(null),
),
)
function Repos() {
const repos = useRepos()
if (repos === null) {
return null
}
if (repos.length === 0) {
return <div>No results were found.</div>
}
return (
<ul>
{repos.map(repo => (
<li key={repo.id}>
<Repo {...repo} />
</li>
))}
</ul>
)
}
const [useMostRecentlyUpdatedRepo] = connectObservable(
repos$.pipe(
map(repos =>
Array.isArray(repos) && repos.length > 0
? repos.reduce((winner, current) =>
current.lastUpdate > winner.lastUpdate ? current : winner,
)
: null,
),
),
)
function MostRecentlyUpdatedRepo() {
const mostRecent = useMostRecentlyUpdatedRepo()
if (mostRecent === null) {
return null
}
const { id, name } = mostRecent
return (
<div>
The most recently updated repo is <a href={`#${id}`}>{name}</a>
</div>
)
}
export default function App() {
return (
<>
<Header>Search Github Repos</Header>
<Search onSubmit={onSubmit} />
<Suspense fallback={<LoadingResults />}>
<MostRecentlyUpdatedRepo />
<Repos />
</Suspense>
</>
)
}
Contributors ✨
Thanks goes to these wonderful people (emoji key):
- Josep M Sobrepere 💻 🤔 🚧 ⚠️ 👀 📖
- Víctor Oliva 🤔 👀 💻 ⚠️ 📖
- Ed 🎨
- Pierre Grimaud 📖
- Bhavesh Desai 👀 📖 ⚠️
This project follows the all-contributors specification. Contributions of any kind welcome!