mirror of
https://github.com/re-rxjs/react-rxjs.git
synced 2025-12-08 18:01:51 +00:00
v0.1.1 lots of improvements
This commit is contained in:
parent
7d68afd5cd
commit
22d7d3b2d4
14
package-lock.json
generated
14
package-lock.json
generated
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@josepot/react-rxjs",
|
||||
"version": "0.1.0",
|
||||
"version": "0.1.1",
|
||||
"lockfileVersion": 1,
|
||||
"requires": true,
|
||||
"dependencies": {
|
||||
@ -1564,9 +1564,9 @@
|
||||
}
|
||||
},
|
||||
"@types/jest": {
|
||||
"version": "25.2.2",
|
||||
"resolved": "https://registry.npmjs.org/@types/jest/-/jest-25.2.2.tgz",
|
||||
"integrity": "sha512-aRctFbG8Pb7DSLzUt/fEtL3q/GKb9mretFuYhRub2J0q6NhzBYbx9HTQzHrWgBNIxYOlxGNVe6Z54cpbUt+Few==",
|
||||
"version": "25.2.3",
|
||||
"resolved": "https://registry.npmjs.org/@types/jest/-/jest-25.2.3.tgz",
|
||||
"integrity": "sha512-JXc1nK/tXHiDhV55dvfzqtmP4S3sy3T3ouV2tkViZgxY/zeUkcpQcQPGRlgF4KmWzWW5oiWYSZwtCB+2RsE4Fw==",
|
||||
"dev": true,
|
||||
"requires": {
|
||||
"jest-diff": "^25.2.1",
|
||||
@ -9181,9 +9181,9 @@
|
||||
"dev": true
|
||||
},
|
||||
"typescript": {
|
||||
"version": "3.9.2",
|
||||
"resolved": "https://registry.npmjs.org/typescript/-/typescript-3.9.2.tgz",
|
||||
"integrity": "sha512-q2ktq4n/uLuNNShyayit+DTobV2ApPEo/6so68JaD5ojvc/6GClBipedB9zNWYxRSAlZXAe405Rlijzl6qDiSw==",
|
||||
"version": "3.9.3",
|
||||
"resolved": "https://registry.npmjs.org/typescript/-/typescript-3.9.3.tgz",
|
||||
"integrity": "sha512-D/wqnB2xzNFIcoBG9FG8cXRDjiqSTbG2wd8DMZeQyJlP1vfTkIxH4GKveWaEBYySKIg+USu+E+EDIR47SqnaMQ==",
|
||||
"dev": true
|
||||
},
|
||||
"unicode-canonical-property-names-ecmascript": {
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
{
|
||||
"version": "0.1.0",
|
||||
"version": "0.1.1",
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git+https://github.com/josepot/react-rxjs.git"
|
||||
@ -36,7 +36,7 @@
|
||||
"module": "dist/react-rxjs.esm.js",
|
||||
"devDependencies": {
|
||||
"@testing-library/react-hooks": "^3.2.1",
|
||||
"@types/jest": "^25.2.2",
|
||||
"@types/jest": "^25.2.3",
|
||||
"@types/react": "^16.9.35",
|
||||
"@types/react-dom": "^16.9.8",
|
||||
"husky": "^4.2.5",
|
||||
@ -46,6 +46,6 @@
|
||||
"rxjs": "^6.5.5",
|
||||
"tsdx": "^0.13.2",
|
||||
"tslib": "^2.0.0",
|
||||
"typescript": "^3.9.2"
|
||||
"typescript": "^3.9.3"
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,8 +0,0 @@
|
||||
import { Observable } from "rxjs"
|
||||
import { debounceTime } from "rxjs/operators"
|
||||
|
||||
export const batchUpdates: <T>(
|
||||
source: Observable<T>,
|
||||
) => Observable<T> = debounceTime(0)
|
||||
|
||||
export default batchUpdates
|
||||
@ -1,11 +1,12 @@
|
||||
import { Observable } from "rxjs"
|
||||
import { useEffect, useState } from "react"
|
||||
import reactOperator from "./react-operator"
|
||||
import { batchUpdates } from "./batch-updates"
|
||||
import { Observable, of } from "rxjs"
|
||||
import { finalize, delay, takeUntil } from "rxjs/operators"
|
||||
import {
|
||||
StaticObservableOptions,
|
||||
defaultStaticOptions,
|
||||
} from "./connectObservable"
|
||||
import distinctShareReplay from "./operators/distinct-share-replay"
|
||||
import reactOptimizations from "./operators/react-optimizations"
|
||||
|
||||
interface FactoryObservableOptions<T> extends StaticObservableOptions<T> {
|
||||
suspenseTime: number
|
||||
@ -26,12 +27,14 @@ export function connectFactoryObservable<
|
||||
options?: Partial<FactoryObservableOptions<O>>,
|
||||
): [(...args: A) => O | I, (...args: A) => Observable<O>] {
|
||||
const { suspenseTime, unsubscribeGraceTime, compare } = {
|
||||
...options,
|
||||
...defaultOptions,
|
||||
...options,
|
||||
}
|
||||
|
||||
const reactEnhander = reactOptimizations(unsubscribeGraceTime)
|
||||
const cache = new Map<string, Observable<O>>()
|
||||
|
||||
const getReactObservable$ = (...input: A): Observable<O> => {
|
||||
const getSharedObservable$ = (...input: A): Observable<O> => {
|
||||
const key = JSON.stringify(input)
|
||||
const cachedVal = cache.get(key)
|
||||
|
||||
@ -39,14 +42,11 @@ export function connectFactoryObservable<
|
||||
return cachedVal
|
||||
}
|
||||
|
||||
const reactObservable$ = reactOperator(
|
||||
getObservable(...input),
|
||||
initialValue,
|
||||
unsubscribeGraceTime,
|
||||
compare,
|
||||
() => {
|
||||
const reactObservable$ = getObservable(...input).pipe(
|
||||
distinctShareReplay(compare),
|
||||
finalize(() => {
|
||||
cache.delete(key)
|
||||
},
|
||||
}),
|
||||
)
|
||||
|
||||
cache.set(key, reactObservable$)
|
||||
@ -58,31 +58,26 @@ export function connectFactoryObservable<
|
||||
const [value, setValue] = useState<I | O>(initialValue)
|
||||
|
||||
useEffect(() => {
|
||||
let timeoutToken: NodeJS.Timeout | null = null
|
||||
const sharedObservable$ = getSharedObservable$(...input)
|
||||
const subscription = reactEnhander(sharedObservable$).subscribe(
|
||||
setValue,
|
||||
)
|
||||
|
||||
if (suspenseTime === 0) {
|
||||
setValue(initialValue)
|
||||
} else if (suspenseTime < Infinity) {
|
||||
timeoutToken = setTimeout(() => {
|
||||
timeoutToken = null
|
||||
setValue(initialValue)
|
||||
}, suspenseTime)
|
||||
subscription.add(
|
||||
of(initialValue)
|
||||
.pipe(delay(suspenseTime), takeUntil(sharedObservable$))
|
||||
.subscribe(setValue),
|
||||
)
|
||||
}
|
||||
|
||||
const subscription = batchUpdates(
|
||||
getReactObservable$(...input),
|
||||
).subscribe(value => {
|
||||
if (timeoutToken !== null) clearTimeout(timeoutToken)
|
||||
setValue(value)
|
||||
})
|
||||
return () => {
|
||||
subscription.unsubscribe()
|
||||
if (timeoutToken !== null) clearTimeout(timeoutToken)
|
||||
}
|
||||
return () => subscription.unsubscribe()
|
||||
}, input)
|
||||
|
||||
return value
|
||||
},
|
||||
getReactObservable$,
|
||||
getSharedObservable$,
|
||||
]
|
||||
}
|
||||
|
||||
@ -1,14 +1,14 @@
|
||||
import { Observable } from "rxjs"
|
||||
import { useEffect, useState } from "react"
|
||||
import reactOperator from "./react-operator"
|
||||
import batchUpdates from "./batch-updates"
|
||||
import { Observable } from "rxjs"
|
||||
import reactOptimizations from "./operators/react-optimizations"
|
||||
import distinctShareReplay from "./operators/distinct-share-replay"
|
||||
|
||||
export interface StaticObservableOptions<T> {
|
||||
unsubscribeGraceTime: number
|
||||
compare: (a: T, b: T) => boolean
|
||||
}
|
||||
export const defaultStaticOptions: StaticObservableOptions<any> = {
|
||||
unsubscribeGraceTime: 100,
|
||||
unsubscribeGraceTime: 120,
|
||||
compare: (a, b) => a === b,
|
||||
}
|
||||
|
||||
@ -18,25 +18,21 @@ export function connectObservable<O, IO>(
|
||||
options?: Partial<StaticObservableOptions<O>>,
|
||||
) {
|
||||
const { unsubscribeGraceTime, compare } = {
|
||||
...options,
|
||||
...defaultStaticOptions,
|
||||
...options,
|
||||
}
|
||||
const reactObservable$ = reactOperator(
|
||||
observable,
|
||||
initialValue,
|
||||
unsubscribeGraceTime,
|
||||
compare,
|
||||
const sharedObservable$ = observable.pipe(distinctShareReplay(compare))
|
||||
const reactObservable$ = sharedObservable$.pipe(
|
||||
reactOptimizations(unsubscribeGraceTime),
|
||||
)
|
||||
|
||||
const useStaticObservable = () => {
|
||||
const [value, setValue] = useState<O | IO>(
|
||||
reactObservable$.getCurrentValue(),
|
||||
)
|
||||
const [value, setValue] = useState<O | IO>(initialValue)
|
||||
useEffect(() => {
|
||||
const subscription = batchUpdates(reactObservable$).subscribe(setValue)
|
||||
const subscription = reactObservable$.subscribe(setValue)
|
||||
return () => subscription.unsubscribe()
|
||||
}, [])
|
||||
return value
|
||||
}
|
||||
return [useStaticObservable, reactObservable$] as const
|
||||
return [useStaticObservable, sharedObservable$] as const
|
||||
}
|
||||
|
||||
@ -1,3 +1,3 @@
|
||||
export { connectObservable } from "./connectObservable"
|
||||
export { connectFactoryObservable } from "./connectFactoryObservable"
|
||||
export { ReactObservable } from "./react-operator"
|
||||
export { default as distinctShareReplay } from "./operators/distinct-share-replay"
|
||||
|
||||
31
src/operators/delay-unsubscription.ts
Normal file
31
src/operators/delay-unsubscription.ts
Normal file
@ -0,0 +1,31 @@
|
||||
import { Observable } from "rxjs"
|
||||
|
||||
const delayUnsubscription = <T>(delayTime: number) => (
|
||||
source$: Observable<T>,
|
||||
): Observable<T> =>
|
||||
delayTime === 0
|
||||
? source$
|
||||
: new Observable<T>(subscriber => {
|
||||
let isActive = true
|
||||
const subscription = source$.subscribe({
|
||||
next(value) {
|
||||
if (isActive) {
|
||||
subscriber.next(value)
|
||||
}
|
||||
},
|
||||
error(e) {
|
||||
subscriber.error(e)
|
||||
},
|
||||
complete() {
|
||||
subscriber.complete()
|
||||
},
|
||||
})
|
||||
return () => {
|
||||
isActive = false
|
||||
setTimeout(() => {
|
||||
subscription.unsubscribe()
|
||||
}, delayTime)
|
||||
}
|
||||
})
|
||||
|
||||
export default delayUnsubscription
|
||||
14
src/operators/distinct-share-replay.ts
Normal file
14
src/operators/distinct-share-replay.ts
Normal file
@ -0,0 +1,14 @@
|
||||
import { Observable, ReplaySubject, concat, NEVER } from "rxjs"
|
||||
import { distinctUntilChanged, multicast, refCount } from "rxjs/operators"
|
||||
|
||||
const distinctShareReplay = <T>(compare?: (a: T, b: T) => boolean) => (
|
||||
source$: Observable<T>,
|
||||
): Observable<T> =>
|
||||
source$.pipe(
|
||||
distinctUntilChanged(compare),
|
||||
innerSource => concat(innerSource, NEVER),
|
||||
multicast(() => new ReplaySubject<T>(1)),
|
||||
refCount(),
|
||||
)
|
||||
|
||||
export default distinctShareReplay
|
||||
9
src/operators/react-optimizations.ts
Normal file
9
src/operators/react-optimizations.ts
Normal file
@ -0,0 +1,9 @@
|
||||
import { Observable } from "rxjs"
|
||||
import { debounceTime } from "rxjs/operators"
|
||||
import delayUnsubscription from "./delay-unsubscription"
|
||||
|
||||
const reactOptimizations = (delayTime: number) => <T>(
|
||||
source: Observable<T>,
|
||||
): Observable<T> => source.pipe(delayUnsubscription(delayTime), debounceTime(0))
|
||||
|
||||
export default reactOptimizations
|
||||
@ -1,75 +0,0 @@
|
||||
import { Observable, ReplaySubject, Subscription } from "rxjs"
|
||||
import { distinctUntilChanged } from "rxjs/operators"
|
||||
|
||||
export interface ReactObservable<O, IO> extends Observable<O> {
|
||||
getCurrentValue: () => O | IO
|
||||
}
|
||||
|
||||
const reactOperator = <T, I>(
|
||||
source$: Observable<T>,
|
||||
initialValue: I,
|
||||
gracePeriod: number,
|
||||
compare: (a: T | I, b: T) => boolean,
|
||||
teardown?: () => void,
|
||||
): ReactObservable<T, I> => {
|
||||
let subject: ReplaySubject<T> | undefined
|
||||
let subscription: Subscription | undefined
|
||||
let timeoutToken: NodeJS.Timeout | undefined = undefined
|
||||
let refCount = 0
|
||||
let hasError = false
|
||||
let currentValue: T | I = initialValue
|
||||
|
||||
const observable$ = new Observable<T>(subscriber => {
|
||||
if (timeoutToken !== undefined) {
|
||||
clearTimeout(timeoutToken)
|
||||
}
|
||||
refCount++
|
||||
if (!subject || hasError) {
|
||||
hasError = false
|
||||
subject = new ReplaySubject<T>(1)
|
||||
subscription = distinctUntilChanged(compare)(source$).subscribe({
|
||||
next(value) {
|
||||
currentValue = value
|
||||
subject!.next(value)
|
||||
},
|
||||
error(err) {
|
||||
hasError = true
|
||||
subject!.error(err)
|
||||
},
|
||||
complete() {
|
||||
subscription = undefined
|
||||
subject!.complete()
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
const innerSub = subject.subscribe(subscriber)
|
||||
const cleanup = () => {
|
||||
timeoutToken = undefined
|
||||
currentValue = initialValue
|
||||
teardown?.()
|
||||
if (subscription) {
|
||||
subscription.unsubscribe()
|
||||
subscription = undefined
|
||||
}
|
||||
subject = undefined
|
||||
}
|
||||
return () => {
|
||||
refCount--
|
||||
innerSub.unsubscribe()
|
||||
if (refCount === 0) {
|
||||
if (gracePeriod > 0) {
|
||||
timeoutToken = setTimeout(cleanup, gracePeriod)
|
||||
} else {
|
||||
cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
const result = observable$ as ReactObservable<T, I>
|
||||
result.getCurrentValue = () => currentValue
|
||||
return result
|
||||
}
|
||||
|
||||
export default reactOperator
|
||||
104
test/connectFactoryObservable.test.tsx
Normal file
104
test/connectFactoryObservable.test.tsx
Normal file
@ -0,0 +1,104 @@
|
||||
import { connectFactoryObservable } from "../src"
|
||||
import { NEVER, from, of, defer, concat } from "rxjs"
|
||||
import { renderHook, act } from "@testing-library/react-hooks"
|
||||
import { useEffect, useState } from "react"
|
||||
import { delay } from "rxjs/operators"
|
||||
|
||||
const wait = (ms: number) => new Promise(res => setTimeout(res, ms))
|
||||
|
||||
describe("connectObservable", () => {
|
||||
it("returns the initial value when the stream has not emitted anything", async () => {
|
||||
const [useSomething] = connectFactoryObservable(
|
||||
(id: number) => concat(NEVER, of(id)),
|
||||
"initialValue",
|
||||
)
|
||||
const { result } = renderHook(() => useSomething(5))
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
|
||||
expect(result.current).toBe("initialValue")
|
||||
})
|
||||
|
||||
it("returns the latest emitted value", async () => {
|
||||
const [useNumber] = connectFactoryObservable((id: number) => of(id), 0)
|
||||
const { result } = renderHook(() => useNumber(1))
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
expect(result.current).toBe(1)
|
||||
})
|
||||
|
||||
it("batches the updates that happen on the same event-loop", async () => {
|
||||
const observable$ = from([1, 2, 3, 4, 5])
|
||||
const [useLatestNumber] = connectFactoryObservable(
|
||||
(id: number) => concat(observable$, of(id).pipe(delay(1000))),
|
||||
0,
|
||||
)
|
||||
const useLatestNumberTest = () => {
|
||||
const latestNumber = useLatestNumber(6)
|
||||
const [emittedValues, setEmittedValues] = useState<number[]>([])
|
||||
useEffect(() => {
|
||||
setEmittedValues(prev => [...prev, latestNumber])
|
||||
}, [latestNumber])
|
||||
return emittedValues
|
||||
}
|
||||
|
||||
const { result } = renderHook(() => useLatestNumberTest())
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
expect(result.current).toEqual([0, 5])
|
||||
})
|
||||
|
||||
it("shares the source subscription until the refCount has stayed at zero for the grace-period", async () => {
|
||||
let nInitCount = 0
|
||||
const observable$ = defer(() => {
|
||||
nInitCount += 1
|
||||
return from([1, 2, 3, 4, 5])
|
||||
})
|
||||
|
||||
const [useLatestNumber] = connectFactoryObservable(
|
||||
(id: number) => concat(observable$, of(id)),
|
||||
0,
|
||||
{
|
||||
unsubscribeGraceTime: 100,
|
||||
},
|
||||
)
|
||||
const { unmount } = renderHook(() => useLatestNumber(6))
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
const { unmount: unmount2 } = renderHook(() => useLatestNumber(6))
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
const { unmount: unmount3 } = renderHook(() => useLatestNumber(6))
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
expect(nInitCount).toBe(1)
|
||||
unmount()
|
||||
unmount2()
|
||||
unmount3()
|
||||
|
||||
await act(async () => {
|
||||
await wait(90)
|
||||
})
|
||||
const { unmount: unmount4 } = renderHook(() => useLatestNumber(6))
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
expect(nInitCount).toBe(1)
|
||||
unmount4()
|
||||
|
||||
await act(async () => {
|
||||
await wait(101)
|
||||
})
|
||||
renderHook(() => useLatestNumber(6))
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
expect(nInitCount).toBe(2)
|
||||
})
|
||||
})
|
||||
@ -6,16 +6,22 @@ import { useEffect, useState } from "react"
|
||||
const wait = (ms: number) => new Promise(res => setTimeout(res, ms))
|
||||
|
||||
describe("connectObservable", () => {
|
||||
it("returns the initial value when the stream has not emitted anything", () => {
|
||||
it("returns the initial value when the stream has not emitted anything", async () => {
|
||||
const [useSomething] = connectObservable(NEVER, "initialValue")
|
||||
const { result } = renderHook(() => useSomething())
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
|
||||
expect(result.current).toBe("initialValue")
|
||||
})
|
||||
|
||||
it("returns the latest emitted value", () => {
|
||||
it("returns the latest emitted value", async () => {
|
||||
const [useNumber] = connectObservable(of(1), 0)
|
||||
const { result } = renderHook(() => useNumber())
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
expect(result.current).toBe(1)
|
||||
})
|
||||
|
||||
@ -45,10 +51,21 @@ describe("connectObservable", () => {
|
||||
return from([1, 2, 3, 4, 5])
|
||||
})
|
||||
|
||||
const [useLatestNumber] = connectObservable(observable$, 0)
|
||||
const [useLatestNumber] = connectObservable(observable$, 0, {
|
||||
unsubscribeGraceTime: 100,
|
||||
})
|
||||
const { unmount } = renderHook(() => useLatestNumber())
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
const { unmount: unmount2 } = renderHook(() => useLatestNumber())
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
const { unmount: unmount3 } = renderHook(() => useLatestNumber())
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
expect(nInitCount).toBe(1)
|
||||
unmount()
|
||||
unmount2()
|
||||
@ -58,6 +75,9 @@ describe("connectObservable", () => {
|
||||
await wait(90)
|
||||
})
|
||||
const { unmount: unmount4 } = renderHook(() => useLatestNumber())
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
expect(nInitCount).toBe(1)
|
||||
unmount4()
|
||||
|
||||
@ -65,6 +85,9 @@ describe("connectObservable", () => {
|
||||
await wait(101)
|
||||
})
|
||||
renderHook(() => useLatestNumber())
|
||||
await act(async () => {
|
||||
await wait(0)
|
||||
})
|
||||
expect(nInitCount).toBe(2)
|
||||
})
|
||||
})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user