@react-rxjs/utils
Installation
npm install @react-rxjs/utils
API
split
An RxJS operator that groups the items emitted by the source based on the keySelector function, emitting one Observable for each group.
Arguments:
keySelector: A function that receives an item and returns the key of that item's group.
streamSelector: (optional) The function to apply to each group observable (default = identity).
split subscribes to each group observable and shares its result with every
inner subscriber of that group. The inner observable can be mapped to another
observable through the streamSelector argument.
collectValues
A pipeable operator that collects all the GroupedObservables emitted by the source and emits a Map with the latest values of the inner observables.
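import { Subject } from "rxjs"
import { mapTo, scan, takeWhile } from "rxjs/operators"
import { collectValues, split } from "@react-rxjs/utils"

const votesByKey$ = new Subject<{ key: string }>()
const counters$ = votesByKey$.pipe(
  split(
    (vote) => vote.key,
    (votes$) =>
      votes$.pipe(
        mapTo(1),
        // No seed: the first 1 is emitted as-is, then each vote adds 1
        scan((count) => count + 1),
        // The group completes when its counter would reach 3
        takeWhile((count) => count < 3),
      ),
  ),
  collectValues(),
)
counters$.subscribe((counters) => {
  console.log("counters$:")
  counters.forEach((value, key) => {
    console.log(`${key}: ${value}`)
  })
})
votesByKey$.next({ key: "foo" })
// > counters$:
// > foo: 1
votesByKey$.next({ key: "foo" })
// > counters$:
// > foo: 2
votesByKey$.next({ key: "bar" })
// > counters$:
// > foo: 2
// > bar: 1
votesByKey$.next({ key: "foo" })
// foo's inner observable completed, so its key is removed
// > counters$:
// > bar: 1
votesByKey$.next({ key: "bar" })
// > counters$:
// > bar: 2
votesByKey$.next({ key: "bar" })
// bar's inner observable completed as well, so the Map is now empty
// > counters$: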
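The trace above can be checked by hand with a small model of the counters. The following is only a plain-TypeScript sketch of the observable behaviour of this particular example, not how split or collectValues are implemented. The names applyVote, votes, and snapshots are hypothetical, and the model assumes that a key is dropped from the Map as soon as its counter would reach 3.

```typescript
// Plain-TypeScript model of the example's Map of counters; it does not use RxJS.
// Assumption: a key disappears once its counter would reach 3, mirroring
// takeWhile((count) => count < 3) completing that group's inner observable.
function applyVote(
  counters: Map<string, number>,
  key: string,
): Map<string, number> {
  const next = new Map<string, number>()
  counters.forEach((value, k) => next.set(k, value))
  const previous = next.get(key)
  const count = (previous === undefined ? 0 : previous) + 1
  if (count < 3) {
    next.set(key, count)
  } else {
    next.delete(key)
  }
  return next
}

// Same sequence of votes as in the example above.
const votes = ["foo", "foo", "bar", "foo", "bar", "bar"]
const snapshots: Array<Array<[string, number]>> = []
let counters = new Map<string, number>()
for (const key of votes) {
  counters = applyVote(counters, key)
  const entries: Array<[string, number]> = []
  counters.forEach((value, k) => entries.push([k, value]))
  snapshots.push(entries)
}
// snapshots now holds one list of [key, count] pairs per vote,
// in the same order as the logged output above.
```

Each entry of snapshots corresponds to one "counters$:" log in the example: the fourth entry no longer contains foo, and the last one is empty.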
collect
A pipeable operator that collects all the GroupedObservables emitted by the source and emits a Map with the active inner observables.
Arguments:
filter: (optional) A function that receives the inner Observable and returns an Observable of boolean values indicating whether that inner observable should be collected.
mergeWithKey
Merges all the streams of the provided object and emits each value wrapped in an object with two properties: type, the key of the stream that emitted, and payload, the emitted value.
Arguments:
inputObject: Object of streams
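import { Subject } from "rxjs"
import { scan, startWith } from "rxjs/operators"
import { mergeWithKey } from "@react-rxjs/utils"

const inc$ = new Subject()
const dec$ = new Subject()
const resetTo$ = new Subject<number>()
const counter$ = mergeWithKey({
  inc$,
  dec$,
  resetTo$,
}).pipe(
  // current.type is the key of the stream that emitted
  scan((acc, current) => {
    switch (current.type) {
      case "inc$":
        return acc + 1
      case "dec$":
        return acc - 1
      case "resetTo$":
        return current.payload
      default:
        return acc
    }
  }, 0),
  startWith(0),
)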
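To see how the reducer passed to scan behaves on its own, it can be pulled out as a pure function and folded over a list of events. This is a minimal sketch without RxJS: the CounterEvent type and the counterReducer, events, and finalCount names are hypothetical, and the event shape is assumed to be the { type, payload } object used in the example above.

```typescript
// Hypothetical event type mirroring the { type, payload } shape used above.
type CounterEvent =
  | { type: "inc$"; payload: unknown }
  | { type: "dec$"; payload: unknown }
  | { type: "resetTo$"; payload: number }

// Same logic as the scan callback in the example, extracted as a pure function.
function counterReducer(acc: number, current: CounterEvent): number {
  switch (current.type) {
    case "inc$":
      return acc + 1
    case "dec$":
      return acc - 1
    case "resetTo$":
      return current.payload
    default:
      return acc
  }
}

const events: CounterEvent[] = [
  { type: "inc$", payload: undefined },
  { type: "inc$", payload: undefined },
  { type: "dec$", payload: undefined },
  { type: "resetTo$", payload: 10 },
  { type: "inc$", payload: undefined },
]

// Folding the events from 0 walks through 1, 2, 1, 10, 11.
const finalCount = events.reduce(counterReducer, 0)
```

Modelling the events as a discriminated union on type lets the compiler narrow payload to number in the "resetTo$" branch.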
selfDependant
A creation operator that helps create observables that have circular dependencies.
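import { Subject, merge, of } from "rxjs"
import {
  delay,
  map,
  share,
  switchMapTo,
  withLatestFrom,
} from "rxjs/operators"
import { selfDependant } from "@react-rxjs/utils"

const [_resetableCounter$, connectResetableCounter] = selfDependant<number>()
const clicks$ = new Subject()
const inc$ = clicks$.pipe(
  withLatestFrom(_resetableCounter$),
  // withLatestFrom emits [click, latestCounter]; only the counter is needed
  map(([, x]) => x + 1),
  share(),
)
const delayedZero$ = of(0).pipe(delay(10_000))
const reset$ = inc$.pipe(switchMapTo(delayedZero$))
const resetableCounter$ = merge(inc$, reset$, of(0)).pipe(
  connectResetableCounter(),
)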
suspend
const story$ = selectedStoryId$.pipe(
  switchMap((id) => suspend(getStory$(id))),
)
An RxJS creation operator that prepends SUSPENSE to the source observable.
suspended
const story$ = selectedStoryId$.pipe(
  switchMap((id) => getStory$(id).pipe(suspended())),
)
The pipeable version of suspend.
switchMapSuspended
const story$ = selectedStoryId$.pipe(switchMapSuspended(getStory$))
Like switchMap, but applies startWith(SUSPENSE) to each inner observable.