refactor(utils): leverage pipe from RxJS

This commit is contained in:
Josep M Sobrepere 2020-10-02 11:51:27 +02:00
parent c5d3039187
commit 199a791834
5 changed files with 21 additions and 29 deletions

View File

@ -35,14 +35,11 @@ export const collect = <K, V>(
)
: defaultFilter
return (source$: Observable<GroupedObservable<K, V>>) =>
collector(source$, (o) =>
map((x) => ({
t: x
? (CollectorAction.Set as const)
: (CollectorAction.Delete as const),
k: o.key,
v: o,
}))(enhancer(o)),
)
return collector((o) =>
map((x) => ({
t: x ? (CollectorAction.Set as const) : (CollectorAction.Delete as const),
k: o.key,
v: o,
}))(enhancer(o)),
)
}

View File

@ -1,4 +1,4 @@
import { Observable, GroupedObservable, OperatorFunction } from "rxjs"
import { GroupedObservable, OperatorFunction } from "rxjs"
import { map, endWith } from "rxjs/operators"
import { CollectorAction, collector } from "./internal-utils"
@ -9,8 +9,8 @@ import { CollectorAction, collector } from "./internal-utils"
export const collectValues = <K, V>(): OperatorFunction<
GroupedObservable<K, V>,
Map<K, V>
> => (source$: Observable<GroupedObservable<K, V>>): Observable<Map<K, V>> =>
collector(source$, (inner$) =>
> =>
collector((inner$) =>
inner$.pipe(
map((v) => ({ t: CollectorAction.Set as const, k: inner$.key, v })),
endWith({ t: CollectorAction.Delete, k: inner$.key }),

View File

@ -1,4 +1,4 @@
import { Observable, defer, GroupedObservable } from "rxjs"
import { Observable, defer, GroupedObservable, pipe } from "rxjs"
import { shareLatest } from "@react-rxjs/core"
import {
scan,
@ -44,15 +44,14 @@ export const enum CollectorAction {
}
export const collector = <K, V, VV>(
source: Observable<GroupedObservable<K, V>>,
enhancer: (
source: GroupedObservable<K, V>,
) => Observable<
| { t: CollectorAction.Delete; k: K }
| { t: CollectorAction.Set; k: K; v: VV }
>,
): Observable<Map<K, VV>> =>
source.pipe(
): ((source: Observable<GroupedObservable<K, V>>) => Observable<Map<K, VV>>) =>
pipe(
publish((x) => x.pipe(mergeMap(enhancer), takeUntil(takeLast(1)(x)))),
endWith({ t: CollectorAction.Complete as const }),
scanWithDefaultValue(

View File

@ -1,4 +1,4 @@
import { ObservableInput, from } from "rxjs"
import { ObservableInput, from, Observable } from "rxjs"
import { startWith } from "rxjs/operators"
import { SUSPENSE } from "@react-rxjs/core"
@ -7,5 +7,7 @@ import { SUSPENSE } from "@react-rxjs/core"
*
* @param source$ Source observable
*/
export const suspend = <T>(source$: ObservableInput<T>) =>
from(source$).pipe(startWith(SUSPENSE))
export const suspend: <T>(
source$: ObservableInput<T>,
) => Observable<T | typeof SUSPENSE> = <T>(source$: ObservableInput<T>) =>
startWith(SUSPENSE)(from(source$)) as any

View File

@ -1,9 +1,4 @@
import {
ObservableInput,
Observable,
OperatorFunction,
ObservedValueOf,
} from "rxjs"
import { ObservableInput, OperatorFunction, ObservedValueOf, pipe } from "rxjs"
import { switchMap } from "rxjs/operators"
import { suspend } from "./suspend"
import { SUSPENSE } from "@react-rxjs/core"
@ -16,6 +11,5 @@ import { SUSPENSE } from "@react-rxjs/core"
*/
export const switchMapSuspended = <T, O extends ObservableInput<any>>(
project: (value: T, index: number) => O,
): OperatorFunction<T, ObservedValueOf<O> | typeof SUSPENSE> => (
src$: Observable<T>,
) => src$.pipe(switchMap((x, index) => suspend(project(x, index))))
): OperatorFunction<T, ObservedValueOf<O> | typeof SUSPENSE> =>
pipe(switchMap((x, index) => suspend(project(x, index))))