feat(utils): getGroupedObservable

This commit is contained in:
Josep M Sobrepere 2021-02-27 08:56:59 +01:00
parent df25aeb952
commit effba0761a
5 changed files with 88 additions and 91 deletions

View File

@ -92,53 +92,4 @@ describe("collect", () => {
})
})
})
describe("collect/get", () => {
it("returns the inner observable for the giving key", () => {
scheduler().run(({ expectObservable, cold }) => {
const toGrouped = (source: string, key: string) => {
const result = cold(source).pipe(shareReplay(1)) as GroupedObservable<
string,
string
>
result.key = key
return result
}
const a = toGrouped("-------| ", "a")
const b = toGrouped("---| ", "b")
const c = toGrouped("------| ", "c")
const d = toGrouped("------| ", "d")
const sourceStr = " (abc)d---------------------| "
const source = cold(sourceStr, { a, b, c, d }).pipe(
skip(1),
startWith(a),
)
const result = collect<string, string>()(source)
result.subscribe()
expectObservable(result.get("a")).toBe("-------|")
expectObservable(result.get("b")).toBe("---|")
expectObservable(result.get("c")).toBe("------|")
expectObservable(result.get("d")).toBe("-----------|")
})
})
it("errors when the outter stream errors", () => {
scheduler().run(({ expectObservable, cold }) => {
const sourceStr = "--#"
const source = cold(sourceStr) as any
const result = collect<string, string>()(source)
expectObservable(result.get("foo")).toBe("--#")
})
})
it("completes when the outter stream completes", () => {
scheduler().run(({ expectObservable, cold }) => {
const sourceStr = "--|"
const source = cold(sourceStr) as any
const result = collect<string, string>()(source)
expectObservable(result.get("bar")).toBe("--|")
})
})
})
})

View File

@ -1,10 +1,4 @@
import {
GroupedObservable,
Observable,
OperatorFunction,
pipe,
Subscription,
} from "rxjs"
import { GroupedObservable, Observable, pipe } from "rxjs"
import {
startWith,
endWith,
@ -17,12 +11,6 @@ import { CollectorAction, collector } from "./internal-utils"
const defaultFilter = pipe(ignoreElements(), startWith(true), endWith(false))
export type CollectedObservable<K, V> = Observable<
Map<K, GroupedObservable<K, V>>
> & {
get: (key: K) => Observable<V>
}
/**
* A pipeable operator that collects all the GroupedObservables emitted by
* the source and emits a Map with the active inner observables
@ -35,7 +23,7 @@ export const collect = <K, V>(
filter?: (source$: GroupedObservable<K, V>) => Observable<boolean>,
): ((
source$: Observable<GroupedObservable<K, V>>,
) => CollectedObservable<K, V>) => {
) => Observable<Map<K, GroupedObservable<K, V>>>) => {
const enhancer = filter
? (source$: GroupedObservable<K, V>) =>
filter(source$).pipe(
@ -45,38 +33,11 @@ export const collect = <K, V>(
)
: defaultFilter
const operator: OperatorFunction<
GroupedObservable<K, V>,
Map<K, GroupedObservable<K, V>>
> = collector((o) =>
return collector((o) =>
map((x) => ({
t: x ? (CollectorAction.Set as const) : (CollectorAction.Delete as const),
k: o.key,
v: o,
}))(enhancer(o)),
)
return (source$) => {
const result$ = operator(source$)
const get = (key: K) =>
new Observable<V>((observer) => {
let innerSub: Subscription | undefined
let outterSub: Subscription = result$.subscribe(
(n) => {
innerSub = innerSub || n.get(key)?.subscribe(observer)
},
(e) => {
observer.error(e)
},
() => {
observer.complete()
},
)
return () => {
innerSub && innerSub.unsubscribe()
outterSub.unsubscribe()
}
})
return Object.assign(result$, { get })
}
}

View File

@ -0,0 +1,57 @@
import { TestScheduler } from "rxjs/testing"
import { shareReplay, skip, startWith } from "rxjs/operators"
import { GroupedObservable } from "rxjs"
import { collect } from "./collect"
import { getGroupedObservable } from "./"
const scheduler = () =>
new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected)
})
describe("getGroupedObservable", () => {
it("returns the inner observable for the giving key", () => {
scheduler().run(({ expectObservable, cold }) => {
const toGrouped = (source: string, key: string) => {
const result = cold(source).pipe(shareReplay(1)) as GroupedObservable<
string,
string
>
result.key = key
return result
}
const a = toGrouped("-------| ", "a")
const b = toGrouped("---| ", "b")
const c = toGrouped("------| ", "c")
const d = toGrouped("------| ", "d")
const sourceStr = " (abc)d---------------------| "
const source = cold(sourceStr, { a, b, c, d }).pipe(skip(1), startWith(a))
const result = collect<string, string>()(source)
result.subscribe()
expectObservable(getGroupedObservable(result, "a")).toBe("-------|")
expectObservable(getGroupedObservable(result, "b")).toBe("---|")
expectObservable(getGroupedObservable(result, "c")).toBe("------|")
expectObservable(getGroupedObservable(result, "d")).toBe("-----------|")
})
})
it("errors when the outter stream errors", () => {
scheduler().run(({ expectObservable, cold }) => {
const sourceStr = "--#"
const source = cold(sourceStr) as any
const result = collect<string, string>()(source)
expectObservable(getGroupedObservable(result, "foo")).toBe("--#")
})
})
it("completes when the outter stream completes", () => {
scheduler().run(({ expectObservable, cold }) => {
const sourceStr = "--|"
const source = cold(sourceStr) as any
const result = collect<string, string>()(source)
expectObservable(getGroupedObservable(result, "bar")).toBe("--|")
})
})
})

View File

@ -0,0 +1,27 @@
import { GroupedObservable, Observable, Subscription } from "rxjs"
export const getGroupedObservable = <K, T>(
source$: Observable<Map<K, GroupedObservable<K, T>>>,
key: K,
) => {
const result = new Observable<T>((observer) => {
let innerSub: Subscription | undefined
let outterSub: Subscription = source$.subscribe(
(n) => {
innerSub = innerSub || n.get(key)?.subscribe(observer)
},
(e) => {
observer.error(e)
},
() => {
observer.complete()
},
)
return () => {
innerSub && innerSub.unsubscribe()
outterSub.unsubscribe()
}
}) as GroupedObservable<K, T>
result.key = key
return result
}

View File

@ -1,5 +1,6 @@
export { collectValues } from "./collectValues"
export { collect } from "./collect"
export { getGroupedObservable } from "./getGroupedObservable"
export { createListener } from "./createListener"
export { mergeWithKey } from "./mergeWithKey"
export { split } from "./split"