diff --git a/README.md b/README.md index 1d9b5b6..36392ae 100644 --- a/README.md +++ b/README.md @@ -94,35 +94,32 @@ Returns `[1, 2]` 1. A React Hook function with the same parameters as the factory function. This hook will yield the latest update from the observable returned from the factory function. -2. A shared replayable version of the observable generated by the factory function that -can be used for composing other streams that depend on it. This observable is disposed -when its `refCount` goes down to zero. +2. A `sharedLatest` version of the observable returned by the factory function that +does not complete. It can be used for composing other streams that depend on it. -### distinctShareReplay +### shareLatest ```ts const activePlanetName$ = planet$.pipe( filter(planet => planet.isActive), map(planet => planet.name), - distinctShareReplay() + shareLatest() ) ``` -A RxJS pipeable operator which performs a custom `shareReplay` that can be useful -when working with these bindings. It's roughly the equivalent of: +A RxJS pipeable operator which shares and replays the latest emitted value. It's +the equivalent of: ```ts -const distinctShareReplay = (compare = Object.is) => ( - source$: Observable, -): Observable => +const shareLatest = (): Observable => source$.pipe( - distinctUntilChanged(compare), multicast(() => new ReplaySubject(1)), refCount(), ) ``` The enhanced observables returned from `connectObservable` and `connectFactoryObservable` -have been enhanced with this operator. +have been enhanced with this operator, but do not complete. Meaning that the latest +emitted value will be available until the `refCount` drops to zero. ### SUSPENSE diff --git a/src/connectFactoryObservable.ts b/src/connectFactoryObservable.ts index 2d3000b..afcec5f 100644 --- a/src/connectFactoryObservable.ts +++ b/src/connectFactoryObservable.ts @@ -1,10 +1,9 @@ -import { Observable, NEVER, concat } from "rxjs" -import { distinctShareReplay } from "./operators/distinct-share-replay" -import reactEnhancer from "./operators/react-enhancer" -import { ConnectorOptions, defaultConnectorOptions } from "./options" -import { BehaviorObservable } from "./BehaviorObservable" +import { Observable } from "rxjs" +import shareLatest from "./internal/share-latest" +import reactEnhancer from "./internal/react-enhancer" +import { BehaviorObservable } from "./internal/BehaviorObservable" +import { useObservable } from "./internal/useObservable" import { SUSPENSE } from "./SUSPENSE" -import { useObservable } from "./useObservable" /** * Accepts: A factory function that returns an Observable. @@ -19,27 +18,20 @@ import { useObservable } from "./useObservable" * * @param getObservable Factory of observables. The arguments of this function * will be the ones used in the hook. - * @param options ConnectorOptions: - * - unsubscribeGraceTime (= 200): Amount of time in ms that the shared - * observable should wait before unsubscribing from the source observable - * when there are no new subscribers. - * - compare (= Object.is): Equality function. + * @param unsubscribeGraceTime (= 200): Amount of time in ms that the shared + * observable should wait before unsubscribing from the source observable + * when there are no new subscribers. */ export function connectFactoryObservable< A extends (number | string | boolean | null)[], O >( getObservable: (...args: A) => Observable, - options?: ConnectorOptions, + unsubscribeGraceTime = 200, ): [ (...args: A) => Exclude, (...args: A) => Observable, ] { - const _options = { - ...defaultConnectorOptions, - ...options, - } - const cache = new Map< string, [BehaviorObservable, BehaviorObservable] @@ -55,13 +47,13 @@ export function connectFactoryObservable< return cachedVal } - const sharedObservable$ = distinctShareReplay(_options.compare, () => { + const sharedObservable$ = shareLatest(false, () => { cache.delete(key) - })(concat(getObservable(...input), NEVER)) + })(getObservable(...input)) const reactObservable$ = reactEnhancer( sharedObservable$, - _options.unsubscribeGraceTime, + unsubscribeGraceTime, ) const result: [BehaviorObservable, BehaviorObservable] = [ sharedObservable$, diff --git a/src/connectObservable.ts b/src/connectObservable.ts index 4da520c..d5617fb 100644 --- a/src/connectObservable.ts +++ b/src/connectObservable.ts @@ -1,8 +1,7 @@ -import { Observable, NEVER, concat } from "rxjs" -import { distinctShareReplay } from "./operators/distinct-share-replay" -import reactEnhancer from "./operators/react-enhancer" -import { useObservable } from "./useObservable" -import { ConnectorOptions, defaultConnectorOptions } from "./options" +import { Observable } from "rxjs" +import shareLatest from "./internal/share-latest" +import reactEnhancer from "./internal/react-enhancer" +import { useObservable } from "./internal/useObservable" /** * Returns a hook that provides the latest update of the accepted observable, @@ -13,30 +12,19 @@ import { ConnectorOptions, defaultConnectorOptions } from "./options" * observable. * * @param observable Source observable to be used by the hook. - * @param options ConnectorOptions: - * - unsubscribeGraceTime (= 200): Amount of time in ms that the shared - * observable should wait before unsubscribing from the source observable - * when there are no new subscribers. - * - compare (= Object.is): Equality function. + * @param unsubscribeGraceTime (= 200): Amount of time in ms that the shared + * observable should wait before unsubscribing from the source observable + * when there are no new subscribers. */ export function connectObservable( observable: Observable, - options?: ConnectorOptions, + unsubscribeGraceTime = 200, ) { - const _options = { - ...defaultConnectorOptions, - ...options, - } - const sharedObservable$ = distinctShareReplay(_options.compare)( - concat(observable, NEVER), - ) - + const sharedObservable$ = shareLatest(false)(observable) const reactObservable$ = reactEnhancer( sharedObservable$, - _options.unsubscribeGraceTime, + unsubscribeGraceTime, ) - const useStaticObservable = () => useObservable(reactObservable$) - - return [useStaticObservable, sharedObservable$] as const + return [useStaticObservable, sharedObservable$ as Observable] as const } diff --git a/src/createInput.ts b/src/createInput.ts index 3839bb9..0f0ccea 100644 --- a/src/createInput.ts +++ b/src/createInput.ts @@ -1,5 +1,5 @@ import { Subject, Observable, ReplaySubject } from "rxjs" -import { distinctShareReplay } from "./operators/distinct-share-replay" +import shareLatest from "./internal/share-latest" interface CreateInput { /** @@ -22,7 +22,6 @@ interface CreateInput { } const empty = Symbol("empty") as any -const F = () => false const createInput_ = (defaultValue: T = empty) => { const cache = new Map, { latest: T }, Observable]>() const getEntry = (key: string) => { @@ -33,7 +32,7 @@ const createInput_ = (defaultValue: T = empty) => { if (defaultValue !== empty) { subject.next((current.latest = defaultValue)) } - const source = distinctShareReplay(F, () => cache.delete(key))( + const source = shareLatest(true, () => cache.delete(key))( subject, ) as Observable result = [subject, current, source] diff --git a/src/index.tsx b/src/index.tsx index 3c56070..04a2551 100644 --- a/src/index.tsx +++ b/src/index.tsx @@ -1,4 +1,4 @@ -import { distinctShareReplay as internalDistinctShareReplay } from "./operators/distinct-share-replay" +export { shareLatest } from "./operators/shareLatest" // support for React Suspense export { SUSPENSE } from "./SUSPENSE" @@ -10,26 +10,5 @@ export { switchMapSuspended } from "./operators/switchMapSuspended" export { connectObservable } from "./connectObservable" export { connectFactoryObservable } from "./connectFactoryObservable" -/** - * A RxJS pipeable operator which performs a custom shareReplay that can be - * useful when working with these bindings. It's roughly the equivalent of: - * - * ```ts - * source$.pipe( - * distinctUntilChanged(compare), - * multicast(() => new ReplaySubject(1)), - * refCount(), - * ) - * ``` - * - * @param compareFn Equality function. - * - * @remarks The enhanced observables returned from connectObservable and - * connectFactoryObservable have been enhanced with this operator. - */ -export function distinctShareReplay(compareFn?: (a: T, b: T) => boolean) { - return internalDistinctShareReplay(compareFn) -} - // utils export { createInput } from "./createInput" diff --git a/src/BehaviorObservable.ts b/src/internal/BehaviorObservable.ts similarity index 100% rename from src/BehaviorObservable.ts rename to src/internal/BehaviorObservable.ts diff --git a/src/internal/empty-value.ts b/src/internal/empty-value.ts new file mode 100644 index 0000000..dbe6b83 --- /dev/null +++ b/src/internal/empty-value.ts @@ -0,0 +1 @@ +export const EMPTY_VALUE: any = {} diff --git a/src/internal/noop.ts b/src/internal/noop.ts new file mode 100644 index 0000000..f4ae299 --- /dev/null +++ b/src/internal/noop.ts @@ -0,0 +1 @@ +export const noop = Function.prototype as () => void diff --git a/src/operators/react-enhancer.ts b/src/internal/react-enhancer.ts similarity index 88% rename from src/operators/react-enhancer.ts rename to src/internal/react-enhancer.ts index ec0a063..2e44883 100644 --- a/src/operators/react-enhancer.ts +++ b/src/internal/react-enhancer.ts @@ -1,13 +1,14 @@ import { Observable, of, Subscription, Subject, race } from "rxjs" import { delay, takeUntil, take, filter, tap } from "rxjs/operators" -import { BehaviorObservable } from "../BehaviorObservable" import { SUSPENSE } from "../SUSPENSE" +import { BehaviorObservable } from "./BehaviorObservable" +import { EMPTY_VALUE } from "./empty-value" +import { noop } from "./noop" const IS_SSR = typeof window === "undefined" || typeof window.document === "undefined" || typeof window.document.createElement === "undefined" -const noop = Function.prototype as () => void const reactEnhancer = ( source$: Observable, @@ -17,10 +18,11 @@ const reactEnhancer = ( const onSubscribe = new Subject() const result = new Observable(subscriber => { let isActive = true + let latestValue = EMPTY_VALUE const subscription = source$.subscribe({ next(value) { - if (isActive) { - subscriber.next(value) + if (isActive && !Object.is(latestValue, value)) { + subscriber.next((latestValue = value)) } }, error(e) { diff --git a/src/operators/distinct-share-replay.ts b/src/internal/share-latest.ts similarity index 72% rename from src/operators/distinct-share-replay.ts rename to src/internal/share-latest.ts index b80330d..c582dd2 100644 --- a/src/operators/distinct-share-replay.ts +++ b/src/internal/share-latest.ts @@ -1,15 +1,12 @@ import { Observable, Subscription, Subject } from "rxjs" import { SUSPENSE } from "../SUSPENSE" -import { BehaviorObservable } from "../BehaviorObservable" +import { BehaviorObservable } from "./BehaviorObservable" +import { EMPTY_VALUE } from "./empty-value" +import { noop } from "./noop" -function defaultTeardown() {} - -export const EMPTY_VALUE: any = {} - -export const distinctShareReplay = ( - compareFn: (a: T, b: T) => boolean = Object.is, - teardown = defaultTeardown, -) => (source$: Observable): BehaviorObservable => { +const shareLatest = (complete = true, teardown = noop) => ( + source$: Observable, +): BehaviorObservable => { let subject: Subject | undefined let subscription: Subscription | undefined let refCount = 0 @@ -24,20 +21,17 @@ export const distinctShareReplay = ( innerSub = subject.subscribe(subscriber) subscription = source$.subscribe({ next(value) { - if ( - currentValue.value === EMPTY_VALUE || - !compareFn(value, currentValue.value) - ) { - subject!.next((currentValue.value = value)) - } + subject!.next((currentValue.value = value)) }, error(err) { subscription = undefined subject!.error(err) }, complete() { - subscription = undefined - subject!.complete() + if (complete) { + subscription = undefined + subject!.complete() + } }, }) } else { @@ -73,3 +67,4 @@ export const distinctShareReplay = ( return result } +export default shareLatest diff --git a/src/useObservable.ts b/src/internal/useObservable.ts similarity index 96% rename from src/useObservable.ts rename to src/internal/useObservable.ts index 2426127..68eb3dd 100644 --- a/src/useObservable.ts +++ b/src/internal/useObservable.ts @@ -1,6 +1,6 @@ import { useEffect, useReducer } from "react" -import { SUSPENSE } from "./SUSPENSE" import { BehaviorObservable } from "./BehaviorObservable" +import { SUSPENSE } from "../SUSPENSE" const reducer = ( _: any, diff --git a/src/operators/shareLatest.ts b/src/operators/shareLatest.ts new file mode 100644 index 0000000..e831caf --- /dev/null +++ b/src/operators/shareLatest.ts @@ -0,0 +1,19 @@ +import internalShareLatest from "../internal/share-latest" + +/** + * A RxJS pipeable operator which shares and replays the latest emitted value. + * It's the equivalent of: + * + * ```ts + * source$.pipe( + * multicast(() => new ReplaySubject(1)), + * refCount(), + * ) + * ``` + * + * @remarks The enhanced observables returned from `connectObservable` and + * `connectFactoryObservable` have been enhanced with this operator, but do not + * complete. Meaning that the latest emitted value will be available until the + * `refCount` drops to zero. + */ +export const shareLatest = () => internalShareLatest() diff --git a/src/options.ts b/src/options.ts deleted file mode 100644 index d2f6c9b..0000000 --- a/src/options.ts +++ /dev/null @@ -1,8 +0,0 @@ -export interface ConnectorOptions { - unsubscribeGraceTime?: number - compare?: (a: T, b: T) => boolean -} -export const defaultConnectorOptions = { - unsubscribeGraceTime: 200, - compare: Object.is, -} diff --git a/test/connectFactoryObservable.test.tsx b/test/connectFactoryObservable.test.tsx index dd43169..72db467 100644 --- a/test/connectFactoryObservable.test.tsx +++ b/test/connectFactoryObservable.test.tsx @@ -78,9 +78,7 @@ describe("connectFactoryObservable", () => { const [useLatestNumber] = connectFactoryObservable( (id: number) => concat(observable$, of(id)), - { - unsubscribeGraceTime: 100, - }, + 100, ) const { unmount } = renderHook(() => useLatestNumber(6)) const { unmount: unmount2 } = renderHook(() => useLatestNumber(6)) diff --git a/test/connectObservable.test.tsx b/test/connectObservable.test.tsx index e597901..754b7f2 100644 --- a/test/connectObservable.test.tsx +++ b/test/connectObservable.test.tsx @@ -5,7 +5,7 @@ import { screen, } from "@testing-library/react" import { act, renderHook } from "@testing-library/react-hooks" -import React, { Suspense, useEffect, useRef, FC } from "react" +import React, { Suspense, useEffect, FC } from "react" import { BehaviorSubject, defer, from, of, Subject } from "rxjs" import { delay, scan, startWith, map } from "rxjs/operators" import { connectObservable, SUSPENSE } from "../src" @@ -78,53 +78,6 @@ describe("connectObservable", () => { expect(updates).toHaveBeenCalledTimes(2) }) - it("Only update when the previous and current update are distinct according to the comparator function", async () => { - interface TestUpdate { - value: number - valueToIgnore: string - } - const stream$ = new BehaviorSubject({ - value: 0, - valueToIgnore: "A", - }) - - const compare = (a: TestUpdate, b: TestUpdate) => a.value === b.value - const [useLatestValue] = connectObservable(stream$, { compare }) - const useLatestValueWithUpdates = () => { - const nUpdates = useRef(0) - const latestValue = useLatestValue() - useEffect(() => { - nUpdates.current++ - }) - return { - latestValue, - nUpdates, - } - } - - const { result } = renderHook(() => useLatestValueWithUpdates()) - expect(result.current.latestValue.valueToIgnore).toEqual("A") - expect(result.current.latestValue.value).toEqual(0) - expect(result.current.nUpdates.current).toEqual(1) - - act(() => { - stream$.next({ value: 0, valueToIgnore: "B" }) - }) - - //should not update to the latest value in the stream - expect(result.current.latestValue.valueToIgnore).toEqual("A") - expect(result.current.latestValue.value).toEqual(0) - //should not trigger a react update - expect(result.current.nUpdates.current).toEqual(1) - - act(() => { - stream$.next({ value: 1, valueToIgnore: "B" }) - }) - expect(result.current.latestValue.valueToIgnore).toEqual("B") - expect(result.current.latestValue.value).toEqual(1) - expect(result.current.nUpdates.current).toEqual(2) - }) - it("shares the source subscription until the refCount has stayed at zero for the grace-period", async () => { let nInitCount = 0 const observable$ = defer(() => { @@ -132,9 +85,7 @@ describe("connectObservable", () => { return from([1, 2, 3, 4, 5]) }) - const [useLatestNumber] = connectObservable(observable$, { - unsubscribeGraceTime: 100, - }) + const [useLatestNumber] = connectObservable(observable$, 100) const { unmount } = renderHook(() => useLatestNumber()) const { unmount: unmount2 } = renderHook(() => useLatestNumber()) const { unmount: unmount3 } = renderHook(() => useLatestNumber()) diff --git a/test/operators/react-enhancer.test.ts b/test/operators/react-enhancer.test.ts index a4098bf..4c4ec84 100644 --- a/test/operators/react-enhancer.test.ts +++ b/test/operators/react-enhancer.test.ts @@ -1,6 +1,6 @@ -import reactEnhancer from "../../src/operators/react-enhancer" -import { distinctShareReplay, SUSPENSE } from "../../src" -import { BehaviorObservable } from "../../src/BehaviorObservable" +import reactEnhancer from "../../src/internal/react-enhancer" +import { shareLatest, SUSPENSE } from "../../src" +import { BehaviorObservable } from "../../src/internal/BehaviorObservable" import { TestScheduler } from "rxjs/testing" import { Subject } from "rxjs" @@ -74,7 +74,7 @@ describe("operators/reactEnhancer", () => { it("getValue returns the latest emitted value", () => { const input = new Subject() const obs$ = reactEnhancer( - input.pipe(distinctShareReplay()), + input.pipe(shareLatest()), 0, ) as BehaviorObservable @@ -92,7 +92,7 @@ describe("operators/reactEnhancer", () => { it("if nothing has been emitted, then getValue throws a promise that will resolve when the first value is emitted", async () => { const input = new Subject() const obs$ = reactEnhancer( - input.pipe(distinctShareReplay()), + input.pipe(shareLatest()), 0, ) as BehaviorObservable @@ -115,7 +115,7 @@ describe("operators/reactEnhancer", () => { it("if the latest emitted value is SUSPENSE, then getValue throws a promise that will resolve when the next non SUSPENSE value is emitted", async () => { const input = new Subject() const obs$ = reactEnhancer( - input.pipe(distinctShareReplay()), + input.pipe(shareLatest()), 0, ) as BehaviorObservable diff --git a/test/operators/distinct-share-replay.test.ts b/test/operators/share-latest.test.ts similarity index 69% rename from test/operators/distinct-share-replay.test.ts rename to test/operators/share-latest.test.ts index a58d2d3..c992bad 100644 --- a/test/operators/distinct-share-replay.test.ts +++ b/test/operators/share-latest.test.ts @@ -1,7 +1,6 @@ -import { distinctShareReplay, SUSPENSE } from "../../src" -import { BehaviorObservable } from "../../src/BehaviorObservable" -import { EMPTY_VALUE } from "../../src/operators/distinct-share-replay" -import { cold } from "jest-marbles" +import { shareLatest, SUSPENSE } from "../../src" +import { BehaviorObservable } from "../../src/internal/BehaviorObservable" +import { EMPTY_VALUE } from "../../src/internal/empty-value" import { TestScheduler } from "rxjs/testing" import { Subject, from } from "rxjs" @@ -10,33 +9,7 @@ const scheduler = () => expect(actual).toEqual(expected) }) -describe("operators/distinctShareReplay", () => { - it("only emits distinct values", () => { - const values = { - a: { val: 1 }, - b: { val: 2 }, - c: { val: 3 }, - d: { val: 4 }, - } - - let source = " a-b-b-b-c-c-d|" - let expected = "a-b-----c---d|" - - expect(cold(source, values).pipe(distinctShareReplay())).toBeObservable( - cold(expected, values), - ) - - const customCompare = (a: { val: number }, b: { val: number }) => - a.val === b.val - values.c.val = 2 - - source = " a-b-b-b-c-c-d|" - expected = "a-b---------d|" - expect( - cold(source, values).pipe(distinctShareReplay(customCompare)), - ).toBeObservable(cold(expected, values)) - }) - +describe("operators/shareLatest", () => { // prettier-ignore it("should restart due to unsubscriptions", () => { scheduler().run(({ expectObservable, expectSubscriptions, cold }) => { @@ -49,7 +22,7 @@ describe("operators/distinctShareReplay", () => { const sub2 = " -----------^------------------" const expected2 = " -----------a-b-c-d-e-f-g-h-i-j" - const shared = source.pipe(distinctShareReplay()) + const shared = source.pipe(shareLatest()) expectObservable(shared, sub1).toBe(expected1) expectObservable(shared, sub2).toBe(expected2) @@ -69,7 +42,7 @@ describe("operators/distinctShareReplay", () => { const sub2 = '-----------^--!'; const expected2 = '-----------a-(b|)'; - const shared = source.pipe(distinctShareReplay()); + const shared = source.pipe(shareLatest()); expectObservable(shared, sub1).toBe(expected1); expectObservable(shared, sub2).toBe(expected2); @@ -84,7 +57,7 @@ describe("operators/distinctShareReplay", () => { const sub1 = '^'; const expected1 = " (abcd|)" - const shared = source.pipe(distinctShareReplay()); + const shared = source.pipe(shareLatest()); expectObservable(shared, sub1).toBe(expected1); }) @@ -93,9 +66,7 @@ describe("operators/distinctShareReplay", () => { describe("Returns a BehaviorObservable which exposes a getValue function", () => { it("getValue returns the latest emitted value", () => { const input = new Subject() - const obs$ = input.pipe(distinctShareReplay()) as BehaviorObservable< - string - > + const obs$ = input.pipe(shareLatest()) as BehaviorObservable const subscription = obs$.subscribe() @@ -110,9 +81,7 @@ describe("operators/distinctShareReplay", () => { it("getValue throws EMPTY_VALUE if nothing has been emitted", () => { const input = new Subject() - const obs$ = input.pipe(distinctShareReplay()) as BehaviorObservable< - string - > + const obs$ = input.pipe(shareLatest()) as BehaviorObservable const subscription = obs$.subscribe() let error: any @@ -127,7 +96,7 @@ describe("operators/distinctShareReplay", () => { it("getValue throws SUSPENSE if the latest emitted value is SUSPENSE", () => { const input = new Subject() - const obs$ = input.pipe(distinctShareReplay()) as BehaviorObservable + const obs$ = input.pipe(shareLatest()) as BehaviorObservable const subscription = obs$.subscribe() input.next(SUSPENSE) diff --git a/test/operators/suspend.test.ts b/test/operators/suspend.test.ts index c27e87b..ea2033e 100644 --- a/test/operators/suspend.test.ts +++ b/test/operators/suspend.test.ts @@ -1,6 +1,5 @@ import { TestScheduler } from "rxjs/testing" -import { suspend } from "../../src/operators/suspend" -import { SUSPENSE } from "../../src/SUSPENSE" +import { suspend, SUSPENSE } from "../../src" const scheduler = () => new TestScheduler((actual, expected) => { diff --git a/test/operators/suspended.test.ts b/test/operators/suspended.test.ts index 10b37d0..ae6b334 100644 --- a/test/operators/suspended.test.ts +++ b/test/operators/suspended.test.ts @@ -1,6 +1,5 @@ import { TestScheduler } from "rxjs/testing" -import { suspended } from "../../src/operators/suspended" -import { SUSPENSE } from "../../src/SUSPENSE" +import { suspended, SUSPENSE } from "../../src" const scheduler = () => new TestScheduler((actual, expected) => { diff --git a/test/operators/switchMapSuspended.test.ts b/test/operators/switchMapSuspended.test.ts index 1d0ff29..0ab3fd6 100644 --- a/test/operators/switchMapSuspended.test.ts +++ b/test/operators/switchMapSuspended.test.ts @@ -1,6 +1,5 @@ import { TestScheduler } from "rxjs/testing" -import { switchMapSuspended } from "../../src/operators/switchMapSuspended" -import { SUSPENSE } from "../../src/SUSPENSE" +import { switchMapSuspended, SUSPENSE } from "../../src" const scheduler = () => new TestScheduler((actual, expected) => { diff --git a/test/useObservable.test.tsx b/test/useObservable.test.tsx index 77282ab..8b52f0b 100644 --- a/test/useObservable.test.tsx +++ b/test/useObservable.test.tsx @@ -1,16 +1,17 @@ -import React, { useState, Suspense } from "react" +import React, { useState, Suspense, useRef, useEffect } from "react" import { render, fireEvent, screen } from "@testing-library/react" -import { defer, of, Subject, NEVER, concat } from "rxjs" +import { defer, of, Subject, NEVER, BehaviorSubject, Observable } from "rxjs" import { renderHook, act } from "@testing-library/react-hooks" -import { useObservable } from "../src/useObservable" -import reactEnhancer from "../src/operators/react-enhancer" -import { SUSPENSE, distinctShareReplay } from "../src" -import { BehaviorObservable } from "../src/BehaviorObservable" +import { useObservable } from "../src/internal/useObservable" +import shareLatest from "../src/internal/share-latest" +import reactEnhancer from "../src/internal/react-enhancer" +import { SUSPENSE } from "../src" +import { BehaviorObservable } from "../src/internal/BehaviorObservable" const wait = (ms: number) => new Promise(res => setTimeout(res, ms)) -const enhancer = (source$: any) => - reactEnhancer(concat(source$, NEVER).pipe(distinctShareReplay()), 20) +const enhancer = (source$: Observable) => + reactEnhancer(source$.pipe(shareLatest(false)), 20) describe("useObservable", () => { it("works", async () => { @@ -63,7 +64,42 @@ describe("useObservable", () => { expect(counter).toBe(2) }) - const observables: any = [NEVER, of(10), of(SUSPENSE), of(20)].map(enhancer) + it("doesn't trigger react updates when the observable emits the same value", () => { + const subject$ = new BehaviorSubject(1) + const src$ = subject$.pipe(enhancer) as BehaviorObservable + const useLatestNumber = () => { + const result = useObservable(src$) + const nUpdatesRef = useRef(0) + useEffect(() => { + nUpdatesRef.current++ + }) + return { result, nUpdatesRef } + } + + const { result } = renderHook(() => useLatestNumber()) + + expect(result.current.result).toBe(1) + expect(result.current.nUpdatesRef.current).toBe(1) + + act(() => { + subject$.next(1) + subject$.next(1) + subject$.next(1) + subject$.next(1) + }) + + expect(result.current.nUpdatesRef.current).toBe(1) + + act(() => { + subject$.next(20) + }) + expect(result.current.result).toBe(20) + expect(result.current.nUpdatesRef.current).toBe(2) + }) + + const observables: any = [NEVER, of(10), of(SUSPENSE), of(20)].map((x: any) => + enhancer(x), + ) const Result: React.FC<{ idx: number }> = ({ idx }) => { const result = useObservable(observables[idx % observables.length]) return
Result {result}