diff --git a/packages/utils/src/partitionByKey.test.ts b/packages/utils/src/partitionByKey.test.ts index 8d13084..85b0df5 100644 --- a/packages/utils/src/partitionByKey.test.ts +++ b/packages/utils/src/partitionByKey.test.ts @@ -610,6 +610,47 @@ describe("partitionByKey", () => { expectObservable(getInstance$("b")).toBe(expectB) }) }) + + it("synchronously emits when the group observable notifies of a new GroupedObservable", () => { + const subject = new Subject() + const [getInner$, keys$] = partitionByKey(subject, (x) => x, take(1)) + + const key = 8 + let receivedValue = 0 + let deleted: number[] = [] + let done = false + let order: string[] = [] + keys$.subscribe((keys) => { + if (keys.type === "add") { + order.push("outer add") + getInner$([...keys.keys][0]).subscribe({ + next: (x) => { + receivedValue = x + order.push("inner next") + }, + complete: () => { + order.push("inner complete") + done = true + }, + }) + } else { + order.push("outer delete") + deleted = [...keys.keys] + } + }) + + subject.next(key) + + expect(receivedValue).toBe(key) + expect(done).toBe(true) + expect(deleted).toEqual([key]) + expect(order).toEqual([ + "outer add", + "inner next", + "outer delete", + "inner complete", + ]) + }) }) describe("performance", () => { diff --git a/packages/utils/src/partitionByKey.ts b/packages/utils/src/partitionByKey.ts index dfafbfe..acce135 100644 --- a/packages/utils/src/partitionByKey.ts +++ b/packages/utils/src/partitionByKey.ts @@ -1,6 +1,5 @@ import { shareLatest } from "@react-rxjs/core" import { - defer, GroupedObservable, identity, noop, @@ -8,7 +7,7 @@ import { Subject, Subscription, } from "rxjs" -import { finalize, map } from "rxjs/operators" +import { map } from "rxjs/operators" export interface KeyChanges { type: "add" | "remove" @@ -59,23 +58,55 @@ export function partitionByKey( const groups: Map> = new Map() let sourceCompleted = false + const finalize = + (type: "error" | "complete") => + (...args: any[]) => { + sourceCompleted = true + if (groups.size) { + groups.forEach((g) => (g.source[type] as any)(...args)) + } else { + subscriber[type](...args) + } + } + const sub = stream.subscribe( (x) => { const key = keySelector(x) - if (groups.has(key)) { - return groups.get(key)!.source.next(x) + if (groups.has(key)) return groups.get(key)!.source.next(x) + + let pendingFirstAdd = true + const emitFirstAdd = () => { + if (pendingFirstAdd) { + pendingFirstAdd = false + subscriber.next({ + groups, + changes: { + type: "add", + keys: [key], + }, + }) + } } const subject = new Subject() + let pendingFirstVal = true + const emitFirstValue = () => { + if (pendingFirstVal) { + pendingFirstVal = false + subject.next(x) + } + } const shared$ = shareLatest()( (streamSelector || identity)(subject, key), ) - - const res = defer(() => { + const res = new Observable((observer) => { incRefcount() - return shared$ - }).pipe(finalize(() => decRefcount())) as any as GroupedObservable + const subscription = shared$.subscribe(observer) + subscription.add(decRefcount) + emitFirstValue() + return subscription + }) as any as GroupedObservable ;(res as any).key = key const innerGroup: InnerGroup = { @@ -85,19 +116,12 @@ export function partitionByKey( } groups.set(key, innerGroup) - subscriber.next({ - groups, - changes: { - type: "add", - keys: [key], - }, - }) - innerGroup.subscription = shared$.subscribe( noop, (e) => subscriber.error(e), () => { groups.delete(key) + emitFirstAdd() subscriber.next({ groups, changes: { @@ -111,24 +135,11 @@ export function partitionByKey( } }, ) - subject.next(x) - }, - (e) => { - sourceCompleted = true - if (groups.size) { - groups.forEach((g) => g.source.error(e)) - } else { - subscriber.error(e) - } - }, - () => { - sourceCompleted = true - if (groups.size) { - groups.forEach((g) => g.source.complete()) - } else { - subscriber.complete() - } + emitFirstAdd() + emitFirstValue() }, + finalize("error"), + finalize("complete"), ) return () => {