mirror of
https://github.com/re-rxjs/react-rxjs.git
synced 2025-12-08 18:01:51 +00:00
suspense API WIP
This commit is contained in:
parent
ad1c39e19c
commit
40b07f97f4
@ -1,5 +1,7 @@
|
||||
import { Observable, NEVER, concat } from "rxjs"
|
||||
import distinctShareReplay from "./operators/distinct-share-replay"
|
||||
import distinctShareReplay, {
|
||||
BehaviorObservable,
|
||||
} from "./operators/distinct-share-replay"
|
||||
import { FactoryObservableOptions, defaultFactoryOptions } from "./options"
|
||||
import useObservable from "./useObservable"
|
||||
|
||||
@ -9,7 +11,6 @@ export function connectFactoryObservable<
|
||||
O
|
||||
>(
|
||||
getObservable: (...args: A) => Observable<O>,
|
||||
initialValue: I,
|
||||
_options?: FactoryObservableOptions<O>,
|
||||
): [(...args: A) => O | I, (...args: A) => Observable<O>] {
|
||||
const options = {
|
||||
@ -17,9 +18,9 @@ export function connectFactoryObservable<
|
||||
..._options,
|
||||
}
|
||||
|
||||
const cache = new Map<string, Observable<O>>()
|
||||
const cache = new Map<string, BehaviorObservable<O>>()
|
||||
|
||||
const getSharedObservable$ = (...input: A): Observable<O> => {
|
||||
const getSharedObservable$ = (...input: A): BehaviorObservable<O> => {
|
||||
const key = JSON.stringify(input)
|
||||
const cachedVal = cache.get(key)
|
||||
|
||||
@ -27,11 +28,9 @@ export function connectFactoryObservable<
|
||||
return cachedVal
|
||||
}
|
||||
|
||||
const reactObservable$ = concat(getObservable(...input), NEVER).pipe(
|
||||
distinctShareReplay(options.compare, () => {
|
||||
cache.delete(key)
|
||||
}),
|
||||
)
|
||||
const reactObservable$ = distinctShareReplay(options.compare, () => {
|
||||
cache.delete(key)
|
||||
})(concat(getObservable(...input), NEVER))
|
||||
|
||||
cache.set(key, reactObservable$)
|
||||
return reactObservable$
|
||||
@ -39,7 +38,10 @@ export function connectFactoryObservable<
|
||||
|
||||
return [
|
||||
(...input: A) =>
|
||||
useObservable(getSharedObservable$(...input), initialValue, options),
|
||||
useObservable(
|
||||
getSharedObservable$(...input),
|
||||
options.unsubscribeGraceTime,
|
||||
),
|
||||
|
||||
getSharedObservable$,
|
||||
]
|
||||
|
||||
@ -86,7 +86,7 @@ export const connectInstanceObservable: ConnectInstanceObservable = (
|
||||
flatSingleTuple,
|
||||
getObservable,
|
||||
distinctUntilChanged(options.compare),
|
||||
delayUnsubscription(options.unsubscribeGraceTime),
|
||||
delayUnsubscription(options.unsubscribeGraceTime) as any,
|
||||
)
|
||||
.subscribe(setState)
|
||||
subject.next(input)
|
||||
|
||||
@ -3,22 +3,21 @@ import distinctShareReplay from "./operators/distinct-share-replay"
|
||||
import { StaticObservableOptions, defaultStaticOptions } from "./options"
|
||||
import useObservable from "./useObservable"
|
||||
|
||||
export function connectObservable<O, IO>(
|
||||
observable: Observable<O>,
|
||||
initialValue: IO,
|
||||
_options?: StaticObservableOptions<O>,
|
||||
export function connectObservable<T>(
|
||||
observable: Observable<T>,
|
||||
_options?: StaticObservableOptions<T>,
|
||||
) {
|
||||
const options = {
|
||||
...defaultStaticOptions,
|
||||
..._options,
|
||||
suspenseTime: Infinity,
|
||||
}
|
||||
const sharedObservable$ = concat(observable, NEVER).pipe(
|
||||
distinctShareReplay(options.compare),
|
||||
const sharedObservable$ = distinctShareReplay(options.compare)(
|
||||
concat(observable, NEVER),
|
||||
)
|
||||
|
||||
const useStaticObservable = () =>
|
||||
useObservable(sharedObservable$, initialValue, options)
|
||||
useObservable(sharedObservable$, options.unsubscribeGraceTime)
|
||||
|
||||
return [useStaticObservable, sharedObservable$] as const
|
||||
}
|
||||
|
||||
@ -1,16 +1,19 @@
|
||||
import { Observable, of, Subscription } from "rxjs"
|
||||
import { delay } from "rxjs/operators"
|
||||
import { Observable, of, Subscription, Subject, race } from "rxjs"
|
||||
import { delay, takeUntil, take } from "rxjs/operators"
|
||||
import { BehaviorObservable } from "./distinct-share-replay"
|
||||
|
||||
const IS_SSR =
|
||||
typeof window === "undefined" ||
|
||||
typeof window.document === "undefined" ||
|
||||
typeof window.document.createElement === "undefined"
|
||||
const noop = Function.prototype as () => void
|
||||
|
||||
const delayUnsubscription = <T>(delayTime: number) => (
|
||||
source$: Observable<T>,
|
||||
): Observable<T> => {
|
||||
if (delayTime === 0) {
|
||||
return source$
|
||||
}
|
||||
source$: BehaviorObservable<T>,
|
||||
): BehaviorObservable<T> => {
|
||||
let finalizeLastUnsubscription = noop
|
||||
return new Observable<T>(subscriber => {
|
||||
const onSubscribe = new Subject()
|
||||
const result = new Observable<T>(subscriber => {
|
||||
let isActive = true
|
||||
const subscription = source$.subscribe({
|
||||
next(value) {
|
||||
@ -25,6 +28,7 @@ const delayUnsubscription = <T>(delayTime: number) => (
|
||||
subscriber.complete()
|
||||
},
|
||||
})
|
||||
onSubscribe.next()
|
||||
finalizeLastUnsubscription()
|
||||
return () => {
|
||||
finalizeLastUnsubscription()
|
||||
@ -44,7 +48,23 @@ const delayUnsubscription = <T>(delayTime: number) => (
|
||||
finalizeLastUnsubscription = noop
|
||||
}
|
||||
}
|
||||
})
|
||||
}) as BehaviorObservable<T>
|
||||
|
||||
const getValue = () => {
|
||||
try {
|
||||
source$.getValue()
|
||||
} catch (e) {
|
||||
if (!IS_SSR) {
|
||||
source$
|
||||
.pipe(takeUntil(race(onSubscribe, of(true).pipe(delay(60000)))))
|
||||
.subscribe()
|
||||
}
|
||||
throw source$.pipe(take(1)).toPromise()
|
||||
}
|
||||
}
|
||||
|
||||
result.getValue = getValue as () => T
|
||||
return result
|
||||
}
|
||||
|
||||
export default delayUnsubscription
|
||||
|
||||
@ -3,17 +3,21 @@ import { Observable, Subscription, Subject } from "rxjs"
|
||||
const defaultCompare = (a: any, b: any) => a === b
|
||||
function defaultTeardown() {}
|
||||
|
||||
export interface BehaviorObservable<T> extends Observable<T> {
|
||||
getValue: () => T
|
||||
}
|
||||
|
||||
const EMPTY_VALUE: any = {}
|
||||
const distinctShareReplay = <T>(
|
||||
compareFn: (a: T, b: T) => boolean = defaultCompare,
|
||||
teardown = defaultTeardown,
|
||||
) => (source$: Observable<T>): Observable<T> => {
|
||||
) => (source$: Observable<T>): BehaviorObservable<T> => {
|
||||
let subject: Subject<T> | undefined
|
||||
let subscription: Subscription | undefined
|
||||
let refCount = 0
|
||||
let currentValue: { value: T }
|
||||
|
||||
return new Observable<T>(subscriber => {
|
||||
const result = new Observable<T>(subscriber => {
|
||||
refCount++
|
||||
if (!subject) {
|
||||
currentValue = { value: EMPTY_VALUE }
|
||||
@ -56,7 +60,16 @@ const distinctShareReplay = <T>(
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}) as BehaviorObservable<T>
|
||||
|
||||
result.getValue = () => {
|
||||
if (currentValue.value === EMPTY_VALUE) {
|
||||
throw null
|
||||
}
|
||||
return currentValue.value
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
export default distinctShareReplay
|
||||
|
||||
@ -3,7 +3,7 @@ export interface StaticObservableOptions<T> {
|
||||
compare?: (a: T, b: T) => boolean
|
||||
}
|
||||
export const defaultStaticOptions = {
|
||||
unsubscribeGraceTime: 120,
|
||||
unsubscribeGraceTime: 200,
|
||||
compare: (a: any, b: any) => a === b,
|
||||
}
|
||||
|
||||
|
||||
@ -1,36 +1,41 @@
|
||||
import { useState, useLayoutEffect } from "react"
|
||||
import { Observable } from "rxjs"
|
||||
import delayUnsubscription from "./operators/delay-unsubscription"
|
||||
import { defaultFactoryOptions, ObservableOptions } from "./options"
|
||||
import { BehaviorObservable } from "operators/distinct-share-replay"
|
||||
|
||||
const useObservable = <O, I>(
|
||||
source$: Observable<O>,
|
||||
initialValue: I,
|
||||
options?: ObservableOptions,
|
||||
) => {
|
||||
const { suspenseTime, unsubscribeGraceTime } = {
|
||||
...defaultFactoryOptions,
|
||||
...options,
|
||||
const cache = new WeakMap<
|
||||
BehaviorObservable<any>,
|
||||
[BehaviorObservable<any>, number]
|
||||
>()
|
||||
const getEnhancedSource = <T>(
|
||||
source$: BehaviorObservable<T>,
|
||||
graceTime: number,
|
||||
): BehaviorObservable<T> => {
|
||||
let [result, prevGraceTime] = cache.get(source$) ?? []
|
||||
if (result && prevGraceTime === graceTime) {
|
||||
return result
|
||||
}
|
||||
const [state, setState] = useState<I | O>(initialValue)
|
||||
result = delayUnsubscription(graceTime)(source$)
|
||||
cache.set(source$, [result, graceTime])
|
||||
return result
|
||||
}
|
||||
|
||||
const defaultValue: any = {}
|
||||
const useObservable = <O>(
|
||||
source$: BehaviorObservable<O>,
|
||||
unsubscribeGraceTime: number,
|
||||
) => {
|
||||
const [state, setState] = useState<O>(defaultValue)
|
||||
|
||||
useLayoutEffect(() => {
|
||||
let timeoutToken =
|
||||
suspenseTime === Infinity
|
||||
? undefined
|
||||
: setTimeout(setState, suspenseTime, initialValue)
|
||||
|
||||
const subscription = delayUnsubscription(unsubscribeGraceTime)(
|
||||
source$,
|
||||
).subscribe(nextState => {
|
||||
setState(nextState as any)
|
||||
timeoutToken = timeoutToken && (clearTimeout(timeoutToken) as undefined)
|
||||
})
|
||||
|
||||
const enhancedSource$ = getEnhancedSource(source$, unsubscribeGraceTime)
|
||||
setState(enhancedSource$.getValue())
|
||||
const subscription = enhancedSource$.subscribe(setState)
|
||||
return () => subscription.unsubscribe()
|
||||
}, [source$, suspenseTime, unsubscribeGraceTime])
|
||||
}, [source$, unsubscribeGraceTime])
|
||||
|
||||
return state
|
||||
return state !== defaultValue
|
||||
? state
|
||||
: getEnhancedSource(source$, unsubscribeGraceTime).getValue()
|
||||
}
|
||||
|
||||
export default useObservable
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user