From 31f7029c36aab43a6f7c76fb7c5e086fb818bcbe Mon Sep 17 00:00:00 2001 From: Josep M Sobrepere Date: Tue, 25 Aug 2020 19:10:39 +0200 Subject: [PATCH] feat(utils): add collect operator --- packages/utils/README.md | 11 ++++ packages/utils/src/collect.test.ts | 95 ++++++++++++++++++++++++++++++ packages/utils/src/collect.ts | 66 +++++++++++++++++++++ packages/utils/src/index.tsx | 1 + packages/utils/tsconfig.json | 7 +-- 5 files changed, 175 insertions(+), 5 deletions(-) create mode 100644 packages/utils/src/collect.test.ts create mode 100644 packages/utils/src/collect.ts diff --git a/packages/utils/README.md b/packages/utils/README.md index 4d5fc0c..1de1281 100644 --- a/packages/utils/README.md +++ b/packages/utils/README.md @@ -101,6 +101,17 @@ votesByKey$.next({ key: "bar" }) // > counters$: ``` +### collect + +A pipeable operator that collects all the GroupedObservables emitted by +the source and emits a Map with the active inner observables. + +Arguments: + +- `filter?`: A function that receives the inner Observable and returns an + Observable of boolean values, which indicates whether the inner observable + should be collected. + ### mergeWithKey Emits the values from all the streams of the provided object, in a result diff --git a/packages/utils/src/collect.test.ts b/packages/utils/src/collect.test.ts new file mode 100644 index 0000000..0798bae --- /dev/null +++ b/packages/utils/src/collect.test.ts @@ -0,0 +1,95 @@ +import { + map, + shareReplay, + skip, + startWith, + ignoreElements, +} from "rxjs/operators" +import { TestScheduler } from "rxjs/testing" +import { collect } from "./" +import { GroupedObservable } from "rxjs" + +const scheduler = () => + new TestScheduler((actual, expected) => { + expect(actual).toEqual(expected) + }) + +describe("collect", () => { + it("emits a map with the latest grouped stream", () => { + scheduler().run(({ expectObservable, cold }) => { + const toGrouped = (source: string, key: string) => { + const result = cold(source).pipe(shareReplay(1)) as GroupedObservable< + string, + string + > + result.key = key + return result + } + + const a = toGrouped("-------| ", "a") + const b = toGrouped(" ---| ", "b") + const c = toGrouped(" ------- ", "c") + const d = toGrouped(" -----| ", "d") + + const sourceStr = " ab---cd--| " + const expectedStr = "ef--ghij-(k|)" + + const source = cold(sourceStr, { a, b, c, d }).pipe(skip(1), startWith(a)) + const result = source.pipe( + collect(), + map((x) => Object.fromEntries(x.entries())), + ) + + expectObservable(result).toBe(expectedStr, { + e: { a }, + f: { a, b }, + g: { a }, + h: { a, c }, + i: { a, c, d }, + j: { c, d }, + k: {}, + }) + }) + }) + + it("emits a map with the latest filtered grouped stream", () => { + scheduler().run(({ expectObservable, cold }) => { + const toGrouped = (source: string, key: string) => { + const result = cold(source).pipe(shareReplay(1)) as GroupedObservable< + string, + string + > + result.key = key + return result + } + + const a = toGrouped("-------| ", "a") + const b = toGrouped(" ---| ", "b") + const c = toGrouped(" ------- ", "c") + const d = toGrouped(" -----| ", "d") + + const sourceStr = " ab---cd--| " + const expectedStr = "ef--g-i--(k|)" + const excluded = ["a", "c"] + + const source = cold(sourceStr, { a, b, c, d }).pipe(skip(1), startWith(a)) + const result = source.pipe( + collect((inner$) => + inner$.pipe( + ignoreElements(), + startWith(!excluded.includes(inner$.key)), + ), + ), + map((x) => Object.fromEntries(x.entries())), + ) + + expectObservable(result).toBe(expectedStr, { + e: {}, + f: { b }, + g: {}, + i: { d }, + k: {}, + }) + }) + }) +}) diff --git a/packages/utils/src/collect.ts b/packages/utils/src/collect.ts new file mode 100644 index 0000000..e726205 --- /dev/null +++ b/packages/utils/src/collect.ts @@ -0,0 +1,66 @@ +import { GroupedObservable, Observable } from "rxjs" +import { + takeUntil, + takeLast, + startWith, + endWith, + ignoreElements, + publish, + mergeMap, + map, + distinctUntilChanged, + skipWhile, +} from "rxjs/operators" +import { scanWithDefaultValue } from "./internal-utils" + +const defaultFilter = (source$: Observable) => + source$.pipe(ignoreElements(), startWith(true), endWith(false)) + +const set = "s" as const +const del = "d" as const +const complete = "c" as const + +/** + * A pipeable operator that collects all the GroupedObservables emitted by + * the source and emits a Map with the active inner observables + * + * @param filter? A function that receives the inner Observable and returns an + * Observable of boolean values, which indicates whether the inner observable + * should be collected. + */ +export const collect = ( + filter?: (source$: GroupedObservable) => Observable, +) => { + const enhancer = filter + ? (source$: GroupedObservable) => + filter(source$).pipe( + endWith(false), + skipWhile((x) => !x), + distinctUntilChanged(), + ) + : defaultFilter + + return (source$: Observable>) => + source$.pipe( + publish((multicasted$) => + multicasted$.pipe( + mergeMap((o) => map((x) => ({ t: x ? set : del, o }))(enhancer(o))), + takeUntil(takeLast(1)(multicasted$)), + ), + ), + endWith({ t: complete }), + scanWithDefaultValue( + (acc, val) => { + if (val.t === set) { + acc.set(val.o.key, val.o) + } else if (val.t === del) { + acc.delete(val.o.key) + } else { + acc.clear() + } + return acc + }, + () => new Map>(), + ), + ) +} diff --git a/packages/utils/src/index.tsx b/packages/utils/src/index.tsx index 033c347..550e8f5 100644 --- a/packages/utils/src/index.tsx +++ b/packages/utils/src/index.tsx @@ -1,5 +1,6 @@ export { useSubscribe } from "./useSubscribe" export { Subscribe } from "./Subscribe" export { collectValues } from "./collectValues" +export { collect } from "./collect" export { mergeWithKey } from "./mergeWithKey" export { split } from "./split" diff --git a/packages/utils/tsconfig.json b/packages/utils/tsconfig.json index c37dcb8..9085962 100644 --- a/packages/utils/tsconfig.json +++ b/packages/utils/tsconfig.json @@ -1,5 +1,5 @@ { - "include": ["src", "types", "test"], + "include": ["src"], "compilerOptions": { "target": "es5", "module": "esnext", @@ -26,8 +26,5 @@ }, "jsx": "react", "esModuleInterop": true - }, - "exclude": [ - "**/*.test.ts" - ] + } }