diff --git a/packages/utils/src/collect.ts b/packages/utils/src/collect.ts index 5179e67..9ac3bdb 100644 --- a/packages/utils/src/collect.ts +++ b/packages/utils/src/collect.ts @@ -35,14 +35,11 @@ export const collect = ( ) : defaultFilter - return (source$: Observable>) => - collector(source$, (o) => - map((x) => ({ - t: x - ? (CollectorAction.Set as const) - : (CollectorAction.Delete as const), - k: o.key, - v: o, - }))(enhancer(o)), - ) + return collector((o) => + map((x) => ({ + t: x ? (CollectorAction.Set as const) : (CollectorAction.Delete as const), + k: o.key, + v: o, + }))(enhancer(o)), + ) } diff --git a/packages/utils/src/collectValues.ts b/packages/utils/src/collectValues.ts index 120fbc0..7972354 100644 --- a/packages/utils/src/collectValues.ts +++ b/packages/utils/src/collectValues.ts @@ -1,4 +1,4 @@ -import { Observable, GroupedObservable, OperatorFunction } from "rxjs" +import { GroupedObservable, OperatorFunction } from "rxjs" import { map, endWith } from "rxjs/operators" import { CollectorAction, collector } from "./internal-utils" @@ -9,8 +9,8 @@ import { CollectorAction, collector } from "./internal-utils" export const collectValues = (): OperatorFunction< GroupedObservable, Map -> => (source$: Observable>): Observable> => - collector(source$, (inner$) => +> => + collector((inner$) => inner$.pipe( map((v) => ({ t: CollectorAction.Set as const, k: inner$.key, v })), endWith({ t: CollectorAction.Delete, k: inner$.key }), diff --git a/packages/utils/src/internal-utils.ts b/packages/utils/src/internal-utils.ts index a5d8404..9b16e1a 100644 --- a/packages/utils/src/internal-utils.ts +++ b/packages/utils/src/internal-utils.ts @@ -1,4 +1,4 @@ -import { Observable, defer, GroupedObservable } from "rxjs" +import { Observable, defer, GroupedObservable, pipe } from "rxjs" import { shareLatest } from "@react-rxjs/core" import { scan, @@ -44,15 +44,14 @@ export const enum CollectorAction { } export const collector = ( - source: Observable>, enhancer: ( source: GroupedObservable, ) => Observable< | { t: CollectorAction.Delete; k: K } | { t: CollectorAction.Set; k: K; v: VV } >, -): Observable> => - source.pipe( +): ((source: Observable>) => Observable>) => + pipe( publish((x) => x.pipe(mergeMap(enhancer), takeUntil(takeLast(1)(x)))), endWith({ t: CollectorAction.Complete as const }), scanWithDefaultValue( diff --git a/packages/utils/src/suspend.ts b/packages/utils/src/suspend.ts index 7a2a74c..b9bd718 100644 --- a/packages/utils/src/suspend.ts +++ b/packages/utils/src/suspend.ts @@ -1,4 +1,4 @@ -import { ObservableInput, from } from "rxjs" +import { ObservableInput, from, Observable } from "rxjs" import { startWith } from "rxjs/operators" import { SUSPENSE } from "@react-rxjs/core" @@ -7,5 +7,7 @@ import { SUSPENSE } from "@react-rxjs/core" * * @param source$ Source observable */ -export const suspend = (source$: ObservableInput) => - from(source$).pipe(startWith(SUSPENSE)) +export const suspend: ( + source$: ObservableInput, +) => Observable = (source$: ObservableInput) => + startWith(SUSPENSE)(from(source$)) as any diff --git a/packages/utils/src/switchMapSuspended.ts b/packages/utils/src/switchMapSuspended.ts index 1888a1d..4f6b458 100644 --- a/packages/utils/src/switchMapSuspended.ts +++ b/packages/utils/src/switchMapSuspended.ts @@ -1,9 +1,4 @@ -import { - ObservableInput, - Observable, - OperatorFunction, - ObservedValueOf, -} from "rxjs" +import { ObservableInput, OperatorFunction, ObservedValueOf, pipe } from "rxjs" import { switchMap } from "rxjs/operators" import { suspend } from "./suspend" import { SUSPENSE } from "@react-rxjs/core" @@ -16,6 +11,5 @@ import { SUSPENSE } from "@react-rxjs/core" */ export const switchMapSuspended = >( project: (value: T, index: number) => O, -): OperatorFunction | typeof SUSPENSE> => ( - src$: Observable, -) => src$.pipe(switchMap((x, index) => suspend(project(x, index)))) +): OperatorFunction | typeof SUSPENSE> => + pipe(switchMap((x, index) => suspend(project(x, index))))