From c6b4010cd01f7d5bae14b07b5171eca9f7b67b66 Mon Sep 17 00:00:00 2001 From: Josep M Sobrepere Date: Thu, 6 May 2021 12:47:24 +0200 Subject: [PATCH] fix(shareLatest): properly closing sync observables --- packages/core/src/internal/share-latest.ts | 63 +++++++++++----------- packages/core/src/share-latest.test.ts | 32 +++++++---- 2 files changed, 55 insertions(+), 40 deletions(-) diff --git a/packages/core/src/internal/share-latest.ts b/packages/core/src/internal/share-latest.ts index 532382e..8838550 100644 --- a/packages/core/src/internal/share-latest.ts +++ b/packages/core/src/internal/share-latest.ts @@ -1,4 +1,4 @@ -import { Observable, Subscription, Subject, noop } from "rxjs" +import { Observable, Subscription, Subject, noop, Subscriber } from "rxjs" import { BehaviorObservable } from "./BehaviorObservable" import { EMPTY_VALUE } from "./empty-value" import { SUSPENSE } from "../SUSPENSE" @@ -10,7 +10,7 @@ const shareLatest = ( teardown = noop, ): BehaviorObservable => { let subject: Subject | null - let subscription: Subscription | null + let subscription: Subscriber | null let refCount = 0 let currentValue: T = EMPTY_VALUE let promise: Promise | null @@ -29,36 +29,8 @@ const shareLatest = ( refCount++ let innerSub: Subscription - if (!subject) { - subject = new Subject() - innerSub = subject.subscribe(subscriber) - subscription = null - subscription = source$.subscribe( - (value) => { - subject!.next((currentValue = value)) - }, - (err) => { - const _subject = subject - subscription = null - subject = null - _subject!.error(err) - }, - () => { - subscription = null - emitIfEmpty() - subject!.complete() - }, - ) - if (subscription.closed) subscription = null - emitIfEmpty() - } else { - innerSub = subject.subscribe(subscriber) - if (currentValue !== EMPTY_VALUE) { - subscriber.next(currentValue) - } - } - return () => { + subscriber.add(() => { refCount-- innerSub.unsubscribe() if (refCount === 0) { @@ -71,6 +43,35 @@ const shareLatest = ( subscription = null promise = null } + }) + + if (!subject) { + subject = new Subject() + innerSub = subject.subscribe(subscriber) + subscription = null + subscription = new Subscriber( + (value: T) => { + subject!.next((currentValue = value)) + }, + (err: any) => { + const _subject = subject + subscription = null + subject = null + _subject!.error(err) + }, + () => { + subscription = null + emitIfEmpty() + subject!.complete() + }, + ) + source$.subscribe(subscription) + emitIfEmpty() + } else { + innerSub = subject.subscribe(subscriber) + if (currentValue !== EMPTY_VALUE) { + subscriber.next(currentValue) + } } }) as BehaviorObservable diff --git a/packages/core/src/share-latest.test.ts b/packages/core/src/share-latest.test.ts index 256e454..36d2ea0 100644 --- a/packages/core/src/share-latest.test.ts +++ b/packages/core/src/share-latest.test.ts @@ -1,7 +1,7 @@ import { TestScheduler } from "rxjs/testing" -import { from, merge, defer } from "rxjs" +import { from, merge, defer, Observable, noop } from "rxjs" import { shareLatest } from "./" -import { withLatestFrom, startWith, map } from "rxjs/operators" +import { withLatestFrom, startWith, map, take } from "rxjs/operators" const scheduler = () => new TestScheduler((actual, expected) => { @@ -75,15 +75,29 @@ describe("shareLatest", () => { // prettier-ignore it("should not skip values on a sync source", () => { - scheduler().run(({ expectObservable }) => { - const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)") - const sub1 = '^'; - const expected1 = " (abcd|)" + scheduler().run(({ expectObservable }) => { + const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)") + const sub1 = '^'; + const expected1 = " (abcd|)" - const shared = shareLatest()(source); + const shared = shareLatest()(source); - expectObservable(shared, sub1).toBe(expected1); + expectObservable(shared, sub1).toBe(expected1); + }) + }) + + it("should stop listening to a synchronous observable when unsubscribed", () => { + let sideEffects = 0 + const synchronousObservable = new Observable((subscriber) => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects++ + subscriber.next(i) + } + }) + synchronousObservable.pipe(shareLatest(), take(3)).subscribe(noop) + expect(sideEffects).toBe(3) }) }) - }) })