diff --git a/packages/utils/src/partitionByKey.test.ts b/packages/utils/src/partitionByKey.test.ts index 996b729..dba0197 100644 --- a/packages/utils/src/partitionByKey.test.ts +++ b/packages/utils/src/partitionByKey.test.ts @@ -1,4 +1,4 @@ -import { concat, NEVER, Observable, of, Subject } from "rxjs" +import { concat, from, NEVER, Observable, of, Subject } from "rxjs" import { catchError, switchMap, take } from "rxjs/operators" import { TestScheduler } from "rxjs/testing" import { partitionByKey } from "./" @@ -9,6 +9,193 @@ const scheduler = () => }) describe("partitionByKey", () => { + describe("behaviour", () => { + it("groups observables by using the key function", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-12-3456-") + const expectOdd = " -1--3-5--" + const expectEven = " --2--4-6-" + + const [getInstance$] = partitionByKey( + source, + (v) => Number(v) % 2, + (v$) => v$, + ) + + expectObservable(getInstance$(0)).toBe(expectEven) + expectObservable(getInstance$(1)).toBe(expectOdd) + }) + }) + + it("unsubscribes from all streams when refcount reaches 0", () => { + let innerSubs = 0 + const inner = new Observable(() => { + innerSubs++ + return () => { + innerSubs-- + } + }) + + const sourceSubject = new Subject() + let sourceSubs = 0 + const source = new Observable((obs) => { + sourceSubs++ + sourceSubject.subscribe(obs) + return () => { + sourceSubs-- + } + }) + + const [getObs] = partitionByKey( + source, + (v) => v, + () => inner, + ) + const observable = getObs(1) + + expect(sourceSubs).toBe(0) + expect(innerSubs).toBe(0) + + const sub1 = observable.subscribe() + + expect(sourceSubs).toBe(1) + expect(innerSubs).toBe(0) + + sourceSubject.next(1) + + expect(sourceSubs).toBe(1) + expect(innerSubs).toBe(1) + + const sub2 = observable.subscribe() + + expect(sourceSubs).toBe(1) + expect(innerSubs).toBe(1) + + sub1.unsubscribe() + + expect(sourceSubs).toBe(1) + expect(innerSubs).toBe(1) + + sub2.unsubscribe() + + expect(sourceSubs).toBe(0) + expect(innerSubs).toBe(0) + }) + + it("emits a complete on the inner observable when the source completes", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-ab-a-|") + const expectA = " -a--a-(c|)" + const expectB = " --b---(c|)" + + const [getInstance$] = partitionByKey( + source, + (v) => v, + (v$) => concat(v$, ["c"]), + ) + + expectObservable(getInstance$("a")).toBe(expectA) + expectObservable(getInstance$("b")).toBe(expectB) + }) + }) + + it("emits the error on the inner observable when the source errors", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-ab-a-#") + const expectA = " -a--a-(e|)" + const expectB = " --b---(e|)" + + const [getInstance$] = partitionByKey( + source, + (v) => v, + (v$) => v$.pipe(catchError(() => of("e"))), + ) + + expectObservable(getInstance$("a")).toBe(expectA) + expectObservable(getInstance$("b")).toBe(expectB) + }) + }) + + it("handles an empty Observable", () => { + scheduler().run(({ expectSubscriptions, expectObservable, cold }) => { + const e1 = cold(" |") + const e1subs = " (^!)" + const expectObs = "|" + const expectKey = "(x|)" + + const [getObs, keys$] = partitionByKey( + e1, + (v) => v, + (v$) => v$, + ) + + expectObservable(getObs("")).toBe(expectObs) + expectSubscriptions(e1.subscriptions).toBe(e1subs) + expectObservable(keys$).toBe(expectKey, { x: [] }) + }) + }) + + it("handles a never Observable", () => { + scheduler().run(({ expectSubscriptions, expectObservable, cold }) => { + const e1 = cold(" --") + const e1subs = " ^-" + const expectObs = "--" + const expectKey = "x-" + + const [getObs, keys$] = partitionByKey( + e1, + (v) => v, + (v$) => v$, + ) + + expectObservable(getObs("")).toBe(expectObs) + expectSubscriptions(e1.subscriptions).toBe(e1subs) + expectObservable(keys$).toBe(expectKey, { x: [] }) + }) + }) + + it("handles a just-throw Observable", () => { + scheduler().run(({ expectSubscriptions, expectObservable, cold }) => { + const e1 = cold(" #") + const e1subs = " (^!)" + const expectObs = "#" + const expectKey = "(x#)" + + const [getObs, keys$] = partitionByKey( + e1, + (v) => v, + (v$) => v$, + ) + + expectObservable(getObs("")).toBe(expectObs) + expectSubscriptions(e1.subscriptions).toBe(e1subs) + expectObservable(keys$).toBe(expectKey, { x: [] }) + }) + }) + + it("handles synchronous values", () => { + scheduler().run(({ expectObservable }) => { + const e1 = from(["1", "2", "3", "4", "5"]) + const expectOdd = " (135|)" + const expectEven = "(24|)" + const expectKeys = "(wxyz|)" + const [getObs, keys$] = partitionByKey( + e1, + (v) => Number(v) % 2, + (v$) => v$, + ) + expectObservable(keys$).toBe(expectKeys, { + w: [1], + x: [1, 0], + y: [0], + z: [], + }) + expectObservable(getObs(0)).toBe(expectEven) + expectObservable(getObs(1)).toBe(expectOdd) + }) + }) + }) + describe("activeKeys$", () => { it("emits a list with all the active keys", () => { scheduler().run(({ expectObservable, cold }) => { @@ -30,23 +217,6 @@ describe("partitionByKey", () => { }) }) - it("emits all the synchronous groups in a single emission", () => { - scheduler().run(({ expectObservable, cold }) => { - const source = concat(of("a", "b"), cold("--c--")) - const expectedStr = " g-h--" - const [, result] = partitionByKey( - source, - (v) => v, - () => NEVER, - ) - - expectObservable(result).toBe(expectedStr, { - g: ["a", "b"], - h: ["a", "b", "c"], - }) - }) - }) - it("removes a key from the list when its inner stream completes", () => { scheduler().run(({ expectObservable, cold }) => { const source = cold("-ab---c--") @@ -130,21 +300,71 @@ describe("partitionByKey", () => { }) }) - it("errors when the source emits an error", () => { + it("errors when the source emits an error and no group is active", () => { scheduler().run(({ expectObservable, cold }) => { const source = cold("-ab--#") - const a = cold(" --1---2") + const a = cold(" --1|") + const b = cold(" -|") + const expectedStr = "efghi#" + const innerStreams: Record> = { a, b } + const [, result] = partitionByKey( + source, + (v) => v, + (v$) => + v$.pipe( + take(1), + switchMap((v) => innerStreams[v]), + ), + ) + + expectObservable(result).toBe(expectedStr, { + e: [], + f: ["a"], + g: ["a", "b"], + h: ["a"], + i: [], + }) + }) + }) + + it("doesn't error when the source errors and its inner streams stop the error", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-ab--#") + const a = cold(" --1--2--3|") + const b = cold(" ----|") + const expectedStr = "efg---h---(i|)" + const innerStreams: Record> = { a, b } + const [, result] = partitionByKey( + source, + (v) => v, + (v$) => + v$.pipe( + take(1), + switchMap((v) => innerStreams[v]), + ), + ) + + expectObservable(result).toBe(expectedStr, { + e: [], + f: ["a"], + g: ["a", "b"], + h: ["a"], + i: [], + }) + }) + }) + + it("errors when one of its inner stream emits an error", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-ab-----") + const a = cold(" --1-#") const b = cold(" ------") const expectedStr = "efg--#" const innerStreams: Record> = { a, b } const [, result] = partitionByKey( source, (v) => v, - (v$) => - v$.pipe( - take(1), - switchMap((v) => innerStreams[v]), - ), + (_, v) => innerStreams[v], ) expectObservable(result).toBe(expectedStr, { @@ -154,32 +374,6 @@ describe("partitionByKey", () => { }) }) }) - - it("removes a key when its inner stream emits an error", () => { - scheduler().run(({ expectObservable, cold }) => { - const source = cold("-ab-----") - const a = cold(" --1-#") - const b = cold(" ------") - const expectedStr = "efg--h" - const innerStreams: Record> = { a, b } - const [, result] = partitionByKey( - source, - (v) => v, - (v$) => - v$.pipe( - take(1), - switchMap((v) => innerStreams[v]), - ), - ) - - expectObservable(result).toBe(expectedStr, { - e: [], - f: ["a"], - g: ["a", "b"], - h: ["b"], - }) - }) - }) }) describe("getInstance$", () => { @@ -235,71 +429,17 @@ describe("partitionByKey", () => { expect(lateNext).toHaveBeenCalledWith(1) }) - it("unsubscribes from all streams when refcount reaches 0", () => { - let innerSubs = 0 - const inner = new Observable(() => { - innerSubs++ - return () => { - innerSubs-- - } - }) - - const sourceSubject = new Subject() - let sourceSubs = 0 - const source = new Observable((obs) => { - sourceSubs++ - sourceSubject.subscribe(obs) - return () => { - sourceSubs-- - } - }) - - const [getObs] = partitionByKey( - source, - (v) => v, - () => inner, - ) - const observable = getObs(1) - - expect(sourceSubs).toBe(0) - expect(innerSubs).toBe(0) - - const sub1 = observable.subscribe() - - expect(sourceSubs).toBe(1) - expect(innerSubs).toBe(0) - - sourceSubject.next(1) - - expect(sourceSubs).toBe(1) - expect(innerSubs).toBe(1) - - const sub2 = observable.subscribe() - - expect(sourceSubs).toBe(1) - expect(innerSubs).toBe(1) - - sub1.unsubscribe() - - expect(sourceSubs).toBe(1) - expect(innerSubs).toBe(1) - - sub2.unsubscribe() - - expect(sourceSubs).toBe(0) - expect(innerSubs).toBe(0) - }) - - it("emits a complete on the inner observable when the source completes", () => { + it("lets the projection function handle completions", () => { scheduler().run(({ expectObservable, cold }) => { const source = cold("-ab-a-|") - const expectA = " -a--a-(c|)" - const expectB = " --b---(c|)" + const concatenated = cold("123|") + const expectA = " -a--a-123|" + const expectB = " --b---123|" const [getInstance$] = partitionByKey( source, (v) => v, - (v$) => concat(v$, ["c"]), + (v$) => concat(v$, concatenated), ) expectObservable(getInstance$("a")).toBe(expectA) @@ -307,12 +447,11 @@ describe("partitionByKey", () => { }) }) - // Do we want this behaviour? - it("emits an error when the source errors", () => { + it("lets the projection function catch source errors", () => { scheduler().run(({ expectObservable, cold }) => { const source = cold("-ab-a-#") - const expectA = " -a--a-#" - const expectB = " --b---#" + const expectA = " -a--a-(e|)" + const expectB = " --b---(e|)" const [getInstance$] = partitionByKey( source, diff --git a/packages/utils/src/partitionByKey.ts b/packages/utils/src/partitionByKey.ts index dc6e518..b1f36f2 100644 --- a/packages/utils/src/partitionByKey.ts +++ b/packages/utils/src/partitionByKey.ts @@ -29,7 +29,7 @@ export function partitionByKey( (subscriber) => { const groups: Map> = new Map() - let warmup = true + let emitted = false let sourceCompleted = false const sub = stream.subscribe( (x) => { @@ -43,7 +43,7 @@ export function partitionByKey( const res = streamSelector(subject, key).pipe( shareLatest(), ) as GroupedObservable - res.key = key + ;(res as any).key = key const innerGroup: InnerGroup = { source: subject, @@ -52,35 +52,42 @@ export function partitionByKey( } groups.set(key, innerGroup) - const onFinish = () => { - groups.delete(key) - subscriber.next(mapGroups(groups)) + innerGroup.subscription = res.subscribe( + noop, + (e) => subscriber.error(e), + () => { + groups.delete(key) + subscriber.next(mapGroups(groups)) - if (groups.size === 0 && sourceCompleted) { - subscriber.complete() - } - } - innerGroup.subscription = res.subscribe(noop, onFinish, onFinish) + if (groups.size === 0 && sourceCompleted) { + subscriber.complete() + } + }, + ) subject.next(x) - if (!warmup) { - subscriber.next(mapGroups(groups)) - } + subscriber.next(mapGroups(groups)) + emitted = true }, (e) => { - subscriber.error(e) + sourceCompleted = true + if (groups.size) { + groups.forEach((g) => g.source.error(e)) + } else { + subscriber.error(e) + } }, () => { sourceCompleted = true - if (groups.size === 0) { + if (groups.size) { + groups.forEach((g) => g.source.complete()) + } else { subscriber.complete() } - groups.forEach((g) => g.source.complete()) }, ) - warmup = false - subscriber.next(mapGroups(groups)) + if (!emitted) subscriber.next(mapGroups(groups)) return () => { sub.unsubscribe()