polish error behaviour on partitionByKey, complete tests

This commit is contained in:
Víctor Oliva 2021-05-31 14:24:00 +02:00 committed by Josep M Sobrepere
parent 9e33084aab
commit 59576a11dc
2 changed files with 278 additions and 132 deletions

View File

@ -1,4 +1,4 @@
import { concat, NEVER, Observable, of, Subject } from "rxjs"
import { concat, from, NEVER, Observable, of, Subject } from "rxjs"
import { catchError, switchMap, take } from "rxjs/operators"
import { TestScheduler } from "rxjs/testing"
import { partitionByKey } from "./"
@ -9,6 +9,193 @@ const scheduler = () =>
})
describe("partitionByKey", () => {
describe("behaviour", () => {
it("groups observables by using the key function", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = cold("-12-3456-")
const expectOdd = " -1--3-5--"
const expectEven = " --2--4-6-"
const [getInstance$] = partitionByKey(
source,
(v) => Number(v) % 2,
(v$) => v$,
)
expectObservable(getInstance$(0)).toBe(expectEven)
expectObservable(getInstance$(1)).toBe(expectOdd)
})
})
it("unsubscribes from all streams when refcount reaches 0", () => {
let innerSubs = 0
const inner = new Observable<number>(() => {
innerSubs++
return () => {
innerSubs--
}
})
const sourceSubject = new Subject<number>()
let sourceSubs = 0
const source = new Observable<number>((obs) => {
sourceSubs++
sourceSubject.subscribe(obs)
return () => {
sourceSubs--
}
})
const [getObs] = partitionByKey(
source,
(v) => v,
() => inner,
)
const observable = getObs(1)
expect(sourceSubs).toBe(0)
expect(innerSubs).toBe(0)
const sub1 = observable.subscribe()
expect(sourceSubs).toBe(1)
expect(innerSubs).toBe(0)
sourceSubject.next(1)
expect(sourceSubs).toBe(1)
expect(innerSubs).toBe(1)
const sub2 = observable.subscribe()
expect(sourceSubs).toBe(1)
expect(innerSubs).toBe(1)
sub1.unsubscribe()
expect(sourceSubs).toBe(1)
expect(innerSubs).toBe(1)
sub2.unsubscribe()
expect(sourceSubs).toBe(0)
expect(innerSubs).toBe(0)
})
it("emits a complete on the inner observable when the source completes", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = cold("-ab-a-|")
const expectA = " -a--a-(c|)"
const expectB = " --b---(c|)"
const [getInstance$] = partitionByKey(
source,
(v) => v,
(v$) => concat(v$, ["c"]),
)
expectObservable(getInstance$("a")).toBe(expectA)
expectObservable(getInstance$("b")).toBe(expectB)
})
})
it("emits the error on the inner observable when the source errors", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = cold("-ab-a-#")
const expectA = " -a--a-(e|)"
const expectB = " --b---(e|)"
const [getInstance$] = partitionByKey(
source,
(v) => v,
(v$) => v$.pipe(catchError(() => of("e"))),
)
expectObservable(getInstance$("a")).toBe(expectA)
expectObservable(getInstance$("b")).toBe(expectB)
})
})
it("handles an empty Observable", () => {
scheduler().run(({ expectSubscriptions, expectObservable, cold }) => {
const e1 = cold(" |")
const e1subs = " (^!)"
const expectObs = "|"
const expectKey = "(x|)"
const [getObs, keys$] = partitionByKey(
e1,
(v) => v,
(v$) => v$,
)
expectObservable(getObs("")).toBe(expectObs)
expectSubscriptions(e1.subscriptions).toBe(e1subs)
expectObservable(keys$).toBe(expectKey, { x: [] })
})
})
it("handles a never Observable", () => {
scheduler().run(({ expectSubscriptions, expectObservable, cold }) => {
const e1 = cold(" --")
const e1subs = " ^-"
const expectObs = "--"
const expectKey = "x-"
const [getObs, keys$] = partitionByKey(
e1,
(v) => v,
(v$) => v$,
)
expectObservable(getObs("")).toBe(expectObs)
expectSubscriptions(e1.subscriptions).toBe(e1subs)
expectObservable(keys$).toBe(expectKey, { x: [] })
})
})
it("handles a just-throw Observable", () => {
scheduler().run(({ expectSubscriptions, expectObservable, cold }) => {
const e1 = cold(" #")
const e1subs = " (^!)"
const expectObs = "#"
const expectKey = "(x#)"
const [getObs, keys$] = partitionByKey(
e1,
(v) => v,
(v$) => v$,
)
expectObservable(getObs("")).toBe(expectObs)
expectSubscriptions(e1.subscriptions).toBe(e1subs)
expectObservable(keys$).toBe(expectKey, { x: [] })
})
})
it("handles synchronous values", () => {
scheduler().run(({ expectObservable }) => {
const e1 = from(["1", "2", "3", "4", "5"])
const expectOdd = " (135|)"
const expectEven = "(24|)"
const expectKeys = "(wxyz|)"
const [getObs, keys$] = partitionByKey(
e1,
(v) => Number(v) % 2,
(v$) => v$,
)
expectObservable(keys$).toBe(expectKeys, {
w: [1],
x: [1, 0],
y: [0],
z: [],
})
expectObservable(getObs(0)).toBe(expectEven)
expectObservable(getObs(1)).toBe(expectOdd)
})
})
})
describe("activeKeys$", () => {
it("emits a list with all the active keys", () => {
scheduler().run(({ expectObservable, cold }) => {
@ -30,23 +217,6 @@ describe("partitionByKey", () => {
})
})
it("emits all the synchronous groups in a single emission", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = concat(of("a", "b"), cold("--c--"))
const expectedStr = " g-h--"
const [, result] = partitionByKey(
source,
(v) => v,
() => NEVER,
)
expectObservable(result).toBe(expectedStr, {
g: ["a", "b"],
h: ["a", "b", "c"],
})
})
})
it("removes a key from the list when its inner stream completes", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = cold("-ab---c--")
@ -130,21 +300,71 @@ describe("partitionByKey", () => {
})
})
it("errors when the source emits an error", () => {
it("errors when the source emits an error and no group is active", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = cold("-ab--#")
const a = cold(" --1---2")
const a = cold(" --1|")
const b = cold(" -|")
const expectedStr = "efghi#"
const innerStreams: Record<string, Observable<string>> = { a, b }
const [, result] = partitionByKey(
source,
(v) => v,
(v$) =>
v$.pipe(
take(1),
switchMap((v) => innerStreams[v]),
),
)
expectObservable(result).toBe(expectedStr, {
e: [],
f: ["a"],
g: ["a", "b"],
h: ["a"],
i: [],
})
})
})
it("doesn't error when the source errors and its inner streams stop the error", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = cold("-ab--#")
const a = cold(" --1--2--3|")
const b = cold(" ----|")
const expectedStr = "efg---h---(i|)"
const innerStreams: Record<string, Observable<string>> = { a, b }
const [, result] = partitionByKey(
source,
(v) => v,
(v$) =>
v$.pipe(
take(1),
switchMap((v) => innerStreams[v]),
),
)
expectObservable(result).toBe(expectedStr, {
e: [],
f: ["a"],
g: ["a", "b"],
h: ["a"],
i: [],
})
})
})
it("errors when one of its inner stream emits an error", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = cold("-ab-----")
const a = cold(" --1-#")
const b = cold(" ------")
const expectedStr = "efg--#"
const innerStreams: Record<string, Observable<string>> = { a, b }
const [, result] = partitionByKey(
source,
(v) => v,
(v$) =>
v$.pipe(
take(1),
switchMap((v) => innerStreams[v]),
),
(_, v) => innerStreams[v],
)
expectObservable(result).toBe(expectedStr, {
@ -154,32 +374,6 @@ describe("partitionByKey", () => {
})
})
})
it("removes a key when its inner stream emits an error", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = cold("-ab-----")
const a = cold(" --1-#")
const b = cold(" ------")
const expectedStr = "efg--h"
const innerStreams: Record<string, Observable<string>> = { a, b }
const [, result] = partitionByKey(
source,
(v) => v,
(v$) =>
v$.pipe(
take(1),
switchMap((v) => innerStreams[v]),
),
)
expectObservable(result).toBe(expectedStr, {
e: [],
f: ["a"],
g: ["a", "b"],
h: ["b"],
})
})
})
})
describe("getInstance$", () => {
@ -235,71 +429,17 @@ describe("partitionByKey", () => {
expect(lateNext).toHaveBeenCalledWith(1)
})
it("unsubscribes from all streams when refcount reaches 0", () => {
let innerSubs = 0
const inner = new Observable<number>(() => {
innerSubs++
return () => {
innerSubs--
}
})
const sourceSubject = new Subject<number>()
let sourceSubs = 0
const source = new Observable<number>((obs) => {
sourceSubs++
sourceSubject.subscribe(obs)
return () => {
sourceSubs--
}
})
const [getObs] = partitionByKey(
source,
(v) => v,
() => inner,
)
const observable = getObs(1)
expect(sourceSubs).toBe(0)
expect(innerSubs).toBe(0)
const sub1 = observable.subscribe()
expect(sourceSubs).toBe(1)
expect(innerSubs).toBe(0)
sourceSubject.next(1)
expect(sourceSubs).toBe(1)
expect(innerSubs).toBe(1)
const sub2 = observable.subscribe()
expect(sourceSubs).toBe(1)
expect(innerSubs).toBe(1)
sub1.unsubscribe()
expect(sourceSubs).toBe(1)
expect(innerSubs).toBe(1)
sub2.unsubscribe()
expect(sourceSubs).toBe(0)
expect(innerSubs).toBe(0)
})
it("emits a complete on the inner observable when the source completes", () => {
it("lets the projection function handle completions", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = cold("-ab-a-|")
const expectA = " -a--a-(c|)"
const expectB = " --b---(c|)"
const concatenated = cold("123|")
const expectA = " -a--a-123|"
const expectB = " --b---123|"
const [getInstance$] = partitionByKey(
source,
(v) => v,
(v$) => concat(v$, ["c"]),
(v$) => concat(v$, concatenated),
)
expectObservable(getInstance$("a")).toBe(expectA)
@ -307,12 +447,11 @@ describe("partitionByKey", () => {
})
})
// Do we want this behaviour?
it("emits an error when the source errors", () => {
it("lets the projection function catch source errors", () => {
scheduler().run(({ expectObservable, cold }) => {
const source = cold("-ab-a-#")
const expectA = " -a--a-#"
const expectB = " --b---#"
const expectA = " -a--a-(e|)"
const expectB = " --b---(e|)"
const [getInstance$] = partitionByKey(
source,

View File

@ -29,7 +29,7 @@ export function partitionByKey<T, K, R>(
(subscriber) => {
const groups: Map<K, InnerGroup<T, K, R>> = new Map()
let warmup = true
let emitted = false
let sourceCompleted = false
const sub = stream.subscribe(
(x) => {
@ -43,7 +43,7 @@ export function partitionByKey<T, K, R>(
const res = streamSelector(subject, key).pipe(
shareLatest(),
) as GroupedObservable<K, R>
res.key = key
;(res as any).key = key
const innerGroup: InnerGroup<T, K, R> = {
source: subject,
@ -52,35 +52,42 @@ export function partitionByKey<T, K, R>(
}
groups.set(key, innerGroup)
const onFinish = () => {
groups.delete(key)
subscriber.next(mapGroups(groups))
innerGroup.subscription = res.subscribe(
noop,
(e) => subscriber.error(e),
() => {
groups.delete(key)
subscriber.next(mapGroups(groups))
if (groups.size === 0 && sourceCompleted) {
subscriber.complete()
}
}
innerGroup.subscription = res.subscribe(noop, onFinish, onFinish)
if (groups.size === 0 && sourceCompleted) {
subscriber.complete()
}
},
)
subject.next(x)
if (!warmup) {
subscriber.next(mapGroups(groups))
}
subscriber.next(mapGroups(groups))
emitted = true
},
(e) => {
subscriber.error(e)
sourceCompleted = true
if (groups.size) {
groups.forEach((g) => g.source.error(e))
} else {
subscriber.error(e)
}
},
() => {
sourceCompleted = true
if (groups.size === 0) {
if (groups.size) {
groups.forEach((g) => g.source.complete())
} else {
subscriber.complete()
}
groups.forEach((g) => g.source.complete())
},
)
warmup = false
subscriber.next(mapGroups(groups))
if (!emitted) subscriber.next(mapGroups(groups))
return () => {
sub.unsubscribe()