From 38fd3c719a08ebdf7c1cc45bf1fbef18d0ad6e52 Mon Sep 17 00:00:00 2001 From: Josep M Sobrepere Date: Mon, 14 Sep 2020 15:36:24 +0200 Subject: [PATCH] fix(core): factory bind multiple (un/re)subscriptions --- .../bind/connectFactoryObservable.test.tsx | 77 ++++++++++++++++++- .../core/src/bind/connectFactoryObservable.ts | 12 ++- 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/packages/core/src/bind/connectFactoryObservable.test.tsx b/packages/core/src/bind/connectFactoryObservable.test.tsx index c6e5db7..70ba7b5 100644 --- a/packages/core/src/bind/connectFactoryObservable.test.tsx +++ b/packages/core/src/bind/connectFactoryObservable.test.tsx @@ -9,7 +9,7 @@ import { Subject, } from "rxjs" import { renderHook, act as actHook } from "@testing-library/react-hooks" -import { switchMap, delay } from "rxjs/operators" +import { switchMap, delay, take } from "rxjs/operators" import { FC, Suspense, useState } from "react" import React from "react" import { @@ -444,5 +444,80 @@ describe("connectFactoryObservable", () => { expect(sub4.closed).toBe(false) sub4.unsubscribe() }) + + describe("re-subscriptions on disposed observables", () => { + it("registers itself when no other observable has been registered for that key", () => { + const key = 0 + let sideEffects = 0 + + const [, getShared] = bind((_: number) => + defer(() => { + return of(++sideEffects) + }), + ) + + const stream = getShared(key) + + let val + stream.pipe(take(1)).subscribe((x) => { + val = x + }) + expect(val).toBe(1) + + stream.pipe(take(1)).subscribe((x) => { + val = x + }) + expect(val).toBe(2) + + const subscription = stream.subscribe((x) => { + val = x + }) + expect(val).toBe(3) + + getShared(key) + .pipe(take(1)) + .subscribe((x) => { + val = x + }) + expect(val).toBe(3) + subscription.unsubscribe() + }) + + it("subscribes to the currently registered observable if a new observalbe has been registered for that key", () => { + const key = 0 + let sideEffects = 0 + + const [, getShared] = bind((_: number) => + defer(() => { + return of(++sideEffects) + }), + ) + + const stream = getShared(key) + + let val + stream.pipe(take(1)).subscribe((x) => { + val = x + }) + expect(val).toBe(1) + + const subscription = getShared(key).subscribe((x) => { + val = x + }) + expect(val).toBe(2) + + stream.pipe(take(1)).subscribe((x) => { + val = x + }) + expect(val).toBe(2) + + stream.pipe(take(1)).subscribe((x) => { + val = x + }) + expect(val).toBe(2) + + subscription.unsubscribe() + }) + }) }) }) diff --git a/packages/core/src/bind/connectFactoryObservable.ts b/packages/core/src/bind/connectFactoryObservable.ts index f2f5287..d5c03fb 100644 --- a/packages/core/src/bind/connectFactoryObservable.ts +++ b/packages/core/src/bind/connectFactoryObservable.ts @@ -1,4 +1,4 @@ -import { Observable } from "rxjs" +import { Observable, defer } from "rxjs" import shareLatest from "../internal/share-latest" import reactEnhancer from "../internal/react-enhancer" import { BehaviorObservable } from "../internal/BehaviorObservable" @@ -55,8 +55,16 @@ export default function connectFactoryObservable( const reactObservable$ = reactEnhancer(sharedObservable$) + const publicShared$: Observable = defer(() => { + const inCache = cache.get(keys) + if (inCache) { + return inCache[0] === publicShared$ ? sharedObservable$ : inCache[0] + } + return getSharedObservables$(input)[0] + }) + const result: [Observable, BehaviorObservable] = [ - sharedObservable$, + publicShared$, reactObservable$, ]