From 35f46c96b89cee425acf80e52ac407e732fe7ace Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Oliva?= Date: Tue, 30 Mar 2021 12:24:25 +0200 Subject: [PATCH] Rename mergeActiveKeys for combineKeys, add tests and jsdocs --- packages/utils/src/combineKeys.test.ts | 122 ++++++++++++++++++ .../src/{mergeByKey.ts => combineKeys.ts} | 19 ++- packages/utils/src/index.tsx | 1 + 3 files changed, 137 insertions(+), 5 deletions(-) create mode 100644 packages/utils/src/combineKeys.test.ts rename packages/utils/src/{mergeByKey.ts => combineKeys.ts} (72%) diff --git a/packages/utils/src/combineKeys.test.ts b/packages/utils/src/combineKeys.test.ts new file mode 100644 index 0000000..73b9433 --- /dev/null +++ b/packages/utils/src/combineKeys.test.ts @@ -0,0 +1,122 @@ +import { Observable } from "rxjs" +import { map, scan } from "rxjs/operators" +import { TestScheduler } from "rxjs/testing" +import { combineKeys } from "./" + +const scheduler = () => + new TestScheduler((actual, expected) => { + expect(actual).toEqual(expected) + }) + +describe("combineKeys", () => { + it("emits a map with the latest value of the stream of each key", () => { + scheduler().run(({ expectObservable, cold }) => { + const keys = cold(" ab---cd---").pipe(scan((acc, v) => [...acc, v], [])) + const a = cold(" --1---2---") + const b = cold(" ---------") + const c = cold(" 1----") + const d = cold(" 9---") + const expectedStr = "--e--f(gh)" + + const innerStreams = { a, b, c, d } + + const result = combineKeys( + keys, + (v): Observable => innerStreams[v], + ).pipe(map((x) => Object.fromEntries(x.entries()))) + + expectObservable(result).toBe(expectedStr, { + e: { a: "1" }, + f: { a: "1", c: "1" }, + g: { a: "2", c: "1" }, + h: { a: "2", c: "1", d: "9" }, + }) + }) + }) + + it("removes the entry of a key when that key is removed from the source", () => { + scheduler().run(({ expectObservable, cold }) => { + const keys = cold(" a-b---c---").pipe(map((v) => [v])) + const a = cold(" ----------") + const b = cold(" -1-2-3-4") + const c = cold(" -2-3") + const expectedStr = "---e-fgh-i" + + const innerStreams = { a, b, c } + + const result = combineKeys( + keys, + (v): Observable => innerStreams[v], + ).pipe(map((x) => Object.fromEntries(x.entries()))) + + expectObservable(result).toBe(expectedStr, { + e: { b: "1" }, + f: { b: "2" }, + g: {}, + h: { c: "2" }, + i: { c: "3" }, + }) + }) + }) + + it("completes when the key stream completes", () => { + scheduler().run(({ expectObservable, cold }) => { + const keys = cold(" a-b---|").pipe(scan((acc, v) => [...acc, v], [])) + const a = cold(" -1-----") + const b = cold(" -1---") + const expectedStr = "-e-f--|" + + const innerStreams = { a, b } + + const result = combineKeys( + keys, + (v): Observable => innerStreams[v], + ).pipe(map((x) => Object.fromEntries(x.entries()))) + + expectObservable(result).toBe(expectedStr, { + e: { a: "1" }, + f: { a: "1", b: "1" }, + }) + }) + }) + + it("propagates errors from the inner streams", () => { + scheduler().run(({ expectObservable, cold }) => { + const keys = cold(" a-b---|").pipe(scan((acc, v) => [...acc, v], [])) + const a = cold(" -1-----") + const b = cold(" -#") + const expectedStr = "-e-#" + + const innerStreams = { a, b } + + const result = combineKeys( + keys, + (v): Observable => innerStreams[v], + ).pipe(map((x) => Object.fromEntries(x.entries()))) + + expectObservable(result).toBe(expectedStr, { + e: { a: "1" }, + }) + }) + }) + + it("propagates errors from the key stream", () => { + scheduler().run(({ expectObservable, cold }) => { + const keys = cold(" a-b#").pipe(scan((acc, v) => [...acc, v], [])) + const a = cold(" -1--") + const b = cold(" -1") + const expectedStr = "-e-#" + + const innerStreams = { a, b } + + const result = combineKeys( + keys, + (v): Observable => innerStreams[v], + ).pipe(map((x) => Object.fromEntries(x.entries()))) + + expectObservable(result).toBe(expectedStr, { + e: { a: "1" }, + }) + }) + }) +}) diff --git a/packages/utils/src/mergeByKey.ts b/packages/utils/src/combineKeys.ts similarity index 72% rename from packages/utils/src/mergeByKey.ts rename to packages/utils/src/combineKeys.ts index abf58c7..8ceca36 100644 --- a/packages/utils/src/mergeByKey.ts +++ b/packages/utils/src/combineKeys.ts @@ -1,7 +1,14 @@ import { Observable, Subscription } from "rxjs" -export const mergeActiveKeys = ( - activeKeys$: Observable>, +/** + * Creates a stream that combines the result of the streams from each key of the input stream. + * + * @param keys$ Stream of the list of keys to subscribe to. + * @param getInner$ Function that returns the stream for each key. + * @returns An stream with a map containing the latest value from the stream of each key. + */ +export const combineKeys = ( + keys$: Observable | Set>, getInner$: (key: K) => Observable, ): Observable> => new Observable((observer) => { @@ -12,16 +19,18 @@ export const mergeActiveKeys = ( if (!updatingSource) observer.next(new Map(currentValue)) } - const subscription = activeKeys$.subscribe( + const subscription = keys$.subscribe( (nextKeysArr) => { updatingSource = true const nextKeys = new Set(nextKeysArr) let changes = false innerSubscriptions.forEach((sub, key) => { if (!nextKeys.has(key)) { - changes = true sub.unsubscribe() - currentValue.delete(key) + if (currentValue.has(key)) { + changes = true + currentValue.delete(key) + } } else { nextKeys.delete(key) } diff --git a/packages/utils/src/index.tsx b/packages/utils/src/index.tsx index f2479e3..e3f1608 100644 --- a/packages/utils/src/index.tsx +++ b/packages/utils/src/index.tsx @@ -1,5 +1,6 @@ export { collectValues } from "./collectValues" export { collect } from "./collect" +export { combineKeys } from "./combineKeys" export { getGroupedObservable } from "./getGroupedObservable" export { createSignal } from "./createSignal" export { createKeyedSignal } from "./createKeyedSignal"