fix(groupInMap): just emit empty map on source completion

This commit is contained in:
Josep M Sobrepere 2020-07-17 01:14:52 +02:00
parent 367128f238
commit 3afe196301
3 changed files with 108 additions and 43 deletions

View File

@ -1,35 +1,41 @@
import { Observable, Subject, GroupedObservable, BehaviorSubject } from 'rxjs';
import { finalize, takeUntil, share } from 'rxjs/operators';
import { Observable, Subject, GroupedObservable, BehaviorSubject } from "rxjs"
import { finalize, share } from "rxjs/operators"
const continuousGroupBy = <I, O>(mapper: (x: I) => O) => (
stream: Observable<I>
stream: Observable<I>,
) =>
new Observable<GroupedObservable<O, I>>(subscriber => {
const groups: Map<O, Subject<I>> = new Map();
const sourceSubscriptionEnd: Subject<undefined> = new Subject();
new Observable<GroupedObservable<O, I>>((subscriber) => {
const groups: Map<O, Subject<I>> = new Map()
return stream
.subscribe(x => {
const key = mapper(x);
return stream.subscribe(
(x) => {
const key = mapper(x)
if (groups.has(key)) {
return groups.get(key)!.next(x);
return groups.get(key)!.next(x)
}
const subject = new BehaviorSubject<I>(x);
groups.set(key, subject);
const subject = new BehaviorSubject<I>(x)
groups.set(key, subject)
const res = subject.pipe(
finalize(() => groups.delete(key)),
takeUntil(sourceSubscriptionEnd),
share()
) as GroupedObservable<O, I>;
res.key = key;
share(),
) as GroupedObservable<O, I>
res.key = key
subscriber.next(res);
}, subscriber.error.bind(subscriber), subscriber.complete.bind(subscriber))
.add(() => {
sourceSubscriptionEnd.next();
});
});
subscriber.next(res)
},
(e) => {
subscriber.error(e)
/* istanbul ignore next */
groups.forEach((g) => g.error(e))
},
() => {
subscriber.complete()
/* istanbul ignore next */
groups.forEach((g) => g.complete())
},
)
})
export default continuousGroupBy;
export default continuousGroupBy

View File

@ -1,6 +1,6 @@
import { map, takeWhile } from "rxjs/operators"
import { TestScheduler } from "rxjs/testing"
import { groupInMap } from "./groupInMap"
import { groupInMap } from "./"
const scheduler = () =>
new TestScheduler((actual, expected) => {
@ -25,7 +25,47 @@ describe("groupInMap", () => {
},
}
const source = cold("a-b-c-|", values)
const expected = " m-n-o-(pq|)"
const expected = " m-n-o-(p|)"
const result = source.pipe(
groupInMap(
(value) => value.key,
(value$) => value$.pipe(map((value) => value.quantity)),
),
map((groups) =>
Array.from(groups.entries())
.map(([key, value]) => `${key}:${value}`)
.join(","),
),
)
expectObservable(result).toBe(expected, {
m: "group1:1",
n: "group1:1,group2:2",
o: "group1:3,group2:2",
p: "",
})
})
})
it("propates errors", () => {
scheduler().run(({ expectObservable, cold }) => {
const values = {
a: {
key: "group1",
quantity: 1,
},
b: {
key: "group2",
quantity: 2,
},
c: {
key: "group1",
quantity: 3,
},
}
const source = cold("a-b-c-#", values)
const expected = " m-n-o-#"
const result = source.pipe(
groupInMap(
@ -43,8 +83,6 @@ describe("groupInMap", () => {
m: "group1:1",
n: "group1:1,group2:2",
o: "group1:3,group2:2",
p: "group2:2", // TODO - I don't think this should be expected
q: "",
})
})
})

View File

@ -1,5 +1,12 @@
import { Observable, GroupedObservable, concat, of } from "rxjs"
import { map, mergeMap, scan } from "rxjs/operators"
import { Observable, GroupedObservable, concat, of, defer } from "rxjs"
import {
map,
mergeMap,
scan,
publish,
takeUntil,
takeLast,
} from "rxjs/operators"
import continuousGroupBy from "./continuousGroupBy"
const DELETE = Symbol("DELETE")
@ -7,25 +14,39 @@ const DELETE = Symbol("DELETE")
/**
* Groups all values by key and emits a Map that hold the latest value for each
* key.
*
*
* @param keyGetter Key getter.
* @param projection Projection function for each group.
*/
export const groupInMap = <T, K, V>(
keyGetter: (x: T) => K,
projection: (x: GroupedObservable<K, T>) => Observable<V>,
) => (source$: Observable<T>): Observable<Map<K, V>> =>
source$.pipe(
continuousGroupBy(keyGetter),
mergeMap(inner$ =>
concat(
projection(inner$).pipe(map(v => [inner$.key, v] as const)),
of([inner$.key, DELETE] as const),
),
) => (source$: Observable<T>): Observable<Map<K, V>> => {
const res = new Map<K, V>()
return concat(
source$.pipe(
continuousGroupBy(keyGetter),
publish((multicasted$) => {
return multicasted$.pipe(
mergeMap((inner$) =>
concat(
projection(inner$).pipe(map((v) => [inner$.key, v] as const)),
of([inner$.key, DELETE] as const),
),
),
takeUntil(multicasted$.pipe(takeLast(1))),
)
}),
scan((acc, [key, value]) => {
if (value !== DELETE) return acc.set(key, value)
acc.delete(key)
return acc
}, res),
),
scan((acc, [key, value]) => {
if (value !== DELETE) return acc.set(key, value)
acc.delete(key)
return acc
}, new Map<K, V>()),
defer(() => {
res.clear()
return of(res)
}),
)
}