diff --git a/src/connectFactoryObservable.ts b/src/connectFactoryObservable.ts index f9a89cb..11ccc6c 100644 --- a/src/connectFactoryObservable.ts +++ b/src/connectFactoryObservable.ts @@ -1,5 +1,7 @@ import { Observable, NEVER, concat } from "rxjs" -import distinctShareReplay from "./operators/distinct-share-replay" +import distinctShareReplay, { + BehaviorObservable, +} from "./operators/distinct-share-replay" import { FactoryObservableOptions, defaultFactoryOptions } from "./options" import useObservable from "./useObservable" @@ -9,7 +11,6 @@ export function connectFactoryObservable< O >( getObservable: (...args: A) => Observable, - initialValue: I, _options?: FactoryObservableOptions, ): [(...args: A) => O | I, (...args: A) => Observable] { const options = { @@ -17,9 +18,9 @@ export function connectFactoryObservable< ..._options, } - const cache = new Map>() + const cache = new Map>() - const getSharedObservable$ = (...input: A): Observable => { + const getSharedObservable$ = (...input: A): BehaviorObservable => { const key = JSON.stringify(input) const cachedVal = cache.get(key) @@ -27,11 +28,9 @@ export function connectFactoryObservable< return cachedVal } - const reactObservable$ = concat(getObservable(...input), NEVER).pipe( - distinctShareReplay(options.compare, () => { - cache.delete(key) - }), - ) + const reactObservable$ = distinctShareReplay(options.compare, () => { + cache.delete(key) + })(concat(getObservable(...input), NEVER)) cache.set(key, reactObservable$) return reactObservable$ @@ -39,7 +38,10 @@ export function connectFactoryObservable< return [ (...input: A) => - useObservable(getSharedObservable$(...input), initialValue, options), + useObservable( + getSharedObservable$(...input), + options.unsubscribeGraceTime, + ), getSharedObservable$, ] diff --git a/src/connectInstanceObservable.ts b/src/connectInstanceObservable.ts index 0147f1f..0c78de2 100644 --- a/src/connectInstanceObservable.ts +++ b/src/connectInstanceObservable.ts @@ -86,7 +86,7 @@ export const connectInstanceObservable: ConnectInstanceObservable = ( flatSingleTuple, getObservable, distinctUntilChanged(options.compare), - delayUnsubscription(options.unsubscribeGraceTime), + delayUnsubscription(options.unsubscribeGraceTime) as any, ) .subscribe(setState) subject.next(input) diff --git a/src/connectObservable.ts b/src/connectObservable.ts index 68fd36a..1d3f66a 100644 --- a/src/connectObservable.ts +++ b/src/connectObservable.ts @@ -3,22 +3,21 @@ import distinctShareReplay from "./operators/distinct-share-replay" import { StaticObservableOptions, defaultStaticOptions } from "./options" import useObservable from "./useObservable" -export function connectObservable( - observable: Observable, - initialValue: IO, - _options?: StaticObservableOptions, +export function connectObservable( + observable: Observable, + _options?: StaticObservableOptions, ) { const options = { ...defaultStaticOptions, ..._options, suspenseTime: Infinity, } - const sharedObservable$ = concat(observable, NEVER).pipe( - distinctShareReplay(options.compare), + const sharedObservable$ = distinctShareReplay(options.compare)( + concat(observable, NEVER), ) const useStaticObservable = () => - useObservable(sharedObservable$, initialValue, options) + useObservable(sharedObservable$, options.unsubscribeGraceTime) return [useStaticObservable, sharedObservable$] as const } diff --git a/src/operators/delay-unsubscription.ts b/src/operators/delay-unsubscription.ts index db56259..9392d88 100644 --- a/src/operators/delay-unsubscription.ts +++ b/src/operators/delay-unsubscription.ts @@ -1,16 +1,19 @@ -import { Observable, of, Subscription } from "rxjs" -import { delay } from "rxjs/operators" +import { Observable, of, Subscription, Subject, race } from "rxjs" +import { delay, takeUntil, take } from "rxjs/operators" +import { BehaviorObservable } from "./distinct-share-replay" +const IS_SSR = + typeof window === "undefined" || + typeof window.document === "undefined" || + typeof window.document.createElement === "undefined" const noop = Function.prototype as () => void const delayUnsubscription = (delayTime: number) => ( - source$: Observable, -): Observable => { - if (delayTime === 0) { - return source$ - } + source$: BehaviorObservable, +): BehaviorObservable => { let finalizeLastUnsubscription = noop - return new Observable(subscriber => { + const onSubscribe = new Subject() + const result = new Observable(subscriber => { let isActive = true const subscription = source$.subscribe({ next(value) { @@ -25,6 +28,7 @@ const delayUnsubscription = (delayTime: number) => ( subscriber.complete() }, }) + onSubscribe.next() finalizeLastUnsubscription() return () => { finalizeLastUnsubscription() @@ -44,7 +48,23 @@ const delayUnsubscription = (delayTime: number) => ( finalizeLastUnsubscription = noop } } - }) + }) as BehaviorObservable + + const getValue = () => { + try { + source$.getValue() + } catch (e) { + if (!IS_SSR) { + source$ + .pipe(takeUntil(race(onSubscribe, of(true).pipe(delay(60000))))) + .subscribe() + } + throw source$.pipe(take(1)).toPromise() + } + } + + result.getValue = getValue as () => T + return result } export default delayUnsubscription diff --git a/src/operators/distinct-share-replay.ts b/src/operators/distinct-share-replay.ts index 81a4d17..9365b1f 100644 --- a/src/operators/distinct-share-replay.ts +++ b/src/operators/distinct-share-replay.ts @@ -3,17 +3,21 @@ import { Observable, Subscription, Subject } from "rxjs" const defaultCompare = (a: any, b: any) => a === b function defaultTeardown() {} +export interface BehaviorObservable extends Observable { + getValue: () => T +} + const EMPTY_VALUE: any = {} const distinctShareReplay = ( compareFn: (a: T, b: T) => boolean = defaultCompare, teardown = defaultTeardown, -) => (source$: Observable): Observable => { +) => (source$: Observable): BehaviorObservable => { let subject: Subject | undefined let subscription: Subscription | undefined let refCount = 0 let currentValue: { value: T } - return new Observable(subscriber => { + const result = new Observable(subscriber => { refCount++ if (!subject) { currentValue = { value: EMPTY_VALUE } @@ -56,7 +60,16 @@ const distinctShareReplay = ( } } } - }) + }) as BehaviorObservable + + result.getValue = () => { + if (currentValue.value === EMPTY_VALUE) { + throw null + } + return currentValue.value + } + + return result } export default distinctShareReplay diff --git a/src/options.ts b/src/options.ts index 85af988..977ff19 100644 --- a/src/options.ts +++ b/src/options.ts @@ -3,7 +3,7 @@ export interface StaticObservableOptions { compare?: (a: T, b: T) => boolean } export const defaultStaticOptions = { - unsubscribeGraceTime: 120, + unsubscribeGraceTime: 200, compare: (a: any, b: any) => a === b, } diff --git a/src/useObservable.ts b/src/useObservable.ts index e0251ce..7b14ed5 100644 --- a/src/useObservable.ts +++ b/src/useObservable.ts @@ -1,36 +1,41 @@ import { useState, useLayoutEffect } from "react" -import { Observable } from "rxjs" import delayUnsubscription from "./operators/delay-unsubscription" -import { defaultFactoryOptions, ObservableOptions } from "./options" +import { BehaviorObservable } from "operators/distinct-share-replay" -const useObservable = ( - source$: Observable, - initialValue: I, - options?: ObservableOptions, -) => { - const { suspenseTime, unsubscribeGraceTime } = { - ...defaultFactoryOptions, - ...options, +const cache = new WeakMap< + BehaviorObservable, + [BehaviorObservable, number] +>() +const getEnhancedSource = ( + source$: BehaviorObservable, + graceTime: number, +): BehaviorObservable => { + let [result, prevGraceTime] = cache.get(source$) ?? [] + if (result && prevGraceTime === graceTime) { + return result } - const [state, setState] = useState(initialValue) + result = delayUnsubscription(graceTime)(source$) + cache.set(source$, [result, graceTime]) + return result +} + +const defaultValue: any = {} +const useObservable = ( + source$: BehaviorObservable, + unsubscribeGraceTime: number, +) => { + const [state, setState] = useState(defaultValue) useLayoutEffect(() => { - let timeoutToken = - suspenseTime === Infinity - ? undefined - : setTimeout(setState, suspenseTime, initialValue) - - const subscription = delayUnsubscription(unsubscribeGraceTime)( - source$, - ).subscribe(nextState => { - setState(nextState as any) - timeoutToken = timeoutToken && (clearTimeout(timeoutToken) as undefined) - }) - + const enhancedSource$ = getEnhancedSource(source$, unsubscribeGraceTime) + setState(enhancedSource$.getValue()) + const subscription = enhancedSource$.subscribe(setState) return () => subscription.unsubscribe() - }, [source$, suspenseTime, unsubscribeGraceTime]) + }, [source$, unsubscribeGraceTime]) - return state + return state !== defaultValue + ? state + : getEnhancedSource(source$, unsubscribeGraceTime).getValue() } export default useObservable