diff --git a/packages/utils/src/partitionByKey.test.ts b/packages/utils/src/partitionByKey.test.ts index c4c6aaf..996b729 100644 --- a/packages/utils/src/partitionByKey.test.ts +++ b/packages/utils/src/partitionByKey.test.ts @@ -12,7 +12,7 @@ describe("partitionByKey", () => { describe("activeKeys$", () => { it("emits a list with all the active keys", () => { scheduler().run(({ expectObservable, cold }) => { - const source = cold("-ab---cd---") + const source = cold("-ab-a-cd---") const expectedStr = "efg---hi---" const [, result] = partitionByKey( source, @@ -30,6 +30,23 @@ describe("partitionByKey", () => { }) }) + it("emits all the synchronous groups in a single emission", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = concat(of("a", "b"), cold("--c--")) + const expectedStr = " g-h--" + const [, result] = partitionByKey( + source, + (v) => v, + () => NEVER, + ) + + expectObservable(result).toBe(expectedStr, { + g: ["a", "b"], + h: ["a", "b", "c"], + }) + }) + }) + it("removes a key from the list when its inner stream completes", () => { scheduler().run(({ expectObservable, cold }) => { const source = cold("-ab---c--") @@ -37,7 +54,7 @@ describe("partitionByKey", () => { const b = cold(" ---|") const c = cold(" 1-|") const expectedStr = "efg--hi-j" - const innerStreams: Record> = { a, b, c } + const innerStreams: Record> = { a, b, c } const [, result] = partitionByKey( source, (v) => v, @@ -65,7 +82,7 @@ describe("partitionByKey", () => { const a = cold(" --1---2-|") const b = cold(" ---|") const expectedStr = "efg--h---(i|)" - const innerStreams = { a, b } + const innerStreams: Record> = { a, b } const [, result] = partitionByKey( source, (v) => v, @@ -85,6 +102,84 @@ describe("partitionByKey", () => { }) }) }) + + it("completes when no key is alive and the source completes", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-ab---|") + const a = cold(" --1|") + const b = cold(" ---|") + const expectedStr = "efg-hi|" + const innerStreams: Record> = { a, b } + const [, result] = partitionByKey( + source, + (v) => v, + (v$) => + v$.pipe( + take(1), + switchMap((v) => innerStreams[v]), + ), + ) + + expectObservable(result).toBe(expectedStr, { + e: [], + f: ["a"], + g: ["a", "b"], + h: ["b"], + i: [], + }) + }) + }) + + it("errors when the source emits an error", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-ab--#") + const a = cold(" --1---2") + const b = cold(" ------") + const expectedStr = "efg--#" + const innerStreams: Record> = { a, b } + const [, result] = partitionByKey( + source, + (v) => v, + (v$) => + v$.pipe( + take(1), + switchMap((v) => innerStreams[v]), + ), + ) + + expectObservable(result).toBe(expectedStr, { + e: [], + f: ["a"], + g: ["a", "b"], + }) + }) + }) + + it("removes a key when its inner stream emits an error", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("-ab-----") + const a = cold(" --1-#") + const b = cold(" ------") + const expectedStr = "efg--h" + const innerStreams: Record> = { a, b } + const [, result] = partitionByKey( + source, + (v) => v, + (v$) => + v$.pipe( + take(1), + switchMap((v) => innerStreams[v]), + ), + ) + + expectObservable(result).toBe(expectedStr, { + e: [], + f: ["a"], + g: ["a", "b"], + h: ["b"], + }) + }) + }) }) describe("getInstance$", () => { @@ -98,7 +193,7 @@ describe("partitionByKey", () => { const expectB = " -----|" const expectC = " ------1-|" - const innerStreams: Record> = { a, b, c } + const innerStreams: Record> = { a, b, c } const [getInstance$] = partitionByKey( source, (v) => v, diff --git a/packages/utils/src/partitionByKey.ts b/packages/utils/src/partitionByKey.ts index 3922547..dc6e518 100644 --- a/packages/utils/src/partitionByKey.ts +++ b/packages/utils/src/partitionByKey.ts @@ -1,6 +1,13 @@ -import { GroupedObservable, Observable } from "rxjs" +import { shareLatest } from "@react-rxjs/core" +import { + GroupedObservable, + noop, + Observable, + Subject, + Subscription, +} from "rxjs" import { map } from "rxjs/operators" -import { collect, getGroupedObservable, split } from "./" +import { getGroupedObservable } from "./" /** * Groups the elements from the source stream by using `keySelector`, returning @@ -18,9 +25,89 @@ export function partitionByKey( keySelector: (value: T) => K, streamSelector: (grouped: Observable, key: K) => Observable, ): [(key: K) => GroupedObservable, Observable] { - const source$ = stream.pipe(split(keySelector, streamSelector), collect()) + const groupedObservables$ = new Observable>>( + (subscriber) => { + const groups: Map> = new Map() + + let warmup = true + let sourceCompleted = false + const sub = stream.subscribe( + (x) => { + const key = keySelector(x) + if (groups.has(key)) { + return groups.get(key)!.source.next(x) + } + + const subject = new Subject() + + const res = streamSelector(subject, key).pipe( + shareLatest(), + ) as GroupedObservable + res.key = key + + const innerGroup: InnerGroup = { + source: subject, + observable: res, + subscription: new Subscription(), + } + groups.set(key, innerGroup) + + const onFinish = () => { + groups.delete(key) + subscriber.next(mapGroups(groups)) + + if (groups.size === 0 && sourceCompleted) { + subscriber.complete() + } + } + innerGroup.subscription = res.subscribe(noop, onFinish, onFinish) + + subject.next(x) + if (!warmup) { + subscriber.next(mapGroups(groups)) + } + }, + (e) => { + subscriber.error(e) + }, + () => { + sourceCompleted = true + if (groups.size === 0) { + subscriber.complete() + } + groups.forEach((g) => g.source.complete()) + }, + ) + + warmup = false + subscriber.next(mapGroups(groups)) + + return () => { + sub.unsubscribe() + groups.forEach((g) => { + g.source.unsubscribe() + g.subscription.unsubscribe() + }) + } + }, + ).pipe(shareLatest()) + return [ - (key: K) => getGroupedObservable(source$, key), - source$.pipe(map((x) => Array.from(x.keys()))), + (key: K) => getGroupedObservable(groupedObservables$, key), + groupedObservables$.pipe(map((x) => Array.from(x.keys()))), ] } + +interface InnerGroup { + source: Subject + observable: GroupedObservable + subscription: Subscription +} + +function mapGroups( + groups: Map>, +): Map> { + return new Map( + Array.from(groups.entries()).map(([key, group]) => [key, group.observable]), + ) +}