From 047f83d0a2da3225fe370dd59a534d5650e2b9ea Mon Sep 17 00:00:00 2001 From: Josep M Sobrepere Date: Wed, 26 Aug 2020 19:30:07 +0200 Subject: [PATCH] chore(utils): collector abstraction to reduce bundle size --- packages/utils/src/collect.ts | 35 ++--------------------- packages/utils/src/collectValues.ts | 42 ++++------------------------ packages/utils/src/internal-utils.ts | 41 +++++++++++++++++++++++++-- 3 files changed, 48 insertions(+), 70 deletions(-) diff --git a/packages/utils/src/collect.ts b/packages/utils/src/collect.ts index 17f4e8a..cc07209 100644 --- a/packages/utils/src/collect.ts +++ b/packages/utils/src/collect.ts @@ -1,26 +1,17 @@ import { GroupedObservable, Observable } from "rxjs" import { - takeUntil, - takeLast, startWith, endWith, ignoreElements, - publish, - mergeMap, map, distinctUntilChanged, skipWhile, } from "rxjs/operators" -import { shareLatest } from "@react-rxjs/core" -import { scanWithDefaultValue } from "./internal-utils" +import { set, del, collector } from "./internal-utils" const defaultFilter = (source$: Observable) => source$.pipe(ignoreElements(), startWith(true), endWith(false)) -const set = "s" as const -const del = "d" as const -const complete = "c" as const - /** * A pipeable operator that collects all the GroupedObservables emitted by * the source and emits a Map with the active inner observables @@ -42,27 +33,7 @@ export const collect = ( : defaultFilter return (source$: Observable>) => - source$.pipe( - publish((multicasted$) => - multicasted$.pipe( - mergeMap((o) => map((x) => ({ t: x ? set : del, o }))(enhancer(o))), - takeUntil(takeLast(1)(multicasted$)), - ), - ), - endWith({ t: complete }), - scanWithDefaultValue( - (acc, val) => { - if (val.t === set) { - acc.set(val.o.key, val.o) - } else if (val.t === del) { - acc.delete(val.o.key) - } else { - acc.clear() - } - return acc - }, - () => new Map>(), - ), - shareLatest(), + collector(source$, (o) => + map((x) => ({ t: x ? set : del, k: o.key, v: o }))(enhancer(o)), ) } diff --git a/packages/utils/src/collectValues.ts b/packages/utils/src/collectValues.ts index 60da2f1..1156394 100644 --- a/packages/utils/src/collectValues.ts +++ b/packages/utils/src/collectValues.ts @@ -1,14 +1,6 @@ import { Observable, GroupedObservable } from "rxjs" -import { - map, - mergeMap, - endWith, - publish, - takeLast, - takeUntil, -} from "rxjs/operators" -import { shareLatest } from "@react-rxjs/core" -import { scanWithDefaultValue } from "./internal-utils" +import { map, endWith } from "rxjs/operators" +import { set, del, collector } from "./internal-utils" /** * A pipeable operator that collects all the GroupedObservables emitted by @@ -17,31 +9,9 @@ import { scanWithDefaultValue } from "./internal-utils" export const collectValues = () => ( source$: Observable>, ): Observable> => - source$.pipe( - publish((multicasted$) => - multicasted$.pipe( - mergeMap((inner$) => - inner$.pipe( - map((v) => ({ t: "s" as const, k: inner$.key, v })), - endWith({ t: "d" as const, k: inner$.key }), - ), - ), - takeUntil(multicasted$.pipe(takeLast(1))), - ), + collector(source$, (inner$) => + inner$.pipe( + map((v) => ({ t: set, k: inner$.key, v })), + endWith({ t: del, k: inner$.key }), ), - endWith({ t: "c" as const }), - scanWithDefaultValue( - (acc, val) => { - if (val.t === "s") { - acc.set(val.k, val.v) - } else if (val.t === "d") { - acc.delete(val.k) - } else { - acc.clear() - } - return acc - }, - () => new Map(), - ), - shareLatest(), ) diff --git a/packages/utils/src/internal-utils.ts b/packages/utils/src/internal-utils.ts index a4a98ad..bed7003 100644 --- a/packages/utils/src/internal-utils.ts +++ b/packages/utils/src/internal-utils.ts @@ -1,5 +1,13 @@ -import { Observable, defer } from "rxjs" -import { scan } from "rxjs/operators" +import { Observable, defer, GroupedObservable } from "rxjs" +import { shareLatest } from "@react-rxjs/core" +import { + scan, + publish, + endWith, + takeLast, + takeUntil, + mergeMap, +} from "rxjs/operators" export const defaultStart = (value: T) => (source$: Observable) => new Observable((observer) => { @@ -28,3 +36,32 @@ export const scanWithDefaultValue = ( const seed = getSeed() return source.pipe(scan(accumulator, seed), defaultStart(seed)) }) + +export const set = "s" as const +export const del = "d" as const +export const complete = "c" as const + +export const collector = ( + source: Observable>, + enhancer: ( + source: GroupedObservable, + ) => Observable<{ t: "d"; k: K } | { t: "s"; k: K; v: VV }>, +): Observable> => + source.pipe( + publish((x) => x.pipe(mergeMap(enhancer), takeUntil(takeLast(1)(x)))), + endWith({ t: complete }), + scanWithDefaultValue( + (acc, val) => { + if (val.t === set) { + acc.set(val.k, val.v) + } else if (val.t === del) { + acc.delete(val.k) + } else { + acc.clear() + } + return acc + }, + () => new Map(), + ), + shareLatest(), + )