diff --git a/packages/utils/src/index.tsx b/packages/utils/src/index.tsx index e3f1608..00b49a2 100644 --- a/packages/utils/src/index.tsx +++ b/packages/utils/src/index.tsx @@ -5,6 +5,7 @@ export { getGroupedObservable } from "./getGroupedObservable" export { createSignal } from "./createSignal" export { createKeyedSignal } from "./createKeyedSignal" export { mergeWithKey } from "./mergeWithKey" +export { partitionByKey } from "./partitionByKey" export { split } from "./split" export { suspend } from "./suspend" export { suspended } from "./suspended" diff --git a/packages/utils/src/partitionByKey.test.ts b/packages/utils/src/partitionByKey.test.ts new file mode 100644 index 0000000..427c1ea --- /dev/null +++ b/packages/utils/src/partitionByKey.test.ts @@ -0,0 +1,116 @@ +import { NEVER, Subject } from "rxjs" +import { switchMap, take } from "rxjs/operators" +import { TestScheduler } from "rxjs/testing" +import { partitionByKey } from "./" + +const scheduler = () => + new TestScheduler((actual, expected) => { + expect(actual).toEqual(expected) + }) + +describe("partitionByKey", () => { + describe("activeKeys$", () => { + it("emits a list with all the active keys", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-ab---cd---") + const expectedStr = "efg---hi---" + const [result] = partitionByKey( + source, + (v) => v, + () => NEVER, + ) + + expectObservable(result).toBe(expectedStr, { + e: [], + f: ["a"], + g: ["a", "b"], + h: ["a", "b", "c"], + i: ["a", "b", "c", "d"], + }) + }) + }) + + it("removes a key from the list when its inner stream completes", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-ab---c--") + const a = cold(" --1---2-") + const b = cold(" ---|") + const c = cold(" 1-|") + const expectedStr = "efg--hi-j" + const innerStreams = { a, b, c } + const [result] = partitionByKey( + source, + (v) => v, + (v$) => + v$.pipe( + take(1), + switchMap((v) => innerStreams[v]), + ), + ) + + expectObservable(result).toBe(expectedStr, { + e: [], + f: ["a"], + g: ["a", "b"], + h: ["a"], + i: ["a", "c"], + j: ["a"], + }) + }) + }) + }) + + describe("getInstance$", () => { + it("returns the values for the selected key", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-ab---c--") + const a = cold(" --1---2-") + const b = cold(" ---|") + const c = cold(" 1-|") + const expectA = " ---1---2--" + const expectB = " -----|" + const expectC = " ------1-|" + + const innerStreams = { a, b, c } + const [, getInstance$] = partitionByKey( + source, + (v) => v, + (v$) => + v$.pipe( + take(1), + switchMap((v) => innerStreams[v]), + ), + ) + + expectObservable(getInstance$("a")).toBe(expectA) + expectObservable(getInstance$("b")).toBe(expectB) + expectObservable(getInstance$("c")).toBe(expectC) + }) + }) + + it("replays the latest value for each key", () => { + const source$ = new Subject() + const inner$ = new Subject() + const [, getInstance$] = partitionByKey( + source$, + (v) => v, + () => inner$, + ) + + const next = jest.fn() + getInstance$("a").subscribe(next) + + source$.next("a") + expect(next).not.toHaveBeenCalled() + + inner$.next(1) + expect(next).toHaveBeenCalledTimes(1) + expect(next).toHaveBeenCalledWith(1) + + const lateNext = jest.fn() + getInstance$("a").subscribe(lateNext) + expect(lateNext).toHaveBeenCalledTimes(1) + expect(lateNext).toHaveBeenCalledWith(1) + }) + }) +}) diff --git a/packages/utils/src/partitionByKey.ts b/packages/utils/src/partitionByKey.ts index 8ee5679..5b4157d 100644 --- a/packages/utils/src/partitionByKey.ts +++ b/packages/utils/src/partitionByKey.ts @@ -2,6 +2,17 @@ import { GroupedObservable, Observable } from "rxjs" import { map } from "rxjs/operators" import { collect, getGroupedObservable, split } from "./" +/** + * Groups the elements from the source stream by using `keySelector`, returning + * a stream of the active keys, and a function to get the stream of a specific group + * + * @param stream Input stream + * @param keySelector Function that specifies the key for each element in `stream` + * @param streamSelector Function to apply to each resulting group + * @returns [1, 2] + * 1. A stream with the list of active keys + * 2. A function that accepts a key and returns the stream for the group of that key. + */ export function partitionByKey( stream: Observable, keySelector: (value: T) => K,