feat(core): bind

This commit is contained in:
Josep M Sobrepere 2020-07-21 11:16:28 +02:00
parent db4d38e271
commit 76609e867e
9 changed files with 143 additions and 85 deletions

View File

@ -24,8 +24,9 @@
- [Installation](#installation)
- [API](#api)
- Core
- [connectObservable](#connectobservable)
- [connectFactoryObservable](#connectfactoryobservable)
- [bind](#bind)
- [Observable overload](#observable-overload)
- [Factory of Observables overload](#factory-of-observables-overload)
- [shareLatest](#sharelatest)
- React Suspense Support
- [SUSPENSE](#suspense)
@ -40,9 +41,11 @@
## API
### connectObservable
### bind
#### Observable overload
```ts
const [useCounter, sharedCounter$] = connectObservable(
const [useCounter, sharedCounter$] = bind(
clicks$.pipe(
scan(prev => prev + 1, 0),
startWith(0),
@ -61,9 +64,9 @@ the hook will leverage React Suspense while it's waiting for the first value.
streams that depend on it. The shared subscription is closed as soon as there
are no subscribers to that observable.
### connectFactoryObservable
#### Factory of Observables overload
```tsx
const [useStory, getStory$] = connectFactoryObservable(
const [useStory, getStory$] = bind(
(storyId: number) => getStoryWithUpdates$(storyId)
)
@ -111,8 +114,7 @@ const shareLatest = <T>(): Observable<T> =>
)
```
The enhanced observables returned from `connectObservable` and `connectFactoryObservable`
have been enhanced with this operator.
The enhanced observables returned from `bind` have been enhanced with this operator.
### SUSPENSE
@ -170,7 +172,7 @@ Like `switchMap` but applying a `startWith(SUSPENSE)` to the inner observable.
import React, { Suspense } from "react"
import { Subject } from "rxjs"
import { startWith, map } from "rxjs/operators"
import { connectObservable, switchMapSuspended } from "react-rxjs"
import { bind, switchMapSuspended } from "@react-rxjs/core"
import { Header, Search, LoadingResults, Repo } from "./components"
interface Repo {
@ -199,7 +201,7 @@ const findRepos = (query: string): Promise<Repo[]> =>
})),
)
const [useRepos, repos$] = connectObservable(
const [useRepos, repos$] = bind(
searchInput$.pipe(
switchMapSuspended(findRepos),
startWith(null),
@ -228,7 +230,7 @@ function Repos() {
)
}
const [useMostRecentlyUpdatedRepo] = connectObservable(
const [useMostRecentlyUpdatedRepo] = bind(
repos$.pipe(
map(repos =>
Array.isArray(repos) && repos.length > 0

View File

@ -5,9 +5,11 @@
## API
### connectObservable
### bind
#### Observable overload
```ts
const [useCounter, sharedCounter$] = connectObservable(
const [useCounter, sharedCounter$] = bind(
clicks$.pipe(
scan(prev => prev + 1, 0),
startWith(0),
@ -26,9 +28,9 @@ the hook will leverage React Suspense while it's waiting for the first value.
streams that depend on it. The shared subscription is closed as soon as there
are no subscribers to that observable.
### connectFactoryObservable
#### Factory Observables overload
```tsx
const [useStory, getStory$] = connectFactoryObservable(
const [useStory, getStory$] = bind(
(storyId: number) => getStoryWithUpdates$(storyId)
)
@ -76,8 +78,7 @@ const shareLatest = <T>(): Observable<T> =>
)
```
The enhanced observables returned from `connectObservable` and `connectFactoryObservable`
have been enhanced with this operator.
The enhanced observables returned from `bind` have been enhanced with this operator.
### SUSPENSE

View File

@ -18,8 +18,8 @@ import {
screen,
render,
} from "@testing-library/react"
import { connectFactoryObservable } from "./"
import { TestErrorBoundary } from "./test-helpers/TestErrorBoundary"
import { bind } from "../"
import { TestErrorBoundary } from "../test-helpers/TestErrorBoundary"
const wait = (ms: number) => new Promise((res) => setTimeout(res, ms))
@ -43,7 +43,7 @@ describe("connectFactoryObservable", () => {
describe("hook", () => {
it("returns the latest emitted value", async () => {
const valueStream = new BehaviorSubject(1)
const [useNumber] = connectFactoryObservable(() => valueStream)
const [useNumber] = bind(() => valueStream)
const { result } = renderHook(() => useNumber())
expect(result.current).toBe(1)
@ -55,7 +55,7 @@ describe("connectFactoryObservable", () => {
it("suspends the component when the observable hasn't emitted yet.", async () => {
const source$ = of(1).pipe(delay(100))
const [useDelayedNumber] = connectFactoryObservable(() => source$)
const [useDelayedNumber] = bind(() => source$)
const Result: React.FC = () => <div>Result {useDelayedNumber()}</div>
const TestSuspense: React.FC = () => {
return (
@ -86,7 +86,7 @@ describe("connectFactoryObservable", () => {
const [
useLatestNumber,
latestNumber$,
] = connectFactoryObservable((id: number, value: number) =>
] = bind((id: number, value: number) =>
concat(observable$, of(id + value)),
)
expect(subscriberCount).toBe(0)
@ -108,7 +108,7 @@ describe("connectFactoryObservable", () => {
})
it("returns the value of next new Observable when the arguments change", () => {
const [useNumber] = connectFactoryObservable((x: number) => of(x))
const [useNumber] = bind((x: number) => of(x))
const { result, rerender } = renderHook(({ input }) => useNumber(input), {
initialProps: { input: 0 },
})
@ -126,9 +126,7 @@ describe("connectFactoryObservable", () => {
})
it("suspends the component when the factory-observable hasn't emitted yet.", async () => {
const [useDelayedNumber] = connectFactoryObservable((x: number) =>
of(x).pipe(delay(50)),
)
const [useDelayedNumber] = bind((x: number) => of(x).pipe(delay(50)))
const Result: React.FC<{ input: number }> = (p) => (
<div>Result {useDelayedNumber(p.input)}</div>
)
@ -183,7 +181,7 @@ describe("connectFactoryObservable", () => {
return from([1, 2, 3, 4, 5])
})
const [useLatestNumber] = connectFactoryObservable(
const [useLatestNumber] = bind(
(id: number) => concat(observable$, of(id)),
100,
)
@ -207,7 +205,7 @@ describe("connectFactoryObservable", () => {
it("allows errors to be caught in error boundaries", () => {
const errStream = new BehaviorSubject(1)
const [useError] = connectFactoryObservable(() => errStream)
const [useError] = bind(() => errStream)
const ErrorComponent = () => {
const value = useError()
@ -236,7 +234,7 @@ describe("connectFactoryObservable", () => {
const errStream = new Observable((observer) =>
observer.error("controlled error"),
)
const [useError] = connectFactoryObservable((_: string) => errStream)
const [useError] = bind((_: string) => errStream)
const ErrorComponent = () => {
const value = useError("foo")
@ -262,7 +260,7 @@ describe("connectFactoryObservable", () => {
it("allows async errors to be caught in error boundaries with suspense", async () => {
const errStream = new Subject()
const [useError] = connectFactoryObservable((_: string) => errStream)
const [useError] = bind((_: string) => errStream)
const ErrorComponent = () => {
const value = useError("foo")
@ -300,9 +298,7 @@ describe("connectFactoryObservable", () => {
observer.error("controlled error")
})
const [useOkKo] = connectFactoryObservable((ok: boolean) =>
ok ? normal$ : errored$,
)
const [useOkKo] = bind((ok: boolean) => (ok ? normal$ : errored$))
const ErrorComponent = () => {
const [ok, setOk] = useState(true)
@ -347,8 +343,8 @@ describe("connectFactoryObservable", () => {
it("doesn't throw errors on components that will get unmounted on the next cycle", () => {
const valueStream = new BehaviorSubject(1)
const [useValue, value$] = connectFactoryObservable(() => valueStream)
const [useError] = connectFactoryObservable(() =>
const [useValue, value$] = bind(() => valueStream)
const [useError] = bind(() =>
value$().pipe(
switchMap((v) => (v === 1 ? of(v) : throwError("error"))),
),
@ -386,13 +382,10 @@ describe("connectFactoryObservable", () => {
describe("observable", () => {
it("it completes when the source observable completes, regardless of mounted componentes being subscribed to the source", async () => {
let diff = -1
const [useLatestNumber, getShared] = connectFactoryObservable(
(_: number) => {
diff++
return from([1, 2, 3, 4].map((val) => val + diff))
},
0,
)
const [useLatestNumber, getShared] = bind((_: number) => {
diff++
return from([1, 2, 3, 4].map((val) => val + diff))
}, 0)
let latestValue1: number = 0
let nUpdates = 0

View File

@ -1,10 +1,10 @@
import { Observable } from "rxjs"
import shareLatest from "./internal/share-latest"
import reactEnhancer from "./internal/react-enhancer"
import { BehaviorObservable } from "./internal/BehaviorObservable"
import { useObservable } from "./internal/useObservable"
import { SUSPENSE } from "./SUSPENSE"
import { takeUntilComplete } from "./internal/take-until-complete"
import shareLatest from "../internal/share-latest"
import reactEnhancer from "../internal/react-enhancer"
import { BehaviorObservable } from "../internal/BehaviorObservable"
import { useObservable } from "../internal/useObservable"
import { SUSPENSE } from "../SUSPENSE"
import { takeUntilComplete } from "../internal/take-until-complete"
/**
* Accepts: A factory function that returns an Observable.
@ -28,12 +28,12 @@ import { takeUntilComplete } from "./internal/take-until-complete"
* subscription, then the hook will leverage React Suspense while it's waiting
* for the first value.
*/
export function connectFactoryObservable<
export default function connectFactoryObservable<
A extends (number | string | boolean | null)[],
O
>(
getObservable: (...args: A) => Observable<O>,
unsubscribeGraceTime = 200,
unsubscribeGraceTime: number,
): [
(...args: A) => Exclude<O, typeof SUSPENSE>,
(...args: A) => Observable<O>,

View File

@ -16,10 +16,10 @@ import {
Observable,
} from "rxjs"
import { delay, scan, startWith, map, switchMap } from "rxjs/operators"
import { connectObservable, SUSPENSE } from "./"
import { TestErrorBoundary } from "./test-helpers/TestErrorBoundary"
import { bind, SUSPENSE } from "../"
import { TestErrorBoundary } from "../test-helpers/TestErrorBoundary"
const wait = (ms: number) => new Promise(res => setTimeout(res, ms))
const wait = (ms: number) => new Promise((res) => setTimeout(res, ms))
describe("connectObservable", () => {
const originalError = console.error
@ -42,7 +42,7 @@ describe("connectObservable", () => {
it("sets the initial state synchronously if it's available", async () => {
const observable$ = of(1)
const [useLatestNumber] = connectObservable(observable$)
const [useLatestNumber] = bind(observable$)
const { result } = renderHook(() => useLatestNumber())
expect(result.current).toEqual(1)
@ -50,7 +50,7 @@ describe("connectObservable", () => {
it("suspends the component when the observable hasn't emitted yet.", async () => {
const source$ = of(1).pipe(delay(100))
const [useDelayedNumber] = connectObservable(source$)
const [useDelayedNumber] = bind(source$)
const Result: React.FC = () => <div>Result {useDelayedNumber()}</div>
const TestSuspense: React.FC = () => {
return (
@ -73,7 +73,7 @@ describe("connectObservable", () => {
it("updates with the last emitted value", async () => {
const numberStream = new BehaviorSubject(1)
const [useNumber] = connectObservable(numberStream)
const [useNumber] = bind(numberStream)
const { result } = renderHook(() => useNumber())
expect(result.current).toBe(1)
@ -85,7 +85,7 @@ describe("connectObservable", () => {
it("updates more than one component", async () => {
const value = new Subject<number>()
const [useValue] = connectObservable(value.pipe(startWith(0)), 50)
const [useValue] = bind(value.pipe(startWith(0)), 50)
const { result: result1, unmount: unmount1 } = renderHook(() => useValue())
const { result: result2, unmount: unmount2 } = renderHook(() => useValue())
const { result: result3, unmount: unmount3 } = renderHook(() => useValue())
@ -128,8 +128,8 @@ describe("connectObservable", () => {
it("allows React to batch synchronous updates", async () => {
const numberStream = new BehaviorSubject(1)
const stringStream = new BehaviorSubject("a")
const [useNumber] = connectObservable(numberStream)
const [useString] = connectObservable(stringStream)
const [useNumber] = bind(numberStream)
const [useString] = bind(stringStream)
const BatchComponent: FC<{
onUpdate: () => void
@ -163,7 +163,7 @@ describe("connectObservable", () => {
return from([1, 2, 3, 4, 5])
})
const [useLatestNumber] = connectObservable(observable$, 100)
const [useLatestNumber] = bind(observable$, 100)
const { unmount } = renderHook(() => useLatestNumber())
const { unmount: unmount2 } = renderHook(() => useLatestNumber())
const { unmount: unmount3 } = renderHook(() => useLatestNumber())
@ -189,7 +189,7 @@ describe("connectObservable", () => {
return from([1, 2, 3, 4, 5])
})
const [useLatestNumber] = connectObservable(observable$, Infinity)
const [useLatestNumber] = bind(observable$, Infinity)
const { unmount } = renderHook(() => useLatestNumber())
const { unmount: unmount2 } = renderHook(() => useLatestNumber())
const { unmount: unmount3 } = renderHook(() => useLatestNumber())
@ -208,8 +208,8 @@ describe("connectObservable", () => {
it("suspends the component when the observable emits SUSPENSE", async () => {
const subject$ = new Subject()
const source$ = subject$.pipe(
scan(a => a + 1, 0),
map(x => {
scan((a) => a + 1, 0),
map((x) => {
if (x === 1) {
return SUSPENSE
}
@ -217,7 +217,7 @@ describe("connectObservable", () => {
}),
startWith(0),
)
const [useDelayedNumber] = connectObservable(source$)
const [useDelayedNumber] = bind(source$)
const Result: React.FC = () => <div>Result {useDelayedNumber()}</div>
const TestSuspense: React.FC = () => {
return (
@ -248,8 +248,8 @@ describe("connectObservable", () => {
it("keeps in suspense if more than two SUSPENSE are emitted in succesion", async () => {
const subject$ = new Subject()
const source$ = subject$.pipe(
scan(a => a + 1, 0),
map(x => {
scan((a) => a + 1, 0),
map((x) => {
if (x <= 2) {
return SUSPENSE
}
@ -257,7 +257,7 @@ describe("connectObservable", () => {
}),
startWith(0),
)
const [useDelayedNumber] = connectObservable(source$)
const [useDelayedNumber] = bind(source$)
const Result: React.FC = () => <div>Result {useDelayedNumber()}</div>
const TestSuspense: React.FC = () => {
return (
@ -291,7 +291,7 @@ describe("connectObservable", () => {
it("allows errors to be caught in error boundaries", () => {
const errStream = new BehaviorSubject(1)
const [useError] = connectObservable(errStream)
const [useError] = bind(errStream)
const ErrorComponent = () => {
const value = useError()
@ -316,10 +316,10 @@ describe("connectObservable", () => {
})
it("allows sync errors to be caught in error boundaries with suspense", () => {
const errStream = new Observable(observer =>
const errStream = new Observable((observer) =>
observer.error("controlled error"),
)
const [useError] = connectObservable(errStream)
const [useError] = bind(errStream)
const ErrorComponent = () => {
const value = useError()
@ -344,7 +344,7 @@ describe("connectObservable", () => {
it("allows async errors to be caught in error boundaries with suspense", async () => {
const errStream = new Subject()
const [useError] = connectObservable(errStream)
const [useError] = bind(errStream)
const ErrorComponent = () => {
const value = useError()
@ -374,7 +374,7 @@ describe("connectObservable", () => {
it("allows to retry the errored observable after a grace period of time", async () => {
let errStream = new Subject<string>()
const [useError] = connectObservable(
const [useError] = bind(
defer(() => {
return (errStream = new Subject<string>())
}),
@ -438,9 +438,9 @@ describe("connectObservable", () => {
it("doesn't throw errors on components that will get unmounted on the next cycle", () => {
const valueStream = new BehaviorSubject(1)
const [useValue, value$] = connectObservable(valueStream)
const [useError] = connectObservable(
value$.pipe(switchMap(v => (v === 1 ? of(v) : throwError("error")))),
const [useValue, value$] = bind(valueStream)
const [useError] = bind(
value$.pipe(switchMap((v) => (v === 1 ? of(v) : throwError("error")))),
)
const ErrorComponent: FC = () => {

View File

@ -1,8 +1,8 @@
import { Observable } from "rxjs"
import shareLatest from "./internal/share-latest"
import reactEnhancer from "./internal/react-enhancer"
import { useObservable } from "./internal/useObservable"
import { takeUntilComplete } from "./internal/take-until-complete"
import shareLatest from "../internal/share-latest"
import reactEnhancer from "../internal/react-enhancer"
import { useObservable } from "../internal/useObservable"
import { takeUntilComplete } from "../internal/take-until-complete"
/**
* Accepts: An Observable.
@ -22,9 +22,9 @@ import { takeUntilComplete } from "./internal/take-until-complete"
* subscription, then the hook will leverage React Suspense while it's waiting
* for the first value.
*/
export function connectObservable<T>(
export default function connectObservable<T>(
observable: Observable<T>,
unsubscribeGraceTime = 200,
unsubscribeGraceTime: number,
) {
const sharedObservable$ = shareLatest<T>(observable)
const reactObservable$ = reactEnhancer(

View File

@ -0,0 +1,63 @@
import { Observable } from "rxjs"
import { SUSPENSE } from "../SUSPENSE"
import connectFactoryObservable from "./connectFactoryObservable"
import connectObservable from "./connectObservable"
/**
* Accepts: An Observable.
*
* Returns [1, 2]
* 1. A React Hook that yields the latest emitted value of the observable
* 2. A `sharedLatest` version of the observable. It can be used for composing
* other streams that depend on it. The shared subscription is closed as soon as
* there are no subscribers to that observable.
*
* @param observable Source observable to be used by the hook.
* @param unsubscribeGraceTime (= 200): Amount of time in ms that the shared
* observable should wait before unsubscribing from the source observable when
* there are no new subscribers.
*
* @remarks If the Observable doesn't synchronously emit a value upon the first
* subscription, then the hook will leverage React Suspense while it's waiting
* for the first value.
*/
export function bind<T>(
observable: Observable<T>,
unsubscribeGraceTime?: number,
): [() => Exclude<T, typeof SUSPENSE>, Observable<T>]
/**
* Accepts: A factory function that returns an Observable.
*
* Returns [1, 2]
* 1. A React Hook function with the same parameters as the factory function.
* This hook will yield the latest update from the observable returned from
* the factory function.
* 2. A `sharedLatest` version of the observable generated by the factory
* function that can be used for composing other streams that depend on it.
* The shared subscription is closed as soon as there are no subscribers to
* that observable.
*
* @param getObservable Factory of observables. The arguments of this function
* will be the ones used in the hook.
* @param unsubscribeGraceTime (= 200): Amount of time in ms that the shared
* observable should wait before unsubscribing from the source observable when
* there are no new subscribers.
*
* @remarks If the Observable doesn't synchronously emit a value upon the first
* subscription, then the hook will leverage React Suspense while it's waiting
* for the first value.
*/
export function bind<A extends (number | string | boolean | null)[], O>(
getObservable: (...args: A) => Observable<O>,
unsubscribeGraceTime?: number,
): [(...args: A) => Exclude<O, typeof SUSPENSE>, (...args: A) => Observable<O>]
export function bind<A extends (number | string | boolean | null)[], O>(
obs: ((...args: A) => Observable<O>) | Observable<O>,
unsubscribeGraceTime = 200,
) {
return (typeof obs === "function"
? (connectFactoryObservable as any)
: connectObservable)(obs, unsubscribeGraceTime)
}

View File

@ -1,6 +1,5 @@
// core
export { connectObservable } from "./connectObservable"
export { connectFactoryObservable } from "./connectFactoryObservable"
export { bind } from "./bind"
export { shareLatest } from "./operators/shareLatest"
// support for React Suspense

View File

@ -1,13 +1,13 @@
import React, { Component, ErrorInfo, useLayoutEffect } from "react"
import { Observable, from, throwError } from "rxjs"
import { delay, startWith } from "rxjs/operators"
import { connectFactoryObservable } from "@react-rxjs/core"
import { bind } from "@react-rxjs/core"
import { batchUpdates } from "./"
import { render, screen } from "@testing-library/react"
const wait = (ms: number) => new Promise((res) => setTimeout(res, ms))
const [useLatestNumber] = connectFactoryObservable(
const [useLatestNumber] = bind(
(id: string, batched: boolean) =>
(id === "error"
? throwError("controlled error")