Rename mergeActiveKeys for combineKeys, add tests and jsdocs

This commit is contained in:
Víctor Oliva 2021-03-30 12:24:25 +02:00 committed by Josep M Sobrepere
parent f976661c2e
commit 35f46c96b8
3 changed files with 137 additions and 5 deletions

View File

@ -0,0 +1,122 @@
import { Observable } from "rxjs"
import { map, scan } from "rxjs/operators"
import { TestScheduler } from "rxjs/testing"
import { combineKeys } from "./"
const scheduler = () =>
new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected)
})
describe("combineKeys", () => {
it("emits a map with the latest value of the stream of each key", () => {
scheduler().run(({ expectObservable, cold }) => {
const keys = cold(" ab---cd---").pipe(scan((acc, v) => [...acc, v], []))
const a = cold(" --1---2---")
const b = cold(" ---------")
const c = cold(" 1----")
const d = cold(" 9---")
const expectedStr = "--e--f(gh)"
const innerStreams = { a, b, c, d }
const result = combineKeys(
keys,
(v): Observable<string> => innerStreams[v],
).pipe(map((x) => Object.fromEntries(x.entries())))
expectObservable(result).toBe(expectedStr, {
e: { a: "1" },
f: { a: "1", c: "1" },
g: { a: "2", c: "1" },
h: { a: "2", c: "1", d: "9" },
})
})
})
it("removes the entry of a key when that key is removed from the source", () => {
scheduler().run(({ expectObservable, cold }) => {
const keys = cold(" a-b---c---").pipe(map((v) => [v]))
const a = cold(" ----------")
const b = cold(" -1-2-3-4")
const c = cold(" -2-3")
const expectedStr = "---e-fgh-i"
const innerStreams = { a, b, c }
const result = combineKeys(
keys,
(v): Observable<string> => innerStreams[v],
).pipe(map((x) => Object.fromEntries(x.entries())))
expectObservable(result).toBe(expectedStr, {
e: { b: "1" },
f: { b: "2" },
g: {},
h: { c: "2" },
i: { c: "3" },
})
})
})
it("completes when the key stream completes", () => {
scheduler().run(({ expectObservable, cold }) => {
const keys = cold(" a-b---|").pipe(scan((acc, v) => [...acc, v], []))
const a = cold(" -1-----")
const b = cold(" -1---")
const expectedStr = "-e-f--|"
const innerStreams = { a, b }
const result = combineKeys(
keys,
(v): Observable<string> => innerStreams[v],
).pipe(map((x) => Object.fromEntries(x.entries())))
expectObservable(result).toBe(expectedStr, {
e: { a: "1" },
f: { a: "1", b: "1" },
})
})
})
it("propagates errors from the inner streams", () => {
scheduler().run(({ expectObservable, cold }) => {
const keys = cold(" a-b---|").pipe(scan((acc, v) => [...acc, v], []))
const a = cold(" -1-----")
const b = cold(" -#")
const expectedStr = "-e-#"
const innerStreams = { a, b }
const result = combineKeys(
keys,
(v): Observable<string> => innerStreams[v],
).pipe(map((x) => Object.fromEntries(x.entries())))
expectObservable(result).toBe(expectedStr, {
e: { a: "1" },
})
})
})
it("propagates errors from the key stream", () => {
scheduler().run(({ expectObservable, cold }) => {
const keys = cold(" a-b#").pipe(scan((acc, v) => [...acc, v], []))
const a = cold(" -1--")
const b = cold(" -1")
const expectedStr = "-e-#"
const innerStreams = { a, b }
const result = combineKeys(
keys,
(v): Observable<string> => innerStreams[v],
).pipe(map((x) => Object.fromEntries(x.entries())))
expectObservable(result).toBe(expectedStr, {
e: { a: "1" },
})
})
})
})

View File

@ -1,7 +1,14 @@
import { Observable, Subscription } from "rxjs"
export const mergeActiveKeys = <K, T>(
activeKeys$: Observable<Iterable<K>>,
/**
* Creates a stream that combines the result of the streams from each key of the input stream.
*
* @param keys$ Stream of the list of keys to subscribe to.
* @param getInner$ Function that returns the stream for each key.
* @returns An stream with a map containing the latest value from the stream of each key.
*/
export const combineKeys = <K, T>(
keys$: Observable<Array<K> | Set<K>>,
getInner$: (key: K) => Observable<T>,
): Observable<Map<K, T>> =>
new Observable((observer) => {
@ -12,16 +19,18 @@ export const mergeActiveKeys = <K, T>(
if (!updatingSource) observer.next(new Map(currentValue))
}
const subscription = activeKeys$.subscribe(
const subscription = keys$.subscribe(
(nextKeysArr) => {
updatingSource = true
const nextKeys = new Set(nextKeysArr)
let changes = false
innerSubscriptions.forEach((sub, key) => {
if (!nextKeys.has(key)) {
changes = true
sub.unsubscribe()
currentValue.delete(key)
if (currentValue.has(key)) {
changes = true
currentValue.delete(key)
}
} else {
nextKeys.delete(key)
}

View File

@ -1,5 +1,6 @@
export { collectValues } from "./collectValues"
export { collect } from "./collect"
export { combineKeys } from "./combineKeys"
export { getGroupedObservable } from "./getGroupedObservable"
export { createSignal } from "./createSignal"
export { createKeyedSignal } from "./createKeyedSignal"