diff --git a/packages/core/src/bind/connectFactoryObservable.test.tsx b/packages/core/src/bind/connectFactoryObservable.test.tsx index 5ca989d..c6e5db7 100644 --- a/packages/core/src/bind/connectFactoryObservable.test.tsx +++ b/packages/core/src/bind/connectFactoryObservable.test.tsx @@ -389,7 +389,7 @@ describe("connectFactoryObservable", () => { }) describe("observable", () => { - it("it completes when the source observable completes, regardless of mounted componentes being subscribed to the source", async () => { + it("it does not complete when the source observable completes", async () => { let diff = -1 const [useLatestNumber, getShared] = bind((_: number) => { diff++ @@ -404,7 +404,8 @@ describe("connectFactoryObservable", () => { }) expect(latestValue1).toBe(4) expect(nUpdates).toBe(4) - expect(sub1.closed).toBe(true) + expect(sub1.closed).toBe(false) + sub1.unsubscribe() const { result, unmount } = renderHook(() => useLatestNumber(0)) expect(result.current).toBe(5) @@ -417,7 +418,8 @@ describe("connectFactoryObservable", () => { }) expect(latestValue2).toBe(5) expect(nUpdates).toBe(5) - expect(sub2.closed).toBe(true) + expect(sub2.closed).toBe(false) + sub2.unsubscribe() let latestValue3: number = 0 const sub3 = getShared(0).subscribe((x) => { @@ -426,7 +428,8 @@ describe("connectFactoryObservable", () => { }) expect(latestValue3).toBe(5) expect(nUpdates).toBe(6) - expect(sub3.closed).toBe(true) + expect(sub3.closed).toBe(false) + sub3.unsubscribe() unmount() await wait(260) @@ -438,7 +441,8 @@ describe("connectFactoryObservable", () => { }) expect(latestValue4).toBe(6) expect(nUpdates).toBe(10) - expect(sub4.closed).toBe(true) + expect(sub4.closed).toBe(false) + sub4.unsubscribe() }) }) }) diff --git a/packages/core/src/bind/connectFactoryObservable.ts b/packages/core/src/bind/connectFactoryObservable.ts index c3facf6..f2f5287 100644 --- a/packages/core/src/bind/connectFactoryObservable.ts +++ b/packages/core/src/bind/connectFactoryObservable.ts @@ -4,7 +4,6 @@ import reactEnhancer from "../internal/react-enhancer" import { BehaviorObservable } from "../internal/BehaviorObservable" import { useObservable } from "../internal/useObservable" import { SUSPENSE } from "../SUSPENSE" -import { takeUntilComplete } from "../internal/take-until-complete" /** * Accepts: A factory function that returns an Observable. @@ -46,14 +45,18 @@ export default function connectFactoryObservable( return cachedVal } - const sharedObservable$ = shareLatest(getObservable(...input), () => { - cache.delete(keys) - }) + const sharedObservable$ = shareLatest( + getObservable(...input), + false, + () => { + cache.delete(keys) + }, + ) const reactObservable$ = reactEnhancer(sharedObservable$) const result: [Observable, BehaviorObservable] = [ - takeUntilComplete(sharedObservable$), + sharedObservable$, reactObservable$, ] diff --git a/packages/core/src/bind/connectObservable.ts b/packages/core/src/bind/connectObservable.ts index 9396e6e..a2da331 100644 --- a/packages/core/src/bind/connectObservable.ts +++ b/packages/core/src/bind/connectObservable.ts @@ -2,7 +2,6 @@ import { Observable } from "rxjs" import shareLatest from "../internal/share-latest" import reactEnhancer from "../internal/react-enhancer" import { useObservable } from "../internal/useObservable" -import { takeUntilComplete } from "../internal/take-until-complete" /** * Accepts: An Observable. @@ -20,9 +19,8 @@ import { takeUntilComplete } from "../internal/take-until-complete" * for the first value. */ export default function connectObservable(observable: Observable) { - const sharedObservable$ = shareLatest(observable) + const sharedObservable$ = shareLatest(observable, false) const reactObservable$ = reactEnhancer(sharedObservable$) - const outputObservable$ = takeUntilComplete(sharedObservable$) const useStaticObservable = () => useObservable(reactObservable$) - return [useStaticObservable, outputObservable$] as const + return [useStaticObservable, sharedObservable$] as const } diff --git a/packages/core/src/internal/COMPLETE.ts b/packages/core/src/internal/COMPLETE.ts deleted file mode 100644 index 1b0f1ea..0000000 --- a/packages/core/src/internal/COMPLETE.ts +++ /dev/null @@ -1 +0,0 @@ -export const COMPLETE = Symbol("COMPLETE") diff --git a/packages/core/src/internal/react-enhancer.ts b/packages/core/src/internal/react-enhancer.ts index afbd6c8..ecba076 100644 --- a/packages/core/src/internal/react-enhancer.ts +++ b/packages/core/src/internal/react-enhancer.ts @@ -3,7 +3,6 @@ import { take, filter, tap } from "rxjs/operators" import { SUSPENSE } from "../SUSPENSE" import { BehaviorObservable } from "./BehaviorObservable" import { EMPTY_VALUE } from "./empty-value" -import { COMPLETE } from "./COMPLETE" const reactEnhancer = (source$: Observable): BehaviorObservable => { let refCount = 0 @@ -15,11 +14,7 @@ const reactEnhancer = (source$: Observable): BehaviorObservable => { let latestValue = EMPTY_VALUE const subscription = source$.subscribe( (value) => { - if ( - isActive && - value !== (COMPLETE as any) && - !Object.is(latestValue, value) - ) { + if (isActive && !Object.is(latestValue, value)) { subscriber.next((latestValue = value)) } }, diff --git a/packages/core/src/internal/share-latest.ts b/packages/core/src/internal/share-latest.ts index e7dde4f..bf5e9db 100644 --- a/packages/core/src/internal/share-latest.ts +++ b/packages/core/src/internal/share-latest.ts @@ -2,10 +2,10 @@ import { Observable, Subscription, Subject, noop } from "rxjs" import { SUSPENSE } from "../SUSPENSE" import { BehaviorObservable } from "./BehaviorObservable" import { EMPTY_VALUE } from "./empty-value" -import { COMPLETE } from "./COMPLETE" const shareLatest = ( source$: Observable, + shouldComplete = true, teardown = noop, ): BehaviorObservable => { let subject: Subject | undefined @@ -32,7 +32,7 @@ const shareLatest = ( }, () => { subscription = undefined - subject!.next(COMPLETE as any) + shouldComplete && subject!.complete() }, ) if (subscription.closed) subscription = undefined @@ -40,9 +40,6 @@ const shareLatest = ( innerSub = subject.subscribe(subscriber) if (currentValue !== EMPTY_VALUE) { subscriber.next(currentValue) - if (subscription === undefined) { - subscriber.next(COMPLETE as any) - } } } diff --git a/packages/core/src/internal/take-until-complete.ts b/packages/core/src/internal/take-until-complete.ts deleted file mode 100644 index d1f48dd..0000000 --- a/packages/core/src/internal/take-until-complete.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { takeWhile } from "rxjs/operators" -import { COMPLETE } from "./COMPLETE" - -const isActive = (x: T) => x !== (COMPLETE as any) -export const takeUntilComplete = takeWhile(isActive) diff --git a/packages/core/src/shareLatest.ts b/packages/core/src/shareLatest.ts index c74dd41..1d45f1e 100644 --- a/packages/core/src/shareLatest.ts +++ b/packages/core/src/shareLatest.ts @@ -1,6 +1,5 @@ import { Observable, MonoTypeOperatorFunction } from "rxjs" import internalShareLatest from "./internal/share-latest" -import { takeUntilComplete } from "./internal/take-until-complete" /** * A RxJS pipeable operator which shares and replays the latest emitted value. @@ -18,4 +17,4 @@ import { takeUntilComplete } from "./internal/take-until-complete" */ export const shareLatest = (): MonoTypeOperatorFunction => ( source$: Observable, -) => takeUntilComplete(internalShareLatest(source$)) +) => internalShareLatest(source$)