diff --git a/packages/utils/README.md b/packages/utils/README.md index c0d1e41..2f8131d 100644 --- a/packages/utils/README.md +++ b/packages/utils/README.md @@ -33,20 +33,15 @@ Properties: Important: This Component doesn't trigger any updates. -### groupInMap +### collectValues -A RxJS pipeable operator which groups all values by key and emits a Map that -holds the latest value for each key - -Arguments: - -- `keyGetter`: A function that extracts the key for each item. -- `projection`: Projection function for each group. +A pipeable operator that collects all the GroupedObservables emitted by +the source and emits a Map with the latest values of the inner observables. ```ts const votesByKey$ = new Subject<{ key: string }>() const counters$ = votesByKey$.pipe( - groupInMap( + split( (vote) => vote.key, (votes$) => votes$.pipe( @@ -55,6 +50,7 @@ const counters$ = votesByKey$.pipe( takeWhile((count) => count < 3), ), ), + collectValues(), ) counters$.subscribe((counters) => { diff --git a/packages/utils/src/groupInMap.test.ts b/packages/utils/src/collectValues.test.ts similarity index 64% rename from packages/utils/src/groupInMap.test.ts rename to packages/utils/src/collectValues.test.ts index f58a77a..b8987c3 100644 --- a/packages/utils/src/groupInMap.test.ts +++ b/packages/utils/src/collectValues.test.ts @@ -1,13 +1,14 @@ import { map, takeWhile } from "rxjs/operators" import { TestScheduler } from "rxjs/testing" -import { groupInMap } from "./" +import { split, collectValues } from "./" +import { from } from "rxjs" const scheduler = () => new TestScheduler((actual, expected) => { expect(actual).toEqual(expected) }) -describe("groupInMap", () => { +describe("collectValues", () => { it("emits a map with the latest value for each group", () => { scheduler().run(({ expectObservable, cold }) => { const values = { @@ -24,14 +25,15 @@ describe("groupInMap", () => { quantity: 3, }, } - const source = cold("a-b-c-|", values) - const expected = " m-n-o-(p|)" + const source = cold("--a-b-c-|", values) + const expected = " p-m-n-o-(p|)" const result = source.pipe( - groupInMap( + split( (value) => value.key, (value$) => value$.pipe(map((value) => value.quantity)), ), + collectValues(), map((groups) => Array.from(groups.entries()) .map(([key, value]) => `${key}:${value}`) @@ -48,7 +50,48 @@ describe("groupInMap", () => { }) }) - it("propates errors", () => { + it("handles synchronous values", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = from([ + { + key: "group1", + quantity: 1, + }, + { + key: "group2", + quantity: 2, + }, + { + key: "group1", + quantity: 3, + }, + ] as Array<{ key: string; quantity: number }>) + + const expected = "(mnop|)" + + const result = source.pipe( + split( + (value) => value.key, + (value$) => value$.pipe(map((value) => value.quantity)), + ), + collectValues(), + map((groups) => + Array.from(groups.entries()) + .map(([key, value]) => `${key}:${value}`) + .join(","), + ), + ) + + expectObservable(result).toBe(expected, { + m: "group1:1", + n: "group1:1,group2:2", + o: "group1:3,group2:2", + p: "", + }) + }) + }) + + it("propagates errors", () => { scheduler().run(({ expectObservable, cold }) => { const values = { a: { @@ -64,14 +107,15 @@ describe("groupInMap", () => { quantity: 3, }, } - const source = cold("a-b-c-#", values) - const expected = " m-n-o-#" + const source = cold("-a-b-c-#", values) + const expected = " pm-n-o-#" const result = source.pipe( - groupInMap( + split( (value) => value.key, (value$) => value$.pipe(map((value) => value.quantity)), ), + collectValues(), map((groups) => Array.from(groups.entries()) .map(([key, value]) => `${key}:${value}`) @@ -83,6 +127,7 @@ describe("groupInMap", () => { m: "group1:1", n: "group1:1,group2:2", o: "group1:3,group2:2", + p: "", }) }) }) @@ -103,11 +148,11 @@ describe("groupInMap", () => { quantity: 3, }, } - const source = cold("a-b-c", values) - const expected = " m-n-o" + const source = cold("-a-b-c", values) + const expected = " pm-n-o" const result = source.pipe( - groupInMap( + split( (value) => value.key, (value$) => value$.pipe( @@ -115,6 +160,7 @@ describe("groupInMap", () => { takeWhile((v) => v < 3), ), ), + collectValues(), map((groups) => Array.from(groups.entries()) .map(([key, value]) => `${key}:${value}`) @@ -126,6 +172,7 @@ describe("groupInMap", () => { m: "group1:1", n: "group1:1,group2:2", o: "group2:2", + p: "", }) }) }) diff --git a/packages/utils/src/collectValues.ts b/packages/utils/src/collectValues.ts new file mode 100644 index 0000000..00e12ae --- /dev/null +++ b/packages/utils/src/collectValues.ts @@ -0,0 +1,45 @@ +import { Observable, GroupedObservable } from "rxjs" +import { + map, + mergeMap, + endWith, + publish, + takeLast, + takeUntil, +} from "rxjs/operators" +import { scanWithDefaultValue } from "./internal-utils" + +/** + * A pipeable operator that collects all the GroupedObservables emitted by + * the source and emits a Map with the latest values of the inner observables. + */ +export const collectValues = () => ( + source$: Observable>, +): Observable> => + source$.pipe( + publish((multicasted$) => + multicasted$.pipe( + mergeMap((inner$) => + inner$.pipe( + map((v) => ({ t: "s" as const, k: inner$.key, v })), + endWith({ t: "d" as const, k: inner$.key }), + ), + ), + takeUntil(multicasted$.pipe(takeLast(1))), + ), + ), + endWith({ t: "c" as const }), + scanWithDefaultValue( + (acc, val) => { + if (val.t === "s") { + acc.set(val.k, val.v) + } else if (val.t === "d") { + acc.delete(val.k) + } else { + acc.clear() + } + return acc + }, + () => new Map(), + ), + ) diff --git a/packages/utils/src/continuousGroupBy.ts b/packages/utils/src/continuousGroupBy.ts deleted file mode 100644 index 2065619..0000000 --- a/packages/utils/src/continuousGroupBy.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { Observable, Subject, GroupedObservable, BehaviorSubject } from "rxjs" -import { finalize, share } from "rxjs/operators" - -const continuousGroupBy = (mapper: (x: I) => O) => ( - stream: Observable, -) => - new Observable>((subscriber) => { - const groups: Map> = new Map() - - return stream.subscribe( - (x) => { - const key = mapper(x) - if (groups.has(key)) { - return groups.get(key)!.next(x) - } - - const subject = new BehaviorSubject(x) - groups.set(key, subject) - - const res = subject.pipe( - finalize(() => groups.delete(key)), - share(), - ) as GroupedObservable - res.key = key - - subscriber.next(res) - }, - (e) => { - subscriber.error(e) - /* istanbul ignore next */ - groups.forEach((g) => g.error(e)) - }, - () => { - subscriber.complete() - /* istanbul ignore next */ - groups.forEach((g) => g.complete()) - }, - ) - }) - -export default continuousGroupBy diff --git a/packages/utils/src/groupInMap.ts b/packages/utils/src/groupInMap.ts deleted file mode 100644 index aeb83d9..0000000 --- a/packages/utils/src/groupInMap.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { Observable, GroupedObservable, concat, of, defer } from "rxjs" -import { - map, - mergeMap, - scan, - publish, - takeUntil, - takeLast, -} from "rxjs/operators" -import continuousGroupBy from "./continuousGroupBy" - -const DELETE = Symbol("DELETE") - -/** - * A pipeable operator that groups all values by key and emits a Map that holds - * the latest value for each key. - * - * @param keyGetter A function that extracts the key for each item. - * @param projection Projection function for each group. - */ -export const groupInMap = ( - keyGetter: (x: T) => K, - projection: (x: GroupedObservable) => Observable, -) => (source$: Observable): Observable> => { - const res = new Map() - - return concat( - source$.pipe( - continuousGroupBy(keyGetter), - publish((multicasted$) => { - return multicasted$.pipe( - mergeMap((inner$) => - concat( - projection(inner$).pipe(map((v) => [inner$.key, v] as const)), - of([inner$.key, DELETE] as const), - ), - ), - takeUntil(multicasted$.pipe(takeLast(1))), - ) - }), - scan((acc, [key, value]) => { - if (value !== DELETE) return acc.set(key, value) - acc.delete(key) - return acc - }, res), - ), - defer(() => { - res.clear() - return of(res) - }), - ) -} diff --git a/packages/utils/src/index.tsx b/packages/utils/src/index.tsx index 5dcffbb..033c347 100644 --- a/packages/utils/src/index.tsx +++ b/packages/utils/src/index.tsx @@ -1,5 +1,5 @@ export { useSubscribe } from "./useSubscribe" export { Subscribe } from "./Subscribe" -export { groupInMap } from "./groupInMap" +export { collectValues } from "./collectValues" export { mergeWithKey } from "./mergeWithKey" export { split } from "./split" diff --git a/packages/utils/src/internal-utils.ts b/packages/utils/src/internal-utils.ts new file mode 100644 index 0000000..a4a98ad --- /dev/null +++ b/packages/utils/src/internal-utils.ts @@ -0,0 +1,30 @@ +import { Observable, defer } from "rxjs" +import { scan } from "rxjs/operators" + +export const defaultStart = (value: T) => (source$: Observable) => + new Observable((observer) => { + let emitted = false + const subscription = source$.subscribe( + (x) => { + emitted = true + observer.next(x) + }, + (e) => observer.error(e), + () => observer.complete(), + ) + + if (!emitted) { + observer.next(value) + } + + return subscription + }) + +export const scanWithDefaultValue = ( + accumulator: (acc: O, current: I) => O, + getSeed: () => O, +) => (source: Observable) => + defer(() => { + const seed = getSeed() + return source.pipe(scan(accumulator, seed), defaultStart(seed)) + })