fix(core-bind): shared observable should not complete

This commit is contained in:
Josep M Sobrepere 2020-09-08 14:34:14 +02:00
parent 10c3141d59
commit debd996cec
8 changed files with 23 additions and 33 deletions

View File

@ -389,7 +389,7 @@ describe("connectFactoryObservable", () => {
})
describe("observable", () => {
it("it completes when the source observable completes, regardless of mounted componentes being subscribed to the source", async () => {
it("it does not complete when the source observable completes", async () => {
let diff = -1
const [useLatestNumber, getShared] = bind((_: number) => {
diff++
@ -404,7 +404,8 @@ describe("connectFactoryObservable", () => {
})
expect(latestValue1).toBe(4)
expect(nUpdates).toBe(4)
expect(sub1.closed).toBe(true)
expect(sub1.closed).toBe(false)
sub1.unsubscribe()
const { result, unmount } = renderHook(() => useLatestNumber(0))
expect(result.current).toBe(5)
@ -417,7 +418,8 @@ describe("connectFactoryObservable", () => {
})
expect(latestValue2).toBe(5)
expect(nUpdates).toBe(5)
expect(sub2.closed).toBe(true)
expect(sub2.closed).toBe(false)
sub2.unsubscribe()
let latestValue3: number = 0
const sub3 = getShared(0).subscribe((x) => {
@ -426,7 +428,8 @@ describe("connectFactoryObservable", () => {
})
expect(latestValue3).toBe(5)
expect(nUpdates).toBe(6)
expect(sub3.closed).toBe(true)
expect(sub3.closed).toBe(false)
sub3.unsubscribe()
unmount()
await wait(260)
@ -438,7 +441,8 @@ describe("connectFactoryObservable", () => {
})
expect(latestValue4).toBe(6)
expect(nUpdates).toBe(10)
expect(sub4.closed).toBe(true)
expect(sub4.closed).toBe(false)
sub4.unsubscribe()
})
})
})

View File

@ -4,7 +4,6 @@ import reactEnhancer from "../internal/react-enhancer"
import { BehaviorObservable } from "../internal/BehaviorObservable"
import { useObservable } from "../internal/useObservable"
import { SUSPENSE } from "../SUSPENSE"
import { takeUntilComplete } from "../internal/take-until-complete"
/**
* Accepts: A factory function that returns an Observable.
@ -46,14 +45,18 @@ export default function connectFactoryObservable<A extends [], O>(
return cachedVal
}
const sharedObservable$ = shareLatest(getObservable(...input), () => {
cache.delete(keys)
})
const sharedObservable$ = shareLatest(
getObservable(...input),
false,
() => {
cache.delete(keys)
},
)
const reactObservable$ = reactEnhancer(sharedObservable$)
const result: [Observable<O>, BehaviorObservable<O>] = [
takeUntilComplete(sharedObservable$),
sharedObservable$,
reactObservable$,
]

View File

@ -2,7 +2,6 @@ import { Observable } from "rxjs"
import shareLatest from "../internal/share-latest"
import reactEnhancer from "../internal/react-enhancer"
import { useObservable } from "../internal/useObservable"
import { takeUntilComplete } from "../internal/take-until-complete"
/**
* Accepts: An Observable.
@ -20,9 +19,8 @@ import { takeUntilComplete } from "../internal/take-until-complete"
* for the first value.
*/
export default function connectObservable<T>(observable: Observable<T>) {
const sharedObservable$ = shareLatest<T>(observable)
const sharedObservable$ = shareLatest<T>(observable, false)
const reactObservable$ = reactEnhancer(sharedObservable$)
const outputObservable$ = takeUntilComplete(sharedObservable$)
const useStaticObservable = () => useObservable(reactObservable$)
return [useStaticObservable, outputObservable$] as const
return [useStaticObservable, sharedObservable$] as const
}

View File

@ -1 +0,0 @@
export const COMPLETE = Symbol("COMPLETE")

View File

@ -3,7 +3,6 @@ import { take, filter, tap } from "rxjs/operators"
import { SUSPENSE } from "../SUSPENSE"
import { BehaviorObservable } from "./BehaviorObservable"
import { EMPTY_VALUE } from "./empty-value"
import { COMPLETE } from "./COMPLETE"
const reactEnhancer = <T>(source$: Observable<T>): BehaviorObservable<T> => {
let refCount = 0
@ -15,11 +14,7 @@ const reactEnhancer = <T>(source$: Observable<T>): BehaviorObservable<T> => {
let latestValue = EMPTY_VALUE
const subscription = source$.subscribe(
(value) => {
if (
isActive &&
value !== (COMPLETE as any) &&
!Object.is(latestValue, value)
) {
if (isActive && !Object.is(latestValue, value)) {
subscriber.next((latestValue = value))
}
},

View File

@ -2,10 +2,10 @@ import { Observable, Subscription, Subject, noop } from "rxjs"
import { SUSPENSE } from "../SUSPENSE"
import { BehaviorObservable } from "./BehaviorObservable"
import { EMPTY_VALUE } from "./empty-value"
import { COMPLETE } from "./COMPLETE"
const shareLatest = <T>(
source$: Observable<T>,
shouldComplete = true,
teardown = noop,
): BehaviorObservable<T> => {
let subject: Subject<T> | undefined
@ -32,7 +32,7 @@ const shareLatest = <T>(
},
() => {
subscription = undefined
subject!.next(COMPLETE as any)
shouldComplete && subject!.complete()
},
)
if (subscription.closed) subscription = undefined
@ -40,9 +40,6 @@ const shareLatest = <T>(
innerSub = subject.subscribe(subscriber)
if (currentValue !== EMPTY_VALUE) {
subscriber.next(currentValue)
if (subscription === undefined) {
subscriber.next(COMPLETE as any)
}
}
}

View File

@ -1,5 +0,0 @@
import { takeWhile } from "rxjs/operators"
import { COMPLETE } from "./COMPLETE"
const isActive = <T>(x: T) => x !== (COMPLETE as any)
export const takeUntilComplete = takeWhile(isActive)

View File

@ -1,6 +1,5 @@
import { Observable, MonoTypeOperatorFunction } from "rxjs"
import internalShareLatest from "./internal/share-latest"
import { takeUntilComplete } from "./internal/take-until-complete"
/**
* A RxJS pipeable operator which shares and replays the latest emitted value.
@ -18,4 +17,4 @@ import { takeUntilComplete } from "./internal/take-until-complete"
*/
export const shareLatest = <T>(): MonoTypeOperatorFunction<T> => (
source$: Observable<T>,
) => takeUntilComplete(internalShareLatest(source$))
) => internalShareLatest(source$)