connectGroupedObservable improvements

This commit is contained in:
Josep M Sobrepere 2020-06-02 22:46:26 +02:00
parent b5cdd80fce
commit 306b6b257e

View File

@ -1,6 +1,5 @@
import { useMemo } from "react"
import { Observable, GroupedObservable } from "rxjs"
import { map, filter, take, concatMap, shareReplay } from "rxjs/operators"
import { Observable, GroupedObservable, Subject } from "rxjs"
import { filter, take, concatMap } from "rxjs/operators"
import distinctShareReplay from "./operators/distinct-share-replay"
import { FactoryObservableOptions, defaultFactoryOptions } from "./options"
import useSharedReplayableObservable from "./useSharedReplayableObservable"
@ -9,52 +8,55 @@ const connectGroupedObservable = <K, O, I>(
source$: Observable<GroupedObservable<K, O>>,
initialValue: I,
_options?: FactoryObservableOptions<O>,
): [
(key: K) => I | O,
() => () => void,
(key: K) => GroupedObservable<K, O>,
] => {
): [(key: K) => I | O, () => () => void, (key: K) => Observable<O>] => {
const options = {
...defaultFactoryOptions,
..._options,
}
const observables = new Map<K, Observable<O>>()
const activeObservables$ = source$.pipe(
map(x => {
observables.set(
x.key,
x.pipe(
distinctShareReplay(options.compare, () => observables.delete(x.key)),
),
)
return observables
}),
shareReplay(1),
)
const cache = new Map<K, GroupedObservable<K, O>>()
const innerSubject$ = new Subject<GroupedObservable<K, O>>()
const subscribe = () => {
const subscription = source$
.subscribe(inner$ => {
const enhanced$ = inner$.pipe(
distinctShareReplay(options.compare, () => cache.delete(inner$.key)),
) as GroupedObservable<K, O>
enhanced$.key = inner$.key
if (cache.has(inner$.key)) {
innerSubject$.next(enhanced$)
} else {
cache.set(inner$.key, enhanced$)
}
})
.add(() => {
cache.clear()
})
return () => subscription.unsubscribe()
}
const getObservableByKey = (key: K) => {
const result = activeObservables$.pipe(
filter(x => x.has(key)),
let result = cache.get(key)
if (result) {
return result
}
result = innerSubject$.pipe(
filter(inner$ => inner$.key === key),
take(1),
concatMap(x => x.get(key)!),
concatMap(x => x),
) as GroupedObservable<K, O>
result.key = key
cache.set(key, result)
return result
}
const hook = (key: K) =>
useSharedReplayableObservable(
useMemo(() => getObservableByKey(key), [key]),
getObservableByKey(key),
initialValue,
options,
)
const getGroupSubscription = () => {
const subscription = activeObservables$.subscribe()
return () => subscription.unsubscribe()
}
return [hook, getGroupSubscription, getObservableByKey]
return [hook, subscribe, getObservableByKey]
}
export default connectGroupedObservable