feat(utils): deprecate groupInMap in favour of collectValues

This commit is contained in:
Josep M Sobrepere Profitós 2020-08-20 19:55:07 +02:00 committed by Josep M Sobrepere
parent 27c90bc693
commit 90de29b08a
7 changed files with 140 additions and 115 deletions

View File

@ -33,20 +33,15 @@ Properties:
Important: This Component doesn't trigger any updates.
### groupInMap
### collectValues
A RxJS pipeable operator which groups all values by key and emits a Map that
holds the latest value for each key
Arguments:
- `keyGetter`: A function that extracts the key for each item.
- `projection`: Projection function for each group.
A pipeable operator that collects all the GroupedObservables emitted by
the source and emits a Map with the latest values of the inner observables.
```ts
const votesByKey$ = new Subject<{ key: string }>()
const counters$ = votesByKey$.pipe(
groupInMap(
split(
(vote) => vote.key,
(votes$) =>
votes$.pipe(
@ -55,6 +50,7 @@ const counters$ = votesByKey$.pipe(
takeWhile((count) => count < 3),
),
),
collectValues(),
)
counters$.subscribe((counters) => {

View File

@ -1,13 +1,14 @@
import { map, takeWhile } from "rxjs/operators"
import { TestScheduler } from "rxjs/testing"
import { groupInMap } from "./"
import { split, collectValues } from "./"
import { from } from "rxjs"
const scheduler = () =>
new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected)
})
describe("groupInMap", () => {
describe("collectValues", () => {
it("emits a map with the latest value for each group", () => {
scheduler().run(({ expectObservable, cold }) => {
const values = {
@ -24,14 +25,15 @@ describe("groupInMap", () => {
quantity: 3,
},
}
const source = cold("a-b-c-|", values)
const expected = " m-n-o-(p|)"
const source = cold("--a-b-c-|", values)
const expected = " p-m-n-o-(p|)"
const result = source.pipe(
groupInMap(
split(
(value) => value.key,
(value$) => value$.pipe(map((value) => value.quantity)),
),
collectValues(),
map((groups) =>
Array.from(groups.entries())
.map(([key, value]) => `${key}:${value}`)
@ -48,7 +50,48 @@ describe("groupInMap", () => {
})
})
it("propates errors", () => {
it("handles synchronous values", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = from([
{
key: "group1",
quantity: 1,
},
{
key: "group2",
quantity: 2,
},
{
key: "group1",
quantity: 3,
},
] as Array<{ key: string; quantity: number }>)
const expected = "(mnop|)"
const result = source.pipe(
split(
(value) => value.key,
(value$) => value$.pipe(map((value) => value.quantity)),
),
collectValues(),
map((groups) =>
Array.from(groups.entries())
.map(([key, value]) => `${key}:${value}`)
.join(","),
),
)
expectObservable(result).toBe(expected, {
m: "group1:1",
n: "group1:1,group2:2",
o: "group1:3,group2:2",
p: "",
})
})
})
it("propagates errors", () => {
scheduler().run(({ expectObservable, cold }) => {
const values = {
a: {
@ -64,14 +107,15 @@ describe("groupInMap", () => {
quantity: 3,
},
}
const source = cold("a-b-c-#", values)
const expected = " m-n-o-#"
const source = cold("-a-b-c-#", values)
const expected = " pm-n-o-#"
const result = source.pipe(
groupInMap(
split(
(value) => value.key,
(value$) => value$.pipe(map((value) => value.quantity)),
),
collectValues(),
map((groups) =>
Array.from(groups.entries())
.map(([key, value]) => `${key}:${value}`)
@ -83,6 +127,7 @@ describe("groupInMap", () => {
m: "group1:1",
n: "group1:1,group2:2",
o: "group1:3,group2:2",
p: "",
})
})
})
@ -103,11 +148,11 @@ describe("groupInMap", () => {
quantity: 3,
},
}
const source = cold("a-b-c", values)
const expected = " m-n-o"
const source = cold("-a-b-c", values)
const expected = " pm-n-o"
const result = source.pipe(
groupInMap(
split(
(value) => value.key,
(value$) =>
value$.pipe(
@ -115,6 +160,7 @@ describe("groupInMap", () => {
takeWhile((v) => v < 3),
),
),
collectValues(),
map((groups) =>
Array.from(groups.entries())
.map(([key, value]) => `${key}:${value}`)
@ -126,6 +172,7 @@ describe("groupInMap", () => {
m: "group1:1",
n: "group1:1,group2:2",
o: "group2:2",
p: "",
})
})
})

View File

@ -0,0 +1,45 @@
import { Observable, GroupedObservable } from "rxjs"
import {
map,
mergeMap,
endWith,
publish,
takeLast,
takeUntil,
} from "rxjs/operators"
import { scanWithDefaultValue } from "./internal-utils"
/**
* A pipeable operator that collects all the GroupedObservables emitted by
* the source and emits a Map with the latest values of the inner observables.
*/
export const collectValues = <K, V>() => (
source$: Observable<GroupedObservable<K, V>>,
): Observable<Map<K, V>> =>
source$.pipe(
publish((multicasted$) =>
multicasted$.pipe(
mergeMap((inner$) =>
inner$.pipe(
map((v) => ({ t: "s" as const, k: inner$.key, v })),
endWith({ t: "d" as const, k: inner$.key }),
),
),
takeUntil(multicasted$.pipe(takeLast(1))),
),
),
endWith({ t: "c" as const }),
scanWithDefaultValue(
(acc, val) => {
if (val.t === "s") {
acc.set(val.k, val.v)
} else if (val.t === "d") {
acc.delete(val.k)
} else {
acc.clear()
}
return acc
},
() => new Map<K, V>(),
),
)

View File

@ -1,41 +0,0 @@
import { Observable, Subject, GroupedObservable, BehaviorSubject } from "rxjs"
import { finalize, share } from "rxjs/operators"
const continuousGroupBy = <I, O>(mapper: (x: I) => O) => (
stream: Observable<I>,
) =>
new Observable<GroupedObservable<O, I>>((subscriber) => {
const groups: Map<O, Subject<I>> = new Map()
return stream.subscribe(
(x) => {
const key = mapper(x)
if (groups.has(key)) {
return groups.get(key)!.next(x)
}
const subject = new BehaviorSubject<I>(x)
groups.set(key, subject)
const res = subject.pipe(
finalize(() => groups.delete(key)),
share(),
) as GroupedObservable<O, I>
res.key = key
subscriber.next(res)
},
(e) => {
subscriber.error(e)
/* istanbul ignore next */
groups.forEach((g) => g.error(e))
},
() => {
subscriber.complete()
/* istanbul ignore next */
groups.forEach((g) => g.complete())
},
)
})
export default continuousGroupBy

View File

@ -1,52 +0,0 @@
import { Observable, GroupedObservable, concat, of, defer } from "rxjs"
import {
map,
mergeMap,
scan,
publish,
takeUntil,
takeLast,
} from "rxjs/operators"
import continuousGroupBy from "./continuousGroupBy"
const DELETE = Symbol("DELETE")
/**
* A pipeable operator that groups all values by key and emits a Map that holds
* the latest value for each key.
*
* @param keyGetter A function that extracts the key for each item.
* @param projection Projection function for each group.
*/
export const groupInMap = <T, K, V>(
keyGetter: (x: T) => K,
projection: (x: GroupedObservable<K, T>) => Observable<V>,
) => (source$: Observable<T>): Observable<Map<K, V>> => {
const res = new Map<K, V>()
return concat(
source$.pipe(
continuousGroupBy(keyGetter),
publish((multicasted$) => {
return multicasted$.pipe(
mergeMap((inner$) =>
concat(
projection(inner$).pipe(map((v) => [inner$.key, v] as const)),
of([inner$.key, DELETE] as const),
),
),
takeUntil(multicasted$.pipe(takeLast(1))),
)
}),
scan((acc, [key, value]) => {
if (value !== DELETE) return acc.set(key, value)
acc.delete(key)
return acc
}, res),
),
defer(() => {
res.clear()
return of(res)
}),
)
}

View File

@ -1,5 +1,5 @@
export { useSubscribe } from "./useSubscribe"
export { Subscribe } from "./Subscribe"
export { groupInMap } from "./groupInMap"
export { collectValues } from "./collectValues"
export { mergeWithKey } from "./mergeWithKey"
export { split } from "./split"

View File

@ -0,0 +1,30 @@
import { Observable, defer } from "rxjs"
import { scan } from "rxjs/operators"
export const defaultStart = <T>(value: T) => (source$: Observable<T>) =>
new Observable<T>((observer) => {
let emitted = false
const subscription = source$.subscribe(
(x) => {
emitted = true
observer.next(x)
},
(e) => observer.error(e),
() => observer.complete(),
)
if (!emitted) {
observer.next(value)
}
return subscription
})
export const scanWithDefaultValue = <I, O>(
accumulator: (acc: O, current: I) => O,
getSeed: () => O,
) => (source: Observable<I>) =>
defer(() => {
const seed = getSeed()
return source.pipe(scan(accumulator, seed), defaultStart(seed))
})