From 1e67a47ea1cdf7acb64eefde947f4e8980e9df71 Mon Sep 17 00:00:00 2001 From: Josep M Sobrepere Date: Thu, 4 Jun 2020 04:52:44 +0200 Subject: [PATCH] v0.2.0-alpha.6 connectInstanceObservable --- package-lock.json | 8 +- package.json | 3 +- src/connectInstanceObservable.ts | 117 +++++++++++++++++++++++++ src/index.tsx | 1 + test/connectInstanceObservable.test.ts | 68 ++++++++++++++ 5 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 src/connectInstanceObservable.ts create mode 100644 test/connectInstanceObservable.test.ts diff --git a/package-lock.json b/package-lock.json index bfdddb7..0560643 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "@josepot/react-rxjs", - "version": "0.2.0-alpha.5", + "version": "0.2.0-alpha.6", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -1411,6 +1411,12 @@ "@types/yargs": "^13.0.0" } }, + "@josepot/rxjs-utils": { + "version": "0.12.0", + "resolved": "https://registry.npmjs.org/@josepot/rxjs-utils/-/rxjs-utils-0.12.0.tgz", + "integrity": "sha512-tgD3tO8E1OTnPgotqg4UiA1E98bD/qRsifEsrChc1hHSqdC9vFOemzlLw1BjwJ3rLnd+Xp6PBupVD0c6raQq0A==", + "dev": true + }, "@rollup/plugin-commonjs": { "version": "11.1.0", "resolved": "https://registry.npmjs.org/@rollup/plugin-commonjs/-/plugin-commonjs-11.1.0.tgz", diff --git a/package.json b/package.json index 0746228..17a52f5 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "version": "0.2.0-alpha.5", + "version": "0.2.0-alpha.6", "sideEffects": false, "repository": { "type": "git", @@ -36,6 +36,7 @@ "author": "Josep M Sobrepere", "module": "dist/react-rxjs.esm.js", "devDependencies": { + "@josepot/rxjs-utils": "^0.12.0", "@testing-library/react-hooks": "^3.2.1", "@types/jest": "^25.2.3", "@types/react": "^16.9.35", diff --git a/src/connectInstanceObservable.ts b/src/connectInstanceObservable.ts new file mode 100644 index 0000000..333bc54 --- /dev/null +++ b/src/connectInstanceObservable.ts @@ -0,0 +1,117 @@ +import { Observable, Subject } from "rxjs" +import { FactoryObservableOptions, defaultFactoryOptions } from "./options" +import { distinctUntilChanged } from "rxjs/operators" +import { useEffect, useRef, useLayoutEffect, useState } from "react" +import delayUnsubscription from "./operators/delay-unsubscription" + +interface ConnectInstanceObservable { + ( + getObservable: (a: Observable) => Observable, + initialValue: I, + options?: FactoryObservableOptions, + ): (a: T1) => O | I + ( + getObservable: (a: Observable, b: Observable) => Observable, + initialValue: I, + options?: FactoryObservableOptions, + ): (a: T1, b: T2) => O | I + ( + getObservable: ( + a: Observable, + b: Observable, + c: Observable, + ) => Observable, + initialValue: I, + options?: FactoryObservableOptions, + ): (a: T1, b: T2, c: T3) => O | I + ( + getObservable: ( + a: Observable, + b: Observable, + c: Observable, + d: Observable, + ) => Observable, + initialValue: I, + options?: FactoryObservableOptions, + ): (a: T1, b: T2, c: T3, d: T4) => O | I + ( + getObservable: ( + a: Observable, + b: Observable, + c: Observable, + d: Observable, + e: Observable, + ) => Observable, + initialValue: I, + options?: FactoryObservableOptions, + ): (a: T1, b: T2, c: T3, d: T4, e: T5) => O | I + ( + getObservable: ( + a: Observable, + b: Observable, + c: Observable, + d: Observable, + e: Observable, + f: Observable, + ) => Observable, + initialValue: I, + options?: FactoryObservableOptions, + ): (a: T1, b: T2, c: T3, d: T4, e: T5, f: T6) => O | I + ( + getObservable: ( + a: Observable, + b: Observable, + c: Observable, + d: Observable, + e: Observable, + f: Observable, + g: Observable, + ) => Observable, + initialValue: I, + options?: FactoryObservableOptions, + ): (a: T1, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7) => O | I +} + +export const connectInstanceObservable: ConnectInstanceObservable = ( + getObservable: any, + initialValue: any, + _options?: any, +) => { + const options = { + ...defaultFactoryOptions, + ..._options, + } + + const useInstance = (...input: any) => { + const subjectsRef = useRef() + const [state, setState] = useState(initialValue) + + useLayoutEffect(() => { + const subjects = [] as Subject[] + for (let i = 0; i < getObservable.length; i++) { + subjects.push(new Subject()) + } + subjectsRef.current = subjects + + const inputs = subjects.map(s => s.pipe(distinctUntilChanged())) as any + const subscription = getObservable(...inputs) + .pipe( + distinctUntilChanged(), + delayUnsubscription(options.unsubscribeGraceTime), + ) + .subscribe(setState) + + return () => subscription.unsubscribe() + }, []) + + useEffect(() => { + input.forEach((value: any, idx: any) => + subjectsRef.current![idx].next(value), + ) + }, input) + + return state + } + + return useInstance as any +} diff --git a/src/index.tsx b/src/index.tsx index c841ee1..d7ab2fb 100644 --- a/src/index.tsx +++ b/src/index.tsx @@ -1,4 +1,5 @@ export { connectObservable } from "./connectObservable" export { connectFactoryObservable } from "./connectFactoryObservable" +export { connectInstanceObservable } from "./connectInstanceObservable" export { default as distinctShareReplay } from "./operators/distinct-share-replay" export { default as useSharedReplayableObservable } from "./useSharedReplayableObservable" diff --git a/test/connectInstanceObservable.test.ts b/test/connectInstanceObservable.test.ts new file mode 100644 index 0000000..d61e042 --- /dev/null +++ b/test/connectInstanceObservable.test.ts @@ -0,0 +1,68 @@ +import { connectInstanceObservable } from "../src" +import { of, Subject, Observable, combineLatest, merge } from "rxjs" +import { renderHook } from "@testing-library/react-hooks" +import { + ignoreElements, + switchMap, + distinctUntilChanged, + scan, + shareReplay, + tap, + map, + startWith, +} from "rxjs/operators" +import { groupInMap } from "@josepot/rxjs-utils" + +const getPriceData = (key: string, period: number) => + of( + key + .split("") + .map((_, idx) => idx) + .concat(period), + ) + +const tagAndPeriod$ = new Subject<[string, number]>() +const tagsData$ = tagAndPeriod$.pipe( + groupInMap( + ([tag]) => tag, + stream$ => + stream$.pipe( + scan((maxPeriod, [, period]) => Math.max(maxPeriod, period), 0), + distinctUntilChanged(), + switchMap(period => getPriceData(stream$.key, period)), + ), + ), + startWith(new Map()), + shareReplay(1), +) +const empty: number[] = [] +const usePriceData = connectInstanceObservable( + (tags$: Observable, periods$: Observable) => { + const plug$ = combineLatest(tags$, periods$).pipe( + tap(tagAndPeriod$), + ignoreElements(), + ) + + const data$ = tags$.pipe( + switchMap(tag => + tagsData$.pipe( + map(tags => tags.get(tag) || empty), + distinctUntilChanged(), + ), + ), + ) + + return combineLatest(data$, merge(periods$, plug$)).pipe( + map(([data, period]) => data.slice(-period)), + ) + }, + [], +) + +describe("connectInstanceObservable", () => { + it("works", async () => { + const { result } = renderHook(() => usePriceData("hello", 9)) + + expect(result.current).toEqual([0, 1, 2, 3, 4, 9]) + }) +})