v0.2.0-alpha.6 connectInstanceObservable

This commit is contained in:
Josep M Sobrepere 2020-06-04 04:52:44 +02:00
parent c7f6acd2ca
commit 1e67a47ea1
5 changed files with 195 additions and 2 deletions

8
package-lock.json generated
View File

@ -1,6 +1,6 @@
{
"name": "@josepot/react-rxjs",
"version": "0.2.0-alpha.5",
"version": "0.2.0-alpha.6",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@ -1411,6 +1411,12 @@
"@types/yargs": "^13.0.0"
}
},
"@josepot/rxjs-utils": {
"version": "0.12.0",
"resolved": "https://registry.npmjs.org/@josepot/rxjs-utils/-/rxjs-utils-0.12.0.tgz",
"integrity": "sha512-tgD3tO8E1OTnPgotqg4UiA1E98bD/qRsifEsrChc1hHSqdC9vFOemzlLw1BjwJ3rLnd+Xp6PBupVD0c6raQq0A==",
"dev": true
},
"@rollup/plugin-commonjs": {
"version": "11.1.0",
"resolved": "https://registry.npmjs.org/@rollup/plugin-commonjs/-/plugin-commonjs-11.1.0.tgz",

View File

@ -1,5 +1,5 @@
{
"version": "0.2.0-alpha.5",
"version": "0.2.0-alpha.6",
"sideEffects": false,
"repository": {
"type": "git",
@ -36,6 +36,7 @@
"author": "Josep M Sobrepere",
"module": "dist/react-rxjs.esm.js",
"devDependencies": {
"@josepot/rxjs-utils": "^0.12.0",
"@testing-library/react-hooks": "^3.2.1",
"@types/jest": "^25.2.3",
"@types/react": "^16.9.35",

View File

@ -0,0 +1,117 @@
import { Observable, Subject } from "rxjs"
import { FactoryObservableOptions, defaultFactoryOptions } from "./options"
import { distinctUntilChanged } from "rxjs/operators"
import { useEffect, useRef, useLayoutEffect, useState } from "react"
import delayUnsubscription from "./operators/delay-unsubscription"
interface ConnectInstanceObservable {
<I, T1, O>(
getObservable: (a: Observable<T1>) => Observable<O>,
initialValue: I,
options?: FactoryObservableOptions<O>,
): (a: T1) => O | I
<I, T1, T2, O>(
getObservable: (a: Observable<T1>, b: Observable<T2>) => Observable<O>,
initialValue: I,
options?: FactoryObservableOptions<O>,
): (a: T1, b: T2) => O | I
<I, T1, T2, T3, O>(
getObservable: (
a: Observable<T1>,
b: Observable<T2>,
c: Observable<T3>,
) => Observable<O>,
initialValue: I,
options?: FactoryObservableOptions<O>,
): (a: T1, b: T2, c: T3) => O | I
<I, T1, T2, T3, T4, O>(
getObservable: (
a: Observable<T1>,
b: Observable<T2>,
c: Observable<T3>,
d: Observable<T4>,
) => Observable<O>,
initialValue: I,
options?: FactoryObservableOptions<O>,
): (a: T1, b: T2, c: T3, d: T4) => O | I
<I, T1, T2, T3, T4, T5, O>(
getObservable: (
a: Observable<T1>,
b: Observable<T2>,
c: Observable<T3>,
d: Observable<T4>,
e: Observable<T5>,
) => Observable<O>,
initialValue: I,
options?: FactoryObservableOptions<O>,
): (a: T1, b: T2, c: T3, d: T4, e: T5) => O | I
<I, T1, T2, T3, T4, T5, T6, O>(
getObservable: (
a: Observable<T1>,
b: Observable<T2>,
c: Observable<T3>,
d: Observable<T4>,
e: Observable<T5>,
f: Observable<T6>,
) => Observable<O>,
initialValue: I,
options?: FactoryObservableOptions<O>,
): (a: T1, b: T2, c: T3, d: T4, e: T5, f: T6) => O | I
<I, T1, T2, T3, T4, T5, T6, T7, O>(
getObservable: (
a: Observable<T1>,
b: Observable<T2>,
c: Observable<T3>,
d: Observable<T4>,
e: Observable<T5>,
f: Observable<T6>,
g: Observable<T7>,
) => Observable<O>,
initialValue: I,
options?: FactoryObservableOptions<O>,
): (a: T1, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7) => O | I
}
export const connectInstanceObservable: ConnectInstanceObservable = (
getObservable: any,
initialValue: any,
_options?: any,
) => {
const options = {
...defaultFactoryOptions,
..._options,
}
const useInstance = (...input: any) => {
const subjectsRef = useRef<any>()
const [state, setState] = useState(initialValue)
useLayoutEffect(() => {
const subjects = [] as Subject<any>[]
for (let i = 0; i < getObservable.length; i++) {
subjects.push(new Subject())
}
subjectsRef.current = subjects
const inputs = subjects.map(s => s.pipe(distinctUntilChanged())) as any
const subscription = getObservable(...inputs)
.pipe(
distinctUntilChanged(),
delayUnsubscription(options.unsubscribeGraceTime),
)
.subscribe(setState)
return () => subscription.unsubscribe()
}, [])
useEffect(() => {
input.forEach((value: any, idx: any) =>
subjectsRef.current![idx].next(value),
)
}, input)
return state
}
return useInstance as any
}

View File

@ -1,4 +1,5 @@
export { connectObservable } from "./connectObservable"
export { connectFactoryObservable } from "./connectFactoryObservable"
export { connectInstanceObservable } from "./connectInstanceObservable"
export { default as distinctShareReplay } from "./operators/distinct-share-replay"
export { default as useSharedReplayableObservable } from "./useSharedReplayableObservable"

View File

@ -0,0 +1,68 @@
import { connectInstanceObservable } from "../src"
import { of, Subject, Observable, combineLatest, merge } from "rxjs"
import { renderHook } from "@testing-library/react-hooks"
import {
ignoreElements,
switchMap,
distinctUntilChanged,
scan,
shareReplay,
tap,
map,
startWith,
} from "rxjs/operators"
import { groupInMap } from "@josepot/rxjs-utils"
const getPriceData = (key: string, period: number) =>
of(
key
.split("")
.map((_, idx) => idx)
.concat(period),
)
const tagAndPeriod$ = new Subject<[string, number]>()
const tagsData$ = tagAndPeriod$.pipe(
groupInMap(
([tag]) => tag,
stream$ =>
stream$.pipe(
scan((maxPeriod, [, period]) => Math.max(maxPeriod, period), 0),
distinctUntilChanged(),
switchMap(period => getPriceData(stream$.key, period)),
),
),
startWith(new Map()),
shareReplay(1),
)
const empty: number[] = []
const usePriceData = connectInstanceObservable(
(tags$: Observable<string>, periods$: Observable<number>) => {
const plug$ = combineLatest(tags$, periods$).pipe(
tap(tagAndPeriod$),
ignoreElements(),
)
const data$ = tags$.pipe(
switchMap(tag =>
tagsData$.pipe(
map(tags => tags.get(tag) || empty),
distinctUntilChanged(),
),
),
)
return combineLatest(data$, merge(periods$, plug$)).pipe(
map(([data, period]) => data.slice(-period)),
)
},
[],
)
describe("connectInstanceObservable", () => {
it("works", async () => {
const { result } = renderHook(() => usePriceData("hello", 9))
expect(result.current).toEqual([0, 1, 2, 3, 4, 9])
})
})