feat(utils): add collect operator

This commit is contained in:
Josep M Sobrepere 2020-08-25 19:10:39 +02:00
parent 1e1ad4e793
commit 31f7029c36
5 changed files with 175 additions and 5 deletions

View File

@ -101,6 +101,17 @@ votesByKey$.next({ key: "bar" })
// > counters$:
```
### collect
A pipeable operator that collects all the GroupedObservables emitted by
the source and emits a Map with the active inner observables.
Arguments:
- `filter?`: A function that receives the inner Observable and returns an
Observable of boolean values, which indicates whether the inner observable
should be collected.
### mergeWithKey
Emits the values from all the streams of the provided object, in a result

View File

@ -0,0 +1,95 @@
import {
map,
shareReplay,
skip,
startWith,
ignoreElements,
} from "rxjs/operators"
import { TestScheduler } from "rxjs/testing"
import { collect } from "./"
import { GroupedObservable } from "rxjs"
const scheduler = () =>
new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected)
})
describe("collect", () => {
it("emits a map with the latest grouped stream", () => {
scheduler().run(({ expectObservable, cold }) => {
const toGrouped = (source: string, key: string) => {
const result = cold(source).pipe(shareReplay(1)) as GroupedObservable<
string,
string
>
result.key = key
return result
}
const a = toGrouped("-------| ", "a")
const b = toGrouped(" ---| ", "b")
const c = toGrouped(" ------- ", "c")
const d = toGrouped(" -----| ", "d")
const sourceStr = " ab---cd--| "
const expectedStr = "ef--ghij-(k|)"
const source = cold(sourceStr, { a, b, c, d }).pipe(skip(1), startWith(a))
const result = source.pipe(
collect(),
map((x) => Object.fromEntries(x.entries())),
)
expectObservable(result).toBe(expectedStr, {
e: { a },
f: { a, b },
g: { a },
h: { a, c },
i: { a, c, d },
j: { c, d },
k: {},
})
})
})
it("emits a map with the latest filtered grouped stream", () => {
scheduler().run(({ expectObservable, cold }) => {
const toGrouped = (source: string, key: string) => {
const result = cold(source).pipe(shareReplay(1)) as GroupedObservable<
string,
string
>
result.key = key
return result
}
const a = toGrouped("-------| ", "a")
const b = toGrouped(" ---| ", "b")
const c = toGrouped(" ------- ", "c")
const d = toGrouped(" -----| ", "d")
const sourceStr = " ab---cd--| "
const expectedStr = "ef--g-i--(k|)"
const excluded = ["a", "c"]
const source = cold(sourceStr, { a, b, c, d }).pipe(skip(1), startWith(a))
const result = source.pipe(
collect((inner$) =>
inner$.pipe(
ignoreElements(),
startWith(!excluded.includes(inner$.key)),
),
),
map((x) => Object.fromEntries(x.entries())),
)
expectObservable(result).toBe(expectedStr, {
e: {},
f: { b },
g: {},
i: { d },
k: {},
})
})
})
})

View File

@ -0,0 +1,66 @@
import { GroupedObservable, Observable } from "rxjs"
import {
takeUntil,
takeLast,
startWith,
endWith,
ignoreElements,
publish,
mergeMap,
map,
distinctUntilChanged,
skipWhile,
} from "rxjs/operators"
import { scanWithDefaultValue } from "./internal-utils"
const defaultFilter = (source$: Observable<any>) =>
source$.pipe(ignoreElements(), startWith(true), endWith(false))
const set = "s" as const
const del = "d" as const
const complete = "c" as const
/**
* A pipeable operator that collects all the GroupedObservables emitted by
* the source and emits a Map with the active inner observables
*
* @param filter? A function that receives the inner Observable and returns an
* Observable of boolean values, which indicates whether the inner observable
* should be collected.
*/
export const collect = <K, V>(
filter?: (source$: GroupedObservable<K, V>) => Observable<boolean>,
) => {
const enhancer = filter
? (source$: GroupedObservable<K, V>) =>
filter(source$).pipe(
endWith(false),
skipWhile((x) => !x),
distinctUntilChanged(),
)
: defaultFilter
return (source$: Observable<GroupedObservable<K, V>>) =>
source$.pipe(
publish((multicasted$) =>
multicasted$.pipe(
mergeMap((o) => map((x) => ({ t: x ? set : del, o }))(enhancer(o))),
takeUntil(takeLast(1)(multicasted$)),
),
),
endWith({ t: complete }),
scanWithDefaultValue(
(acc, val) => {
if (val.t === set) {
acc.set(val.o.key, val.o)
} else if (val.t === del) {
acc.delete(val.o.key)
} else {
acc.clear()
}
return acc
},
() => new Map<K, GroupedObservable<K, V>>(),
),
)
}

View File

@ -1,5 +1,6 @@
export { useSubscribe } from "./useSubscribe"
export { Subscribe } from "./Subscribe"
export { collectValues } from "./collectValues"
export { collect } from "./collect"
export { mergeWithKey } from "./mergeWithKey"
export { split } from "./split"

View File

@ -1,5 +1,5 @@
{
"include": ["src", "types", "test"],
"include": ["src"],
"compilerOptions": {
"target": "es5",
"module": "esnext",
@ -26,8 +26,5 @@
},
"jsx": "react",
"esModuleInterop": true
},
"exclude": [
"**/*.test.ts"
]
}
}