diff --git a/packages/utils/src/continuousGroupBy.ts b/packages/utils/src/continuousGroupBy.ts index 4e6996d..2065619 100644 --- a/packages/utils/src/continuousGroupBy.ts +++ b/packages/utils/src/continuousGroupBy.ts @@ -1,35 +1,41 @@ -import { Observable, Subject, GroupedObservable, BehaviorSubject } from 'rxjs'; -import { finalize, takeUntil, share } from 'rxjs/operators'; +import { Observable, Subject, GroupedObservable, BehaviorSubject } from "rxjs" +import { finalize, share } from "rxjs/operators" const continuousGroupBy = (mapper: (x: I) => O) => ( - stream: Observable + stream: Observable, ) => - new Observable>(subscriber => { - const groups: Map> = new Map(); - const sourceSubscriptionEnd: Subject = new Subject(); + new Observable>((subscriber) => { + const groups: Map> = new Map() - return stream - .subscribe(x => { - const key = mapper(x); + return stream.subscribe( + (x) => { + const key = mapper(x) if (groups.has(key)) { - return groups.get(key)!.next(x); + return groups.get(key)!.next(x) } - const subject = new BehaviorSubject(x); - groups.set(key, subject); + const subject = new BehaviorSubject(x) + groups.set(key, subject) const res = subject.pipe( finalize(() => groups.delete(key)), - takeUntil(sourceSubscriptionEnd), - share() - ) as GroupedObservable; - res.key = key; + share(), + ) as GroupedObservable + res.key = key - subscriber.next(res); - }, subscriber.error.bind(subscriber), subscriber.complete.bind(subscriber)) - .add(() => { - sourceSubscriptionEnd.next(); - }); - }); + subscriber.next(res) + }, + (e) => { + subscriber.error(e) + /* istanbul ignore next */ + groups.forEach((g) => g.error(e)) + }, + () => { + subscriber.complete() + /* istanbul ignore next */ + groups.forEach((g) => g.complete()) + }, + ) + }) -export default continuousGroupBy; +export default continuousGroupBy diff --git a/packages/utils/src/groupInMap.test.ts b/packages/utils/src/groupInMap.test.ts index 6c71d5f..f58a77a 100644 --- a/packages/utils/src/groupInMap.test.ts +++ b/packages/utils/src/groupInMap.test.ts @@ -1,6 +1,6 @@ import { map, takeWhile } from "rxjs/operators" import { TestScheduler } from "rxjs/testing" -import { groupInMap } from "./groupInMap" +import { groupInMap } from "./" const scheduler = () => new TestScheduler((actual, expected) => { @@ -25,7 +25,47 @@ describe("groupInMap", () => { }, } const source = cold("a-b-c-|", values) - const expected = " m-n-o-(pq|)" + const expected = " m-n-o-(p|)" + + const result = source.pipe( + groupInMap( + (value) => value.key, + (value$) => value$.pipe(map((value) => value.quantity)), + ), + map((groups) => + Array.from(groups.entries()) + .map(([key, value]) => `${key}:${value}`) + .join(","), + ), + ) + + expectObservable(result).toBe(expected, { + m: "group1:1", + n: "group1:1,group2:2", + o: "group1:3,group2:2", + p: "", + }) + }) + }) + + it("propates errors", () => { + scheduler().run(({ expectObservable, cold }) => { + const values = { + a: { + key: "group1", + quantity: 1, + }, + b: { + key: "group2", + quantity: 2, + }, + c: { + key: "group1", + quantity: 3, + }, + } + const source = cold("a-b-c-#", values) + const expected = " m-n-o-#" const result = source.pipe( groupInMap( @@ -43,8 +83,6 @@ describe("groupInMap", () => { m: "group1:1", n: "group1:1,group2:2", o: "group1:3,group2:2", - p: "group2:2", // TODO - I don't think this should be expected - q: "", }) }) }) diff --git a/packages/utils/src/groupInMap.ts b/packages/utils/src/groupInMap.ts index cde7ad7..0975345 100644 --- a/packages/utils/src/groupInMap.ts +++ b/packages/utils/src/groupInMap.ts @@ -1,5 +1,12 @@ -import { Observable, GroupedObservable, concat, of } from "rxjs" -import { map, mergeMap, scan } from "rxjs/operators" +import { Observable, GroupedObservable, concat, of, defer } from "rxjs" +import { + map, + mergeMap, + scan, + publish, + takeUntil, + takeLast, +} from "rxjs/operators" import continuousGroupBy from "./continuousGroupBy" const DELETE = Symbol("DELETE") @@ -7,25 +14,39 @@ const DELETE = Symbol("DELETE") /** * Groups all values by key and emits a Map that hold the latest value for each * key. - * + * * @param keyGetter Key getter. * @param projection Projection function for each group. */ export const groupInMap = ( keyGetter: (x: T) => K, projection: (x: GroupedObservable) => Observable, -) => (source$: Observable): Observable> => - source$.pipe( - continuousGroupBy(keyGetter), - mergeMap(inner$ => - concat( - projection(inner$).pipe(map(v => [inner$.key, v] as const)), - of([inner$.key, DELETE] as const), - ), +) => (source$: Observable): Observable> => { + const res = new Map() + + return concat( + source$.pipe( + continuousGroupBy(keyGetter), + publish((multicasted$) => { + return multicasted$.pipe( + mergeMap((inner$) => + concat( + projection(inner$).pipe(map((v) => [inner$.key, v] as const)), + of([inner$.key, DELETE] as const), + ), + ), + takeUntil(multicasted$.pipe(takeLast(1))), + ) + }), + scan((acc, [key, value]) => { + if (value !== DELETE) return acc.set(key, value) + acc.delete(key) + return acc + }, res), ), - scan((acc, [key, value]) => { - if (value !== DELETE) return acc.set(key, value) - acc.delete(key) - return acc - }, new Map()), + defer(() => { + res.clear() + return of(res) + }), ) +}