@react-rxjs/core@v0.6.0

This commit is contained in:
Josep M Sobrepere 2020-10-29 17:37:38 +01:00
parent 9f5f6d6d8b
commit 90e6a00920
14 changed files with 296 additions and 228 deletions

View File

@ -1,5 +1,5 @@
{
"version": "0.5.0",
"version": "0.6.0",
"repository": {
"type": "git",
"url": "git+https://github.com/re-rxjs/react-rxjs.git"

View File

@ -1,23 +1,26 @@
import React from "react"
import { render } from "@testing-library/react"
import { defer, Subject } from "rxjs"
import { share, finalize } from "rxjs/operators"
import { Subscribe } from "./"
import { Observable } from "rxjs"
import { Subscribe, bind } from "./"
describe("Subscribe", () => {
it("subscribes to the provided observable and remains subscribed until it's unmounted", () => {
let nSubscriptions = 0
const source$ = defer(() => {
nSubscriptions++
return new Subject()
}).pipe(
finalize(() => {
nSubscriptions--
const [useNumber, number$] = bind(
new Observable<number>(() => {
nSubscriptions++
return () => {
nSubscriptions--
}
}),
share(),
)
const TestSubscribe: React.FC = () => <Subscribe source$={source$} />
const Number: React.FC = () => <>{useNumber()}</>
const TestSubscribe: React.FC = () => (
<Subscribe source$={number$}>
<Number />
</Subscribe>
)
expect(nSubscriptions).toBe(0)

View File

@ -1,5 +1,10 @@
import React, { useState, useEffect } from "react"
import { Observable } from "rxjs"
import React, { useState, Suspense, useLayoutEffect, ReactNode } from "react"
import { Observable, noop } from "rxjs"
const p = Promise.resolve()
const Throw = () => {
throw p
}
/**
* A React Component that creates a subscription to the provided observable once
@ -12,15 +17,27 @@ import { Observable } from "rxjs"
*/
export const Subscribe: React.FC<{
source$: Observable<any>
fallback?: null | JSX.Element
fallback?: NonNullable<ReactNode> | null
}> = ({ source$, children, fallback }) => {
const [mounted, setMounted] = useState(0)
useEffect(() => {
const subscription = source$.subscribe()
const [mounted, setMounted] = useState(() => {
try {
;(source$ as any).gV()
return 1
} catch (e) {
return e.then ? 1 : 0
}
})
useLayoutEffect(() => {
const subscription = source$.subscribe(noop, (e) =>
setMounted(() => {
throw e
}),
)
setMounted(1)
return () => {
subscription.unsubscribe()
}
}, [source$])
return <>{mounted ? children : fallback}</>
const fBack = fallback || null
return <Suspense fallback={fBack}>{mounted ? children : <Throw />}</Suspense>
}

View File

@ -3,14 +3,14 @@ import {
of,
defer,
concat,
BehaviorSubject,
throwError,
Observable,
Subject,
merge,
} from "rxjs"
import { renderHook, act as actHook } from "@testing-library/react-hooks"
import { switchMap, delay, take, catchError, map } from "rxjs/operators"
import { FC, Suspense, useState } from "react"
import { delay, take, catchError, map, switchMapTo } from "rxjs/operators"
import { FC, useState } from "react"
import React from "react"
import {
act as componentAct,
@ -18,7 +18,7 @@ import {
screen,
render,
} from "@testing-library/react"
import { bind } from "../"
import { bind, Subscribe } from "../"
import { TestErrorBoundary } from "../test-helpers/TestErrorBoundary"
const wait = (ms: number) => new Promise((res) => setTimeout(res, ms))
@ -42,8 +42,8 @@ describe("connectFactoryObservable", () => {
})
describe("hook", () => {
it("returns the latest emitted value", async () => {
const valueStream = new BehaviorSubject(1)
const [useNumber] = bind(() => valueStream)
const valueStream = new Subject<number>()
const [useNumber] = bind(() => valueStream, 1)
const { result } = renderHook(() => useNumber())
expect(result.current).toBe(1)
@ -56,13 +56,15 @@ describe("connectFactoryObservable", () => {
it("suspends the component when the observable hasn't emitted yet.", async () => {
const source$ = of(1).pipe(delay(100))
const [useDelayedNumber, getDelayedNumber$] = bind(() => source$)
const subs = getDelayedNumber$().subscribe()
const Result: React.FC = () => <div>Result {useDelayedNumber()}</div>
const TestSuspense: React.FC = () => {
return (
<Suspense fallback={<span>Waiting</span>}>
<Subscribe
source$={getDelayedNumber$()}
fallback={<span>Waiting</span>}
>
<Result />
</Suspense>
</Subscribe>
)
}
@ -75,7 +77,51 @@ describe("connectFactoryObservable", () => {
expect(screen.queryByText("Result 1")).not.toBeNull()
expect(screen.queryByText("Waiting")).toBeNull()
subs.unsubscribe()
})
it("synchronously mounts the emitted value if the observable emits synchronously", () => {
const source$ = of(1)
const [useDelayedNumber, getDelayedNumber$] = bind(() => source$)
const Result: React.FC = () => <div>Result {useDelayedNumber()}</div>
const TestSuspense: React.FC = () => {
return (
<Subscribe
source$={getDelayedNumber$()}
fallback={<span>Waiting</span>}
>
<Result />
</Subscribe>
)
}
render(<TestSuspense />)
expect(screen.queryByText("Result 1")).not.toBeNull()
expect(screen.queryByText("Waiting")).toBeNull()
})
it("doesn't mount the fallback element if the subscription is already active", () => {
const source$ = new Subject<number>()
const [useDelayedNumber, getDelayedNumber$] = bind(() => source$)
const Result: React.FC = () => <div>Result {useDelayedNumber()}</div>
const TestSuspense: React.FC = () => {
return (
<Subscribe
source$={getDelayedNumber$()}
fallback={<span>Waiting</span>}
>
<Result />
</Subscribe>
)
}
const subscription = getDelayedNumber$().subscribe()
source$.next(1)
render(<TestSuspense />)
expect(screen.queryByText("Result 1")).not.toBeNull()
expect(screen.queryByText("Waiting")).toBeNull()
subscription.unsubscribe()
})
it("shares the multicasted subscription with all of the components that use the same parameters", async () => {
@ -114,7 +160,12 @@ describe("connectFactoryObservable", () => {
})
it("returns the value of next new Observable when the arguments change", () => {
const [useNumber] = bind((x: number) => of(x))
const [useNumber, getNumber$] = bind((x: number) => of(x))
const subs = merge(
getNumber$(0),
getNumber$(1),
getNumber$(2),
).subscribe()
const { result, rerender } = renderHook(({ input }) => useNumber(input), {
initialProps: { input: 0 },
})
@ -129,6 +180,7 @@ describe("connectFactoryObservable", () => {
rerender({ input: 2 })
})
expect(result.current).toBe(2)
subs.unsubscribe()
})
it("handles optional args correctly", () => {
@ -149,9 +201,12 @@ describe("connectFactoryObservable", () => {
const [input, setInput] = useState(0)
return (
<>
<Suspense fallback={<span>Waiting</span>}>
<Subscribe
source$={getDelayedNumber$(input)}
fallback={<span>Waiting</span>}
>
<Result input={input} />
</Suspense>
</Subscribe>
<button onClick={() => setInput((x) => x + 1)}>increase</button>
</>
)
@ -223,8 +278,8 @@ describe("connectFactoryObservable", () => {
})
it("allows errors to be caught in error boundaries", () => {
const errStream = new BehaviorSubject(1)
const [useError] = bind(() => errStream)
const errStream = new Subject()
const [useError] = bind(() => errStream, 1)
const ErrorComponent = () => {
const value = useError()
@ -253,7 +308,7 @@ describe("connectFactoryObservable", () => {
const errStream = new Observable((observer) =>
observer.error("controlled error"),
)
const [useError] = bind((_: string) => errStream)
const [useError, getErrStream$] = bind((_: string) => errStream)
const ErrorComponent = () => {
const value = useError("foo")
@ -264,9 +319,12 @@ describe("connectFactoryObservable", () => {
const errorCallback = jest.fn()
const { unmount } = render(
<TestErrorBoundary onError={errorCallback}>
<Suspense fallback={<div>Loading...</div>}>
<Subscribe
source$={getErrStream$("foo")}
fallback={<div>Loading...</div>}
>
<ErrorComponent />
</Suspense>
</Subscribe>
</TestErrorBoundary>,
)
@ -279,7 +337,7 @@ describe("connectFactoryObservable", () => {
it("allows async errors to be caught in error boundaries with suspense", async () => {
const errStream = new Subject()
const [useError] = bind((_: string) => errStream)
const [useError, getErrStream$] = bind((_: string) => errStream)
const ErrorComponent = () => {
const value = useError("foo")
@ -290,9 +348,12 @@ describe("connectFactoryObservable", () => {
const errorCallback = jest.fn()
const { unmount } = render(
<TestErrorBoundary onError={errorCallback}>
<Suspense fallback={<div>Loading...</div>}>
<Subscribe
source$={getErrStream$("foo")}
fallback={<div>Loading...</div>}
>
<ErrorComponent />
</Suspense>
</Subscribe>
</TestErrorBoundary>,
)
@ -325,19 +386,24 @@ describe("connectFactoryObservable", () => {
.pipe(catchError(() => []))
.subscribe()
const Ok: React.FC<{ ok: boolean }> = ({ ok }) => <>{useOkKo(ok)}</>
const ErrorComponent = () => {
const [ok, setOk] = useState(true)
const value = useOkKo(ok)
return <span onClick={() => setOk(false)}>{value}</span>
return (
<Subscribe source$={getObs$(ok)} fallback={<div>Loading...</div>}>
<span onClick={() => setOk(false)}>
<Ok ok={ok} />
</span>
</Subscribe>
)
}
const errorCallback = jest.fn()
const { unmount } = render(
<TestErrorBoundary onError={errorCallback}>
<Suspense fallback={<div>Loading...</div>}>
<ErrorComponent />
</Suspense>
<ErrorComponent />
</TestErrorBoundary>,
)
@ -367,12 +433,11 @@ describe("connectFactoryObservable", () => {
)
it("doesn't throw errors on components that will get unmounted on the next cycle", () => {
const valueStream = new BehaviorSubject(1)
const [useValue, value$] = bind(() => valueStream)
const [useError] = bind(() =>
value$().pipe(
switchMap((v) => (v === 1 ? of(v) : throwError("error"))),
),
const valueStream = new Subject<number>()
const [useValue, value$] = bind(() => valueStream, 1)
const [useError] = bind(
() => value$().pipe(switchMapTo(throwError("error"))),
1,
)
const ErrorComponent: FC = () => {
@ -403,30 +468,6 @@ describe("connectFactoryObservable", () => {
expect(errorCallback).not.toHaveBeenCalled()
})
it("does not resubscribe to an observable that emits synchronously and that does not have a top-level subscription after a re-render", () => {
let nTopSubscriptions = 0
const [useNTopSubscriptions] = bind((id: number) =>
defer(() => {
return of(++nTopSubscriptions + id)
}),
)
const { result, rerender, unmount } = renderHook(() =>
useNTopSubscriptions(0),
)
expect(result.current).toBe(2)
actHook(() => {
rerender()
})
expect(result.current).toBe(2)
expect(nTopSubscriptions).toBe(2)
unmount()
})
it("if the observable hasn't emitted and a defaultValue is provided, it does not start suspense", () => {
const number$ = new Subject<number>()
const [useNumber] = bind(

View File

@ -1,9 +1,7 @@
import { Observable } from "rxjs"
import shareLatest from "../internal/share-latest"
import reactEnhancer from "../internal/react-enhancer"
import { BehaviorObservable } from "../internal/BehaviorObservable"
import { useObservable } from "../internal/useObservable"
import { EMPTY_VALUE } from "../internal/empty-value"
import { SUSPENSE } from "../SUSPENSE"
/**
@ -27,16 +25,14 @@ import { SUSPENSE } from "../SUSPENSE"
*/
export default function connectFactoryObservable<A extends [], O>(
getObservable: (...args: A) => Observable<O>,
defaultValue: O = EMPTY_VALUE,
defaultValue: O,
): [
(...args: A) => Exclude<O, typeof SUSPENSE>,
(...args: A) => Observable<O>,
] {
const cache = new NestedMap<A, [BehaviorObservable<O>, () => O]>()
const cache = new NestedMap<A, BehaviorObservable<O>>()
const getSharedObservables$ = (
input: A,
): [BehaviorObservable<O>, () => O] => {
const getSharedObservables$ = (input: A): BehaviorObservable<O> => {
for (let i = input.length - 1; input[i] === undefined && i > -1; i--) {
input.splice(-1)
}
@ -50,6 +46,7 @@ export default function connectFactoryObservable<A extends [], O>(
const sharedObservable$ = shareLatest(
getObservable(...input),
false,
defaultValue,
() => {
cache.delete(keys)
},
@ -61,31 +58,25 @@ export default function connectFactoryObservable<A extends [], O>(
if (!inCache) {
cache.set(keys, result)
} else if (inCache[0] !== publicShared$) {
source$ = inCache[0]
publicShared$.getValue = source$.getValue
} else if (inCache !== publicShared$) {
source$ = inCache
publicShared$.gV = source$.gV
}
return source$.subscribe(subscriber)
}) as BehaviorObservable<O>
publicShared$.getValue = sharedObservable$.getValue
const reactGetValue = reactEnhancer(publicShared$, defaultValue)
publicShared$.gV = sharedObservable$.gV
const result: [BehaviorObservable<O>, () => O] = [
publicShared$,
reactGetValue,
]
const result: BehaviorObservable<O> = publicShared$
cache.set(keys, result)
return result
}
return [
(...input: A) => {
const [source$, getValue] = getSharedObservables$(input)
return useObservable(source$, getValue, input)
},
(...input: A) => getSharedObservables$(input)[0],
(...input: A) =>
useObservable(getSharedObservables$(input), input, defaultValue),
(...input: A) => getSharedObservables$(input),
]
}

View File

@ -7,7 +7,6 @@ import {
import { act, renderHook } from "@testing-library/react-hooks"
import React, { Suspense, useEffect, FC, StrictMode } from "react"
import {
BehaviorSubject,
defer,
from,
of,
@ -15,17 +14,17 @@ import {
throwError,
Observable,
merge,
NEVER,
EMPTY,
} from "rxjs"
import {
delay,
scan,
startWith,
map,
switchMap,
catchError,
switchMapTo,
} from "rxjs/operators"
import { bind, SUSPENSE } from "../"
import { bind, SUSPENSE, Subscribe } from "../"
import { TestErrorBoundary } from "../test-helpers/TestErrorBoundary"
const wait = (ms: number) => new Promise((res) => setTimeout(res, ms))
@ -37,6 +36,8 @@ describe("connectObservable", () => {
if (
/Warning.*not wrapped in act/.test(args[0]) ||
/Uncaught 'controlled error'/.test(args[0]) ||
/Missing subscription/.test(args[0]) ||
/Empty observable/.test(args[0]) ||
/using the error boundary .* TestErrorBoundary/.test(args[0])
) {
return
@ -51,10 +52,12 @@ describe("connectObservable", () => {
it("sets the initial state synchronously if it's available", async () => {
const observable$ = of(1)
const [useLatestNumber] = bind(observable$)
const [useLatestNumber, latestNumber$] = bind(observable$)
const subs = latestNumber$.subscribe()
const { result } = renderHook(() => useLatestNumber())
expect(result.current).toEqual(1)
subs.unsubscribe()
})
it("suspends the component when the observable hasn't emitted yet.", async () => {
@ -108,8 +111,8 @@ describe("connectObservable", () => {
})
it("updates with the last emitted value", async () => {
const numberStream = new BehaviorSubject(1)
const [useNumber] = bind(numberStream)
const numberStream = new Subject<number>()
const [useNumber] = bind(numberStream, 1)
const { result } = renderHook(() => useNumber())
expect(result.current).toBe(1)
@ -121,7 +124,7 @@ describe("connectObservable", () => {
it("updates more than one component", async () => {
const value = new Subject<number>()
const [useValue] = bind(value.pipe(startWith(0)))
const [useValue] = bind(value, 0)
const { result: result1, unmount: unmount1 } = renderHook(() => useValue())
const { result: result2, unmount: unmount2 } = renderHook(() => useValue())
const { result: result3, unmount: unmount3 } = renderHook(() => useValue())
@ -162,10 +165,10 @@ describe("connectObservable", () => {
})
it("allows React to batch synchronous updates", async () => {
const numberStream = new BehaviorSubject(1)
const stringStream = new BehaviorSubject("a")
const [useNumber] = bind(numberStream)
const [useString] = bind(stringStream)
const numberStream = new Subject<number>()
const stringStream = new Subject<string>()
const [useNumber] = bind(numberStream, 1)
const [useString] = bind(stringStream, "a")
const BatchComponent: FC<{
onUpdate: () => void
@ -231,15 +234,15 @@ describe("connectObservable", () => {
}),
startWith(0),
)
const [useDelayedNumber] = bind(source$)
const [useDelayedNumber, delayedNumber$] = bind(source$)
const Result: React.FC = () => <div>Result {useDelayedNumber()}</div>
const TestSuspense: React.FC = () => {
return (
<div>
<button onClick={() => subject$.next()}>Next</button>
<Suspense fallback={<span>Waiting</span>}>
<Subscribe source$={delayedNumber$} fallback={<span>Waiting</span>}>
<Result />
</Suspense>
</Subscribe>
</div>
)
}
@ -271,15 +274,15 @@ describe("connectObservable", () => {
}),
startWith(0),
)
const [useDelayedNumber] = bind(source$)
const [useDelayedNumber, delayedNumber$] = bind(source$)
const Result: React.FC = () => <div>Result {useDelayedNumber()}</div>
const TestSuspense: React.FC = () => {
return (
<div>
<button onClick={() => subject$.next()}>Next</button>
<Suspense fallback={<span>Waiting</span>}>
<Subscribe source$={delayedNumber$} fallback={<span>Waiting</span>}>
<Result />
</Suspense>
</Subscribe>
</div>
)
}
@ -304,8 +307,8 @@ describe("connectObservable", () => {
})
it("allows errors to be caught in error boundaries", () => {
const errStream = new BehaviorSubject(1)
const [useError] = bind(errStream)
const errStream = new Subject()
const [useError] = bind(errStream, 1)
const ErrorComponent = () => {
const value = useError()
@ -333,7 +336,7 @@ describe("connectObservable", () => {
const errStream = new Observable((observer) =>
observer.error("controlled error"),
)
const [useError] = bind(errStream)
const [useError, errStream$] = bind(errStream)
const ErrorComponent = () => {
const value = useError()
@ -343,9 +346,9 @@ describe("connectObservable", () => {
const errorCallback = jest.fn()
const { unmount } = render(
<TestErrorBoundary onError={errorCallback}>
<Suspense fallback={<div>Loading...</div>}>
<Subscribe source$={errStream$} fallback={<div>Loading...</div>}>
<ErrorComponent />
</Suspense>
</Subscribe>
</TestErrorBoundary>,
)
@ -358,7 +361,8 @@ describe("connectObservable", () => {
it("allows async errors to be caught in error boundaries with suspense", async () => {
const errStream = new Subject()
const [useError] = bind(errStream)
const [useError, errStream$] = bind(errStream)
const errStream$WithoutErrors = errStream$.pipe(catchError(() => EMPTY))
const ErrorComponent = () => {
const value = useError()
@ -368,9 +372,12 @@ describe("connectObservable", () => {
const errorCallback = jest.fn()
const { unmount } = render(
<TestErrorBoundary onError={errorCallback}>
<Suspense fallback={<div>Loading...</div>}>
<Subscribe
source$={errStream$WithoutErrors}
fallback={<div>Loading...</div>}
>
<ErrorComponent />
</Suspense>
</Subscribe>
</TestErrorBoundary>,
)
@ -406,12 +413,11 @@ describe("connectObservable", () => {
}
const errorCallback = jest.fn()
error$.pipe(catchError((_, caught) => caught)).subscribe()
const { unmount } = render(
<TestErrorBoundary onError={errorCallback}>
<Suspense fallback={<div>Loading...</div>}>
<Subscribe source$={error$} fallback={<div>Loading...</div>}>
<ErrorComponent />
</Suspense>
</Subscribe>
</TestErrorBoundary>,
)
@ -435,13 +441,12 @@ describe("connectObservable", () => {
await componentAct(async () => {
await wait(200)
})
error$.subscribe()
render(
<TestErrorBoundary onError={errorCallback}>
<Suspense fallback={<div>Loading...</div>}>
<Subscribe source$={error$} fallback={<div>Loading...</div>}>
<ErrorComponent />
</Suspense>
</Subscribe>
</TestErrorBoundary>,
)
expect(screen.queryByText("Loading...")).not.toBeNull()
@ -459,11 +464,9 @@ describe("connectObservable", () => {
})
it("doesn't throw errors on components that will get unmounted on the next cycle", () => {
const valueStream = new BehaviorSubject(1)
const [useValue, value$] = bind(valueStream)
const [useError] = bind(
value$.pipe(switchMap((v) => (v === 1 ? of(v) : throwError("error")))),
)
const valueStream = new Subject<number>()
const [useValue, value$] = bind(valueStream, 1)
const [useError] = bind(value$.pipe(switchMapTo(throwError("error"))), 1)
const ErrorComponent: FC = () => {
const value = useError()
@ -493,20 +496,52 @@ describe("connectObservable", () => {
expect(errorCallback).not.toHaveBeenCalled()
})
it("should not trigger suspense when the stream emits synchronously", () => {
const [useValue] = bind(NEVER.pipe(startWith("Hello")))
it("should throw an error when the stream does not have a subscription", () => {
const [useValue] = bind(of("Hello"))
const errorCallback = jest.fn()
const Component: FC = () => <>{useValue()}</>
render(
<StrictMode>
<Suspense fallback={<div>Loading...</div>}>
<Component />
</Suspense>
<TestErrorBoundary onError={errorCallback}>
<Suspense fallback={<div>Loading...</div>}>
<Component />
</Suspense>
</TestErrorBoundary>
,
</StrictMode>,
)
expect(errorCallback).toHaveBeenCalled()
})
it("should throw an error if the stream completes without emitting while on SUSPENSE", async () => {
const subject = new Subject()
const [useValue, value$] = bind(subject)
const errorCallback = jest.fn()
const Component: FC = () => <>{useValue()}</>
render(
<StrictMode>
<TestErrorBoundary onError={errorCallback}>
<Subscribe source$={value$} fallback={<div>Loading...</div>}>
<Component />
</Subscribe>
</TestErrorBoundary>
,
</StrictMode>,
)
expect(errorCallback).not.toHaveBeenCalled()
expect(screen.queryByText("Loading...")).not.toBeNull()
await componentAct(async () => {
subject.complete()
await wait(100)
})
expect(screen.queryByText("Loading...")).toBeNull()
expect(screen.queryByText("Hello")).not.toBeNull()
expect(errorCallback).toHaveBeenCalled()
})
it("if the observable hasn't emitted and a defaultValue is provided, it does not start suspense", () => {

View File

@ -1,8 +1,6 @@
import { Observable } from "rxjs"
import shareLatest from "../internal/share-latest"
import reactEnhancer from "../internal/react-enhancer"
import { useObservable } from "../internal/useObservable"
import { EMPTY_VALUE } from "../internal/empty-value"
/**
* Accepts: An Observable.
@ -22,11 +20,10 @@ import { EMPTY_VALUE } from "../internal/empty-value"
const emptyArr: Array<any> = []
export default function connectObservable<T>(
observable: Observable<T>,
defaultValue: T = EMPTY_VALUE,
defaultValue: T,
) {
const sharedObservable$ = shareLatest<T>(observable, false)
const getValue = reactEnhancer(sharedObservable$, defaultValue)
const sharedObservable$ = shareLatest<T>(observable, false, defaultValue)
const useStaticObservable = () =>
useObservable(sharedObservable$, getValue, emptyArr)
useObservable(sharedObservable$, emptyArr, defaultValue)
return [useStaticObservable, sharedObservable$] as const
}

View File

@ -2,6 +2,7 @@ import { Observable } from "rxjs"
import { SUSPENSE } from "../SUSPENSE"
import connectFactoryObservable from "./connectFactoryObservable"
import connectObservable from "./connectObservable"
import { EMPTY_VALUE } from "../internal/empty-value"
/**
* Binds an observable to React
@ -45,8 +46,8 @@ export function bind<A extends unknown[], O>(
defaultValue?: O,
): [(...args: A) => Exclude<O, typeof SUSPENSE>, (...args: A) => Observable<O>]
export function bind(...args: any[]) {
return (typeof args[0] === "function"
export function bind(observable: any, defaultValue: any = EMPTY_VALUE) {
return (typeof observable === "function"
? (connectFactoryObservable as any)
: connectObservable)(...args)
: connectObservable)(observable, defaultValue)
}

View File

@ -1,6 +1,5 @@
import { Observable } from "rxjs"
import { SUSPENSE } from "../SUSPENSE"
export interface BehaviorObservable<T> extends Observable<T> {
getValue: () => T | typeof SUSPENSE
gV: () => T
}

View File

@ -1,67 +0,0 @@
import { SUSPENSE } from "../SUSPENSE"
import { BehaviorObservable } from "./BehaviorObservable"
import { EMPTY_VALUE } from "./empty-value"
const reactEnhancer = <T>(
source$: BehaviorObservable<T>,
defaultValue: T,
): (() => T) => {
let promise: Promise<T | void> | null
let error: any = EMPTY_VALUE
const res = (): T => {
const currentValue = source$.getValue()
if (currentValue !== SUSPENSE && currentValue !== EMPTY_VALUE) {
return currentValue
}
if (defaultValue !== EMPTY_VALUE) return defaultValue
let timeoutToken
if (error !== EMPTY_VALUE) {
clearTimeout(timeoutToken)
timeoutToken = setTimeout(() => {
error = EMPTY_VALUE
}, 50)
throw error
}
if (promise) throw promise
let value: typeof EMPTY_VALUE | T = EMPTY_VALUE
promise = new Promise<T>((res) => {
const subscription = source$.subscribe(
(v) => {
if (v !== (SUSPENSE as any)) {
value = v
subscription && subscription.unsubscribe()
res(v)
}
},
(e) => {
error = e
timeoutToken = setTimeout(() => {
error = EMPTY_VALUE
}, 50)
res()
},
)
if (value !== EMPTY_VALUE) {
subscription.unsubscribe()
}
}).finally(() => {
promise = null
})
if (value !== EMPTY_VALUE) {
promise = null
return value
}
throw error !== EMPTY_VALUE ? error : promise
}
res.d = defaultValue
return res
}
export default reactEnhancer

View File

@ -1,18 +1,23 @@
import { Observable, Subscription, Subject, noop } from "rxjs"
import { BehaviorObservable } from "./BehaviorObservable"
import { EMPTY_VALUE } from "./empty-value"
import { SUSPENSE } from "../SUSPENSE"
const shareLatest = <T>(
source$: Observable<T>,
shouldComplete = true,
defaultValue: T = EMPTY_VALUE,
teardown = noop,
): BehaviorObservable<T> => {
let subject: Subject<T> | null
let subscription: Subscription | null
let refCount = 0
let currentValue: T = EMPTY_VALUE
let promise: Promise<T> | null
const result = new Observable<T>((subscriber) => {
if (!shouldComplete) subscriber.complete = noop
refCount++
let innerSub: Subscription
if (!subject) {
@ -31,7 +36,7 @@ const shareLatest = <T>(
},
() => {
subscription = null
shouldComplete && subject!.complete()
subject!.complete()
},
)
if (subscription.closed) subscription = null
@ -53,11 +58,57 @@ const shareLatest = <T>(
teardown()
subject = null
subscription = null
promise = null
}
}
}) as BehaviorObservable<T>
result.getValue = () => currentValue
let error: any = EMPTY_VALUE
let timeoutToken: any
result.gV = (): T => {
if ((currentValue as any) !== SUSPENSE && currentValue !== EMPTY_VALUE) {
return currentValue
}
if (defaultValue !== EMPTY_VALUE) return defaultValue
if (error !== EMPTY_VALUE) {
clearTimeout(timeoutToken)
timeoutToken = setTimeout(() => {
error = EMPTY_VALUE
}, 50)
throw error
}
if (!subscription) {
throw new Error("Missing subscription")
}
if (promise) throw promise
throw (promise = new Promise<T>((res) => {
const setError = (e: any) => {
error = e
timeoutToken = setTimeout(() => {
error = EMPTY_VALUE
}, 50)
res()
promise = null
}
const pSubs = subject!.subscribe(
(v) => {
if (v !== (SUSPENSE as any)) {
pSubs.unsubscribe()
res(v)
promise = null
}
},
setError,
() => {
setError(new Error("Empty observable"))
},
)
subscription!.add(pSubs)
}))
}
return result
}

View File

@ -1,17 +1,18 @@
import { useEffect, useState, useRef } from "react"
import { SUSPENSE } from "../SUSPENSE"
import { EMPTY_VALUE } from "./empty-value"
import { Observable } from "rxjs"
import { BehaviorObservable } from "../internal/BehaviorObservable"
export const useObservable = <O>(
source$: Observable<O>,
getValue: () => O,
source$: BehaviorObservable<O>,
keys: Array<any>,
defaultValue: O,
): Exclude<O, typeof SUSPENSE> => {
const [state, setState] = useState(getValue)
const [state, setState] = useState(source$.gV)
const prevStateRef = useRef<O | (() => O)>(state)
useEffect(() => {
const { gV } = source$
let err: any = EMPTY_VALUE
let syncVal: O | typeof SUSPENSE = EMPTY_VALUE
@ -32,14 +33,13 @@ export const useObservable = <O>(
setState((prevStateRef.current = value))
}
const defaultValue = (getValue as any).d
if (syncVal === EMPTY_VALUE) {
set(defaultValue === EMPTY_VALUE ? getValue : defaultValue)
set(defaultValue === EMPTY_VALUE ? gV : defaultValue)
}
const t = subscription
subscription = source$.subscribe((value: O | typeof SUSPENSE) => {
set(value === SUSPENSE ? getValue : value)
set(value === SUSPENSE ? gV : value)
}, onError)
t.unsubscribe()

View File

@ -36,7 +36,7 @@
"Victor Oliva (https://github.com/voliva)"
],
"devDependencies": {
"@react-rxjs/core": "0.5.0",
"@react-rxjs/core": "0.6.0",
"@testing-library/react": "^11.1.0",
"@testing-library/react-hooks": "^3.4.2",
"@types/jest": "^26.0.15",

View File

@ -36,7 +36,7 @@
"Victor Oliva (https://github.com/voliva)"
],
"devDependencies": {
"@react-rxjs/core": "0.5.0",
"@react-rxjs/core": "0.6.0",
"@testing-library/react": "^11.1.0",
"@testing-library/react-hooks": "^3.4.2",
"@types/jest": "^26.0.15",