diff --git a/packages/utils/src/collect.test.ts b/packages/utils/src/collect.test.ts index fe408af..0798bae 100644 --- a/packages/utils/src/collect.test.ts +++ b/packages/utils/src/collect.test.ts @@ -92,53 +92,4 @@ describe("collect", () => { }) }) }) - describe("collect/get", () => { - it("returns the inner observable for the giving key", () => { - scheduler().run(({ expectObservable, cold }) => { - const toGrouped = (source: string, key: string) => { - const result = cold(source).pipe(shareReplay(1)) as GroupedObservable< - string, - string - > - result.key = key - return result - } - - const a = toGrouped("-------| ", "a") - const b = toGrouped("---| ", "b") - const c = toGrouped("------| ", "c") - const d = toGrouped("------| ", "d") - - const sourceStr = " (abc)d---------------------| " - const source = cold(sourceStr, { a, b, c, d }).pipe( - skip(1), - startWith(a), - ) - const result = collect()(source) - result.subscribe() - expectObservable(result.get("a")).toBe("-------|") - expectObservable(result.get("b")).toBe("---|") - expectObservable(result.get("c")).toBe("------|") - expectObservable(result.get("d")).toBe("-----------|") - }) - }) - - it("errors when the outter stream errors", () => { - scheduler().run(({ expectObservable, cold }) => { - const sourceStr = "--#" - const source = cold(sourceStr) as any - const result = collect()(source) - expectObservable(result.get("foo")).toBe("--#") - }) - }) - - it("completes when the outter stream completes", () => { - scheduler().run(({ expectObservable, cold }) => { - const sourceStr = "--|" - const source = cold(sourceStr) as any - const result = collect()(source) - expectObservable(result.get("bar")).toBe("--|") - }) - }) - }) }) diff --git a/packages/utils/src/collect.ts b/packages/utils/src/collect.ts index 5016811..5b56693 100644 --- a/packages/utils/src/collect.ts +++ b/packages/utils/src/collect.ts @@ -1,10 +1,4 @@ -import { - GroupedObservable, - Observable, - OperatorFunction, - pipe, - Subscription, -} from "rxjs" +import { GroupedObservable, Observable, pipe } from "rxjs" import { startWith, endWith, @@ -17,12 +11,6 @@ import { CollectorAction, collector } from "./internal-utils" const defaultFilter = pipe(ignoreElements(), startWith(true), endWith(false)) -export type CollectedObservable = Observable< - Map> -> & { - get: (key: K) => Observable -} - /** * A pipeable operator that collects all the GroupedObservables emitted by * the source and emits a Map with the active inner observables @@ -35,7 +23,7 @@ export const collect = ( filter?: (source$: GroupedObservable) => Observable, ): (( source$: Observable>, -) => CollectedObservable) => { +) => Observable>>) => { const enhancer = filter ? (source$: GroupedObservable) => filter(source$).pipe( @@ -45,38 +33,11 @@ export const collect = ( ) : defaultFilter - const operator: OperatorFunction< - GroupedObservable, - Map> - > = collector((o) => + return collector((o) => map((x) => ({ t: x ? (CollectorAction.Set as const) : (CollectorAction.Delete as const), k: o.key, v: o, }))(enhancer(o)), ) - - return (source$) => { - const result$ = operator(source$) - const get = (key: K) => - new Observable((observer) => { - let innerSub: Subscription | undefined - let outterSub: Subscription = result$.subscribe( - (n) => { - innerSub = innerSub || n.get(key)?.subscribe(observer) - }, - (e) => { - observer.error(e) - }, - () => { - observer.complete() - }, - ) - return () => { - innerSub && innerSub.unsubscribe() - outterSub.unsubscribe() - } - }) - return Object.assign(result$, { get }) - } } diff --git a/packages/utils/src/getGroupedObservable.test.ts b/packages/utils/src/getGroupedObservable.test.ts new file mode 100644 index 0000000..2842f9a --- /dev/null +++ b/packages/utils/src/getGroupedObservable.test.ts @@ -0,0 +1,57 @@ +import { TestScheduler } from "rxjs/testing" +import { shareReplay, skip, startWith } from "rxjs/operators" +import { GroupedObservable } from "rxjs" +import { collect } from "./collect" +import { getGroupedObservable } from "./" + +const scheduler = () => + new TestScheduler((actual, expected) => { + expect(actual).toEqual(expected) + }) + +describe("getGroupedObservable", () => { + it("returns the inner observable for the giving key", () => { + scheduler().run(({ expectObservable, cold }) => { + const toGrouped = (source: string, key: string) => { + const result = cold(source).pipe(shareReplay(1)) as GroupedObservable< + string, + string + > + result.key = key + return result + } + + const a = toGrouped("-------| ", "a") + const b = toGrouped("---| ", "b") + const c = toGrouped("------| ", "c") + const d = toGrouped("------| ", "d") + + const sourceStr = " (abc)d---------------------| " + const source = cold(sourceStr, { a, b, c, d }).pipe(skip(1), startWith(a)) + const result = collect()(source) + result.subscribe() + expectObservable(getGroupedObservable(result, "a")).toBe("-------|") + expectObservable(getGroupedObservable(result, "b")).toBe("---|") + expectObservable(getGroupedObservable(result, "c")).toBe("------|") + expectObservable(getGroupedObservable(result, "d")).toBe("-----------|") + }) + }) + + it("errors when the outter stream errors", () => { + scheduler().run(({ expectObservable, cold }) => { + const sourceStr = "--#" + const source = cold(sourceStr) as any + const result = collect()(source) + expectObservable(getGroupedObservable(result, "foo")).toBe("--#") + }) + }) + + it("completes when the outter stream completes", () => { + scheduler().run(({ expectObservable, cold }) => { + const sourceStr = "--|" + const source = cold(sourceStr) as any + const result = collect()(source) + expectObservable(getGroupedObservable(result, "bar")).toBe("--|") + }) + }) +}) diff --git a/packages/utils/src/getGroupedObservable.ts b/packages/utils/src/getGroupedObservable.ts new file mode 100644 index 0000000..aa01a83 --- /dev/null +++ b/packages/utils/src/getGroupedObservable.ts @@ -0,0 +1,27 @@ +import { GroupedObservable, Observable, Subscription } from "rxjs" + +export const getGroupedObservable = ( + source$: Observable>>, + key: K, +) => { + const result = new Observable((observer) => { + let innerSub: Subscription | undefined + let outterSub: Subscription = source$.subscribe( + (n) => { + innerSub = innerSub || n.get(key)?.subscribe(observer) + }, + (e) => { + observer.error(e) + }, + () => { + observer.complete() + }, + ) + return () => { + innerSub && innerSub.unsubscribe() + outterSub.unsubscribe() + } + }) as GroupedObservable + result.key = key + return result +} diff --git a/packages/utils/src/index.tsx b/packages/utils/src/index.tsx index 2da0f63..d39b8ff 100644 --- a/packages/utils/src/index.tsx +++ b/packages/utils/src/index.tsx @@ -1,5 +1,6 @@ export { collectValues } from "./collectValues" export { collect } from "./collect" +export { getGroupedObservable } from "./getGroupedObservable" export { createListener } from "./createListener" export { mergeWithKey } from "./mergeWithKey" export { split } from "./split"