diff --git a/packages/utils/src/partitionByKey.test.ts b/packages/utils/src/partitionByKey.test.ts index dba0197..4aa236d 100644 --- a/packages/utils/src/partitionByKey.test.ts +++ b/packages/utils/src/partitionByKey.test.ts @@ -16,11 +16,7 @@ describe("partitionByKey", () => { const expectOdd = " -1--3-5--" const expectEven = " --2--4-6-" - const [getInstance$] = partitionByKey( - source, - (v) => Number(v) % 2, - (v$) => v$, - ) + const [getInstance$] = partitionByKey(source, (v) => Number(v) % 2) expectObservable(getInstance$(0)).toBe(expectEven) expectObservable(getInstance$(1)).toBe(expectOdd) diff --git a/packages/utils/src/partitionByKey.ts b/packages/utils/src/partitionByKey.ts index 563d36c..aeb8e86 100644 --- a/packages/utils/src/partitionByKey.ts +++ b/packages/utils/src/partitionByKey.ts @@ -5,6 +5,7 @@ import { Observable, Subject, Subscription, + identity, } from "rxjs" import { map } from "rxjs/operators" @@ -23,6 +24,27 @@ export function partitionByKey( stream: Observable, keySelector: (value: T) => K, streamSelector: (grouped: Observable, key: K) => Observable, +): [(key: K) => GroupedObservable, Observable] + +/** + * Groups the elements from the source stream by using `keySelector`, returning + * a stream of the active keys, and a function to get the stream of a specific group + * + * @param stream Input stream + * @param keySelector Function that specifies the key for each element in `stream` + * @returns [1, 2] + * 1. A function that accepts a key and returns the stream for the group of that key. + * 2. A stream with the list of active keys + */ +export function partitionByKey( + stream: Observable, + keySelector: (value: T) => K, +): [(key: K) => GroupedObservable, Observable] + +export function partitionByKey( + stream: Observable, + keySelector: (value: T) => K, + streamSelector?: (grouped: Observable, key: K) => Observable, ): [(key: K) => GroupedObservable, Observable] { const groupedObservables$ = new Observable>>( (subscriber) => { @@ -39,8 +61,8 @@ export function partitionByKey( const subject = new Subject() - const res = streamSelector(subject, key).pipe( - shareLatest(), + const res = shareLatest()( + (streamSelector || identity)(subject, key), ) as GroupedObservable ;(res as any).key = key