feat(shareLatest): removing distinctShareReplay in favor of shareLatest

This commit is contained in:
Josep M Sobrepere 2020-06-28 20:22:18 +02:00
parent 8ebcea5b6d
commit 455b511999
21 changed files with 142 additions and 226 deletions

View File

@ -94,35 +94,32 @@ Returns `[1, 2]`
1. A React Hook function with the same parameters as the factory function. This hook
will yield the latest update from the observable returned from the factory function.
2. A shared replayable version of the observable generated by the factory function that
can be used for composing other streams that depend on it. This observable is disposed
when its `refCount` goes down to zero.
2. A `sharedLatest` version of the observable returned by the factory function that
does not complete. It can be used for composing other streams that depend on it.
### distinctShareReplay
### shareLatest
```ts
const activePlanetName$ = planet$.pipe(
filter(planet => planet.isActive),
map(planet => planet.name),
distinctShareReplay()
shareLatest()
)
```
A RxJS pipeable operator which performs a custom `shareReplay` that can be useful
when working with these bindings. It's roughly the equivalent of:
A RxJS pipeable operator which shares and replays the latest emitted value. It's
the equivalent of:
```ts
const distinctShareReplay = <T>(compare = Object.is) => (
source$: Observable<T>,
): Observable<T> =>
const shareLatest = <T>(): Observable<T> =>
source$.pipe(
distinctUntilChanged(compare),
multicast(() => new ReplaySubject<T>(1)),
refCount(),
)
```
The enhanced observables returned from `connectObservable` and `connectFactoryObservable`
have been enhanced with this operator.
have been enhanced with this operator, but do not complete. Meaning that the latest
emitted value will be available until the `refCount` drops to zero.
### SUSPENSE

View File

@ -1,10 +1,9 @@
import { Observable, NEVER, concat } from "rxjs"
import { distinctShareReplay } from "./operators/distinct-share-replay"
import reactEnhancer from "./operators/react-enhancer"
import { ConnectorOptions, defaultConnectorOptions } from "./options"
import { BehaviorObservable } from "./BehaviorObservable"
import { Observable } from "rxjs"
import shareLatest from "./internal/share-latest"
import reactEnhancer from "./internal/react-enhancer"
import { BehaviorObservable } from "./internal/BehaviorObservable"
import { useObservable } from "./internal/useObservable"
import { SUSPENSE } from "./SUSPENSE"
import { useObservable } from "./useObservable"
/**
* Accepts: A factory function that returns an Observable.
@ -19,27 +18,20 @@ import { useObservable } from "./useObservable"
*
* @param getObservable Factory of observables. The arguments of this function
* will be the ones used in the hook.
* @param options ConnectorOptions:
* - unsubscribeGraceTime (= 200): Amount of time in ms that the shared
* observable should wait before unsubscribing from the source observable
* when there are no new subscribers.
* - compare (= Object.is): Equality function.
* @param unsubscribeGraceTime (= 200): Amount of time in ms that the shared
* observable should wait before unsubscribing from the source observable
* when there are no new subscribers.
*/
export function connectFactoryObservable<
A extends (number | string | boolean | null)[],
O
>(
getObservable: (...args: A) => Observable<O>,
options?: ConnectorOptions<O>,
unsubscribeGraceTime = 200,
): [
(...args: A) => Exclude<O, typeof SUSPENSE>,
(...args: A) => Observable<O>,
] {
const _options = {
...defaultConnectorOptions,
...options,
}
const cache = new Map<
string,
[BehaviorObservable<O>, BehaviorObservable<O>]
@ -55,13 +47,13 @@ export function connectFactoryObservable<
return cachedVal
}
const sharedObservable$ = distinctShareReplay(_options.compare, () => {
const sharedObservable$ = shareLatest<O>(false, () => {
cache.delete(key)
})(concat(getObservable(...input), NEVER))
})(getObservable(...input))
const reactObservable$ = reactEnhancer(
sharedObservable$,
_options.unsubscribeGraceTime,
unsubscribeGraceTime,
)
const result: [BehaviorObservable<O>, BehaviorObservable<O>] = [
sharedObservable$,

View File

@ -1,8 +1,7 @@
import { Observable, NEVER, concat } from "rxjs"
import { distinctShareReplay } from "./operators/distinct-share-replay"
import reactEnhancer from "./operators/react-enhancer"
import { useObservable } from "./useObservable"
import { ConnectorOptions, defaultConnectorOptions } from "./options"
import { Observable } from "rxjs"
import shareLatest from "./internal/share-latest"
import reactEnhancer from "./internal/react-enhancer"
import { useObservable } from "./internal/useObservable"
/**
* Returns a hook that provides the latest update of the accepted observable,
@ -13,30 +12,19 @@ import { ConnectorOptions, defaultConnectorOptions } from "./options"
* observable.
*
* @param observable Source observable to be used by the hook.
* @param options ConnectorOptions:
* - unsubscribeGraceTime (= 200): Amount of time in ms that the shared
* observable should wait before unsubscribing from the source observable
* when there are no new subscribers.
* - compare (= Object.is): Equality function.
* @param unsubscribeGraceTime (= 200): Amount of time in ms that the shared
* observable should wait before unsubscribing from the source observable
* when there are no new subscribers.
*/
export function connectObservable<T>(
observable: Observable<T>,
options?: ConnectorOptions<T>,
unsubscribeGraceTime = 200,
) {
const _options = {
...defaultConnectorOptions,
...options,
}
const sharedObservable$ = distinctShareReplay(_options.compare)(
concat(observable, NEVER),
)
const sharedObservable$ = shareLatest<T>(false)(observable)
const reactObservable$ = reactEnhancer(
sharedObservable$,
_options.unsubscribeGraceTime,
unsubscribeGraceTime,
)
const useStaticObservable = () => useObservable(reactObservable$)
return [useStaticObservable, sharedObservable$] as const
return [useStaticObservable, sharedObservable$ as Observable<T>] as const
}

View File

@ -1,5 +1,5 @@
import { Subject, Observable, ReplaySubject } from "rxjs"
import { distinctShareReplay } from "./operators/distinct-share-replay"
import shareLatest from "./internal/share-latest"
interface CreateInput {
/**
@ -22,7 +22,6 @@ interface CreateInput {
}
const empty = Symbol("empty") as any
const F = () => false
const createInput_ = <T>(defaultValue: T = empty) => {
const cache = new Map<string, [Subject<T>, { latest: T }, Observable<T>]>()
const getEntry = (key: string) => {
@ -33,7 +32,7 @@ const createInput_ = <T>(defaultValue: T = empty) => {
if (defaultValue !== empty) {
subject.next((current.latest = defaultValue))
}
const source = distinctShareReplay(F, () => cache.delete(key))(
const source = shareLatest(true, () => cache.delete(key))(
subject,
) as Observable<T>
result = [subject, current, source]

View File

@ -1,4 +1,4 @@
import { distinctShareReplay as internalDistinctShareReplay } from "./operators/distinct-share-replay"
export { shareLatest } from "./operators/shareLatest"
// support for React Suspense
export { SUSPENSE } from "./SUSPENSE"
@ -10,26 +10,5 @@ export { switchMapSuspended } from "./operators/switchMapSuspended"
export { connectObservable } from "./connectObservable"
export { connectFactoryObservable } from "./connectFactoryObservable"
/**
* A RxJS pipeable operator which performs a custom shareReplay that can be
* useful when working with these bindings. It's roughly the equivalent of:
*
* ```ts
* source$.pipe(
* distinctUntilChanged(compare),
* multicast(() => new ReplaySubject<T>(1)),
* refCount(),
* )
* ```
*
* @param compareFn Equality function.
*
* @remarks The enhanced observables returned from connectObservable and
* connectFactoryObservable have been enhanced with this operator.
*/
export function distinctShareReplay<T>(compareFn?: (a: T, b: T) => boolean) {
return internalDistinctShareReplay(compareFn)
}
// utils
export { createInput } from "./createInput"

View File

@ -0,0 +1 @@
export const EMPTY_VALUE: any = {}

1
src/internal/noop.ts Normal file
View File

@ -0,0 +1 @@
export const noop = Function.prototype as () => void

View File

@ -1,13 +1,14 @@
import { Observable, of, Subscription, Subject, race } from "rxjs"
import { delay, takeUntil, take, filter, tap } from "rxjs/operators"
import { BehaviorObservable } from "../BehaviorObservable"
import { SUSPENSE } from "../SUSPENSE"
import { BehaviorObservable } from "./BehaviorObservable"
import { EMPTY_VALUE } from "./empty-value"
import { noop } from "./noop"
const IS_SSR =
typeof window === "undefined" ||
typeof window.document === "undefined" ||
typeof window.document.createElement === "undefined"
const noop = Function.prototype as () => void
const reactEnhancer = <T>(
source$: Observable<T>,
@ -17,10 +18,11 @@ const reactEnhancer = <T>(
const onSubscribe = new Subject()
const result = new Observable<T>(subscriber => {
let isActive = true
let latestValue = EMPTY_VALUE
const subscription = source$.subscribe({
next(value) {
if (isActive) {
subscriber.next(value)
if (isActive && !Object.is(latestValue, value)) {
subscriber.next((latestValue = value))
}
},
error(e) {

View File

@ -1,15 +1,12 @@
import { Observable, Subscription, Subject } from "rxjs"
import { SUSPENSE } from "../SUSPENSE"
import { BehaviorObservable } from "../BehaviorObservable"
import { BehaviorObservable } from "./BehaviorObservable"
import { EMPTY_VALUE } from "./empty-value"
import { noop } from "./noop"
function defaultTeardown() {}
export const EMPTY_VALUE: any = {}
export const distinctShareReplay = <T>(
compareFn: (a: T, b: T) => boolean = Object.is,
teardown = defaultTeardown,
) => (source$: Observable<T>): BehaviorObservable<T> => {
const shareLatest = <T>(complete = true, teardown = noop) => (
source$: Observable<T>,
): BehaviorObservable<T> => {
let subject: Subject<T> | undefined
let subscription: Subscription | undefined
let refCount = 0
@ -24,20 +21,17 @@ export const distinctShareReplay = <T>(
innerSub = subject.subscribe(subscriber)
subscription = source$.subscribe({
next(value) {
if (
currentValue.value === EMPTY_VALUE ||
!compareFn(value, currentValue.value)
) {
subject!.next((currentValue.value = value))
}
subject!.next((currentValue.value = value))
},
error(err) {
subscription = undefined
subject!.error(err)
},
complete() {
subscription = undefined
subject!.complete()
if (complete) {
subscription = undefined
subject!.complete()
}
},
})
} else {
@ -73,3 +67,4 @@ export const distinctShareReplay = <T>(
return result
}
export default shareLatest

View File

@ -1,6 +1,6 @@
import { useEffect, useReducer } from "react"
import { SUSPENSE } from "./SUSPENSE"
import { BehaviorObservable } from "./BehaviorObservable"
import { SUSPENSE } from "../SUSPENSE"
const reducer = (
_: any,

View File

@ -0,0 +1,19 @@
import internalShareLatest from "../internal/share-latest"
/**
* A RxJS pipeable operator which shares and replays the latest emitted value.
* It's the equivalent of:
*
* ```ts
* source$.pipe(
* multicast(() => new ReplaySubject<T>(1)),
* refCount(),
* )
* ```
*
* @remarks The enhanced observables returned from `connectObservable` and
* `connectFactoryObservable` have been enhanced with this operator, but do not
* complete. Meaning that the latest emitted value will be available until the
* `refCount` drops to zero.
*/
export const shareLatest = <T>() => internalShareLatest<T>()

View File

@ -1,8 +0,0 @@
export interface ConnectorOptions<T> {
unsubscribeGraceTime?: number
compare?: (a: T, b: T) => boolean
}
export const defaultConnectorOptions = {
unsubscribeGraceTime: 200,
compare: Object.is,
}

View File

@ -78,9 +78,7 @@ describe("connectFactoryObservable", () => {
const [useLatestNumber] = connectFactoryObservable(
(id: number) => concat(observable$, of(id)),
{
unsubscribeGraceTime: 100,
},
100,
)
const { unmount } = renderHook(() => useLatestNumber(6))
const { unmount: unmount2 } = renderHook(() => useLatestNumber(6))

View File

@ -5,7 +5,7 @@ import {
screen,
} from "@testing-library/react"
import { act, renderHook } from "@testing-library/react-hooks"
import React, { Suspense, useEffect, useRef, FC } from "react"
import React, { Suspense, useEffect, FC } from "react"
import { BehaviorSubject, defer, from, of, Subject } from "rxjs"
import { delay, scan, startWith, map } from "rxjs/operators"
import { connectObservable, SUSPENSE } from "../src"
@ -78,53 +78,6 @@ describe("connectObservable", () => {
expect(updates).toHaveBeenCalledTimes(2)
})
it("Only update when the previous and current update are distinct according to the comparator function", async () => {
interface TestUpdate {
value: number
valueToIgnore: string
}
const stream$ = new BehaviorSubject<TestUpdate>({
value: 0,
valueToIgnore: "A",
})
const compare = (a: TestUpdate, b: TestUpdate) => a.value === b.value
const [useLatestValue] = connectObservable(stream$, { compare })
const useLatestValueWithUpdates = () => {
const nUpdates = useRef(0)
const latestValue = useLatestValue()
useEffect(() => {
nUpdates.current++
})
return {
latestValue,
nUpdates,
}
}
const { result } = renderHook(() => useLatestValueWithUpdates())
expect(result.current.latestValue.valueToIgnore).toEqual("A")
expect(result.current.latestValue.value).toEqual(0)
expect(result.current.nUpdates.current).toEqual(1)
act(() => {
stream$.next({ value: 0, valueToIgnore: "B" })
})
//should not update to the latest value in the stream
expect(result.current.latestValue.valueToIgnore).toEqual("A")
expect(result.current.latestValue.value).toEqual(0)
//should not trigger a react update
expect(result.current.nUpdates.current).toEqual(1)
act(() => {
stream$.next({ value: 1, valueToIgnore: "B" })
})
expect(result.current.latestValue.valueToIgnore).toEqual("B")
expect(result.current.latestValue.value).toEqual(1)
expect(result.current.nUpdates.current).toEqual(2)
})
it("shares the source subscription until the refCount has stayed at zero for the grace-period", async () => {
let nInitCount = 0
const observable$ = defer(() => {
@ -132,9 +85,7 @@ describe("connectObservable", () => {
return from([1, 2, 3, 4, 5])
})
const [useLatestNumber] = connectObservable(observable$, {
unsubscribeGraceTime: 100,
})
const [useLatestNumber] = connectObservable(observable$, 100)
const { unmount } = renderHook(() => useLatestNumber())
const { unmount: unmount2 } = renderHook(() => useLatestNumber())
const { unmount: unmount3 } = renderHook(() => useLatestNumber())

View File

@ -1,6 +1,6 @@
import reactEnhancer from "../../src/operators/react-enhancer"
import { distinctShareReplay, SUSPENSE } from "../../src"
import { BehaviorObservable } from "../../src/BehaviorObservable"
import reactEnhancer from "../../src/internal/react-enhancer"
import { shareLatest, SUSPENSE } from "../../src"
import { BehaviorObservable } from "../../src/internal/BehaviorObservable"
import { TestScheduler } from "rxjs/testing"
import { Subject } from "rxjs"
@ -74,7 +74,7 @@ describe("operators/reactEnhancer", () => {
it("getValue returns the latest emitted value", () => {
const input = new Subject<string>()
const obs$ = reactEnhancer(
input.pipe(distinctShareReplay()),
input.pipe(shareLatest()),
0,
) as BehaviorObservable<string>
@ -92,7 +92,7 @@ describe("operators/reactEnhancer", () => {
it("if nothing has been emitted, then getValue throws a promise that will resolve when the first value is emitted", async () => {
const input = new Subject<string>()
const obs$ = reactEnhancer(
input.pipe(distinctShareReplay()),
input.pipe(shareLatest()),
0,
) as BehaviorObservable<string>
@ -115,7 +115,7 @@ describe("operators/reactEnhancer", () => {
it("if the latest emitted value is SUSPENSE, then getValue throws a promise that will resolve when the next non SUSPENSE value is emitted", async () => {
const input = new Subject<any>()
const obs$ = reactEnhancer(
input.pipe(distinctShareReplay()),
input.pipe(shareLatest()),
0,
) as BehaviorObservable<any>

View File

@ -1,7 +1,6 @@
import { distinctShareReplay, SUSPENSE } from "../../src"
import { BehaviorObservable } from "../../src/BehaviorObservable"
import { EMPTY_VALUE } from "../../src/operators/distinct-share-replay"
import { cold } from "jest-marbles"
import { shareLatest, SUSPENSE } from "../../src"
import { BehaviorObservable } from "../../src/internal/BehaviorObservable"
import { EMPTY_VALUE } from "../../src/internal/empty-value"
import { TestScheduler } from "rxjs/testing"
import { Subject, from } from "rxjs"
@ -10,33 +9,7 @@ const scheduler = () =>
expect(actual).toEqual(expected)
})
describe("operators/distinctShareReplay", () => {
it("only emits distinct values", () => {
const values = {
a: { val: 1 },
b: { val: 2 },
c: { val: 3 },
d: { val: 4 },
}
let source = " a-b-b-b-c-c-d|"
let expected = "a-b-----c---d|"
expect(cold(source, values).pipe(distinctShareReplay())).toBeObservable(
cold(expected, values),
)
const customCompare = (a: { val: number }, b: { val: number }) =>
a.val === b.val
values.c.val = 2
source = " a-b-b-b-c-c-d|"
expected = "a-b---------d|"
expect(
cold(source, values).pipe(distinctShareReplay(customCompare)),
).toBeObservable(cold(expected, values))
})
describe("operators/shareLatest", () => {
// prettier-ignore
it("should restart due to unsubscriptions", () => {
scheduler().run(({ expectObservable, expectSubscriptions, cold }) => {
@ -49,7 +22,7 @@ describe("operators/distinctShareReplay", () => {
const sub2 = " -----------^------------------"
const expected2 = " -----------a-b-c-d-e-f-g-h-i-j"
const shared = source.pipe(distinctShareReplay())
const shared = source.pipe(shareLatest())
expectObservable(shared, sub1).toBe(expected1)
expectObservable(shared, sub2).toBe(expected2)
@ -69,7 +42,7 @@ describe("operators/distinctShareReplay", () => {
const sub2 = '-----------^--!';
const expected2 = '-----------a-(b|)';
const shared = source.pipe(distinctShareReplay());
const shared = source.pipe(shareLatest());
expectObservable(shared, sub1).toBe(expected1);
expectObservable(shared, sub2).toBe(expected2);
@ -84,7 +57,7 @@ describe("operators/distinctShareReplay", () => {
const sub1 = '^';
const expected1 = " (abcd|)"
const shared = source.pipe(distinctShareReplay());
const shared = source.pipe(shareLatest());
expectObservable(shared, sub1).toBe(expected1);
})
@ -93,9 +66,7 @@ describe("operators/distinctShareReplay", () => {
describe("Returns a BehaviorObservable which exposes a getValue function", () => {
it("getValue returns the latest emitted value", () => {
const input = new Subject<string>()
const obs$ = input.pipe(distinctShareReplay()) as BehaviorObservable<
string
>
const obs$ = input.pipe(shareLatest()) as BehaviorObservable<string>
const subscription = obs$.subscribe()
@ -110,9 +81,7 @@ describe("operators/distinctShareReplay", () => {
it("getValue throws EMPTY_VALUE if nothing has been emitted", () => {
const input = new Subject<string>()
const obs$ = input.pipe(distinctShareReplay()) as BehaviorObservable<
string
>
const obs$ = input.pipe(shareLatest()) as BehaviorObservable<string>
const subscription = obs$.subscribe()
let error: any
@ -127,7 +96,7 @@ describe("operators/distinctShareReplay", () => {
it("getValue throws SUSPENSE if the latest emitted value is SUSPENSE", () => {
const input = new Subject<any>()
const obs$ = input.pipe(distinctShareReplay()) as BehaviorObservable<any>
const obs$ = input.pipe(shareLatest()) as BehaviorObservable<any>
const subscription = obs$.subscribe()
input.next(SUSPENSE)

View File

@ -1,6 +1,5 @@
import { TestScheduler } from "rxjs/testing"
import { suspend } from "../../src/operators/suspend"
import { SUSPENSE } from "../../src/SUSPENSE"
import { suspend, SUSPENSE } from "../../src"
const scheduler = () =>
new TestScheduler((actual, expected) => {

View File

@ -1,6 +1,5 @@
import { TestScheduler } from "rxjs/testing"
import { suspended } from "../../src/operators/suspended"
import { SUSPENSE } from "../../src/SUSPENSE"
import { suspended, SUSPENSE } from "../../src"
const scheduler = () =>
new TestScheduler((actual, expected) => {

View File

@ -1,6 +1,5 @@
import { TestScheduler } from "rxjs/testing"
import { switchMapSuspended } from "../../src/operators/switchMapSuspended"
import { SUSPENSE } from "../../src/SUSPENSE"
import { switchMapSuspended, SUSPENSE } from "../../src"
const scheduler = () =>
new TestScheduler((actual, expected) => {

View File

@ -1,16 +1,17 @@
import React, { useState, Suspense } from "react"
import React, { useState, Suspense, useRef, useEffect } from "react"
import { render, fireEvent, screen } from "@testing-library/react"
import { defer, of, Subject, NEVER, concat } from "rxjs"
import { defer, of, Subject, NEVER, BehaviorSubject, Observable } from "rxjs"
import { renderHook, act } from "@testing-library/react-hooks"
import { useObservable } from "../src/useObservable"
import reactEnhancer from "../src/operators/react-enhancer"
import { SUSPENSE, distinctShareReplay } from "../src"
import { BehaviorObservable } from "../src/BehaviorObservable"
import { useObservable } from "../src/internal/useObservable"
import shareLatest from "../src/internal/share-latest"
import reactEnhancer from "../src/internal/react-enhancer"
import { SUSPENSE } from "../src"
import { BehaviorObservable } from "../src/internal/BehaviorObservable"
const wait = (ms: number) => new Promise(res => setTimeout(res, ms))
const enhancer = (source$: any) =>
reactEnhancer(concat(source$, NEVER).pipe(distinctShareReplay()), 20)
const enhancer = <T extends any>(source$: Observable<T>) =>
reactEnhancer<T>(source$.pipe(shareLatest(false)), 20)
describe("useObservable", () => {
it("works", async () => {
@ -63,7 +64,42 @@ describe("useObservable", () => {
expect(counter).toBe(2)
})
const observables: any = [NEVER, of(10), of(SUSPENSE), of(20)].map(enhancer)
it("doesn't trigger react updates when the observable emits the same value", () => {
const subject$ = new BehaviorSubject(1)
const src$ = subject$.pipe(enhancer) as BehaviorObservable<number>
const useLatestNumber = () => {
const result = useObservable(src$)
const nUpdatesRef = useRef(0)
useEffect(() => {
nUpdatesRef.current++
})
return { result, nUpdatesRef }
}
const { result } = renderHook(() => useLatestNumber())
expect(result.current.result).toBe(1)
expect(result.current.nUpdatesRef.current).toBe(1)
act(() => {
subject$.next(1)
subject$.next(1)
subject$.next(1)
subject$.next(1)
})
expect(result.current.nUpdatesRef.current).toBe(1)
act(() => {
subject$.next(20)
})
expect(result.current.result).toBe(20)
expect(result.current.nUpdatesRef.current).toBe(2)
})
const observables: any = [NEVER, of(10), of(SUSPENSE), of(20)].map((x: any) =>
enhancer(x),
)
const Result: React.FC<{ idx: number }> = ({ idx }) => {
const result = useObservable(observables[idx % observables.length])
return <div>Result {result}</div>