Fix: distinctShareReplay shouldn't skip values on sync subscriptions

This commit is contained in:
Josep M Sobrepere 2020-06-23 02:47:24 +02:00
parent aec21425df
commit 207f71ebf9
3 changed files with 24 additions and 9 deletions

View File

@ -16,9 +16,11 @@ export const distinctShareReplay = <T>(
const result = new Observable<T>(subscriber => {
refCount++
let innerSub: Subscription
if (!subject) {
currentValue = { value: EMPTY_VALUE }
subject = new Subject<T>()
innerSub = subject.subscribe(subscriber)
subscription = source$.subscribe({
next(value) {
if (
@ -37,13 +39,13 @@ export const distinctShareReplay = <T>(
subject!.complete()
},
})
} else {
innerSub = subject.subscribe(subscriber)
if (currentValue.value !== EMPTY_VALUE) {
subscriber.next(currentValue.value)
}
}
if (currentValue.value !== EMPTY_VALUE) {
subscriber.next(currentValue.value)
}
const innerSub = subject.subscribe(subscriber)
return () => {
refCount--
innerSub.unsubscribe()

View File

@ -103,7 +103,7 @@ describe("connectFactoryObservable", () => {
nUpdates += 1
})
expect(latestValue1).toBe(4)
expect(nUpdates).toBe(1)
expect(nUpdates).toBe(4)
let latestValue2: number = 0
const sub2 = getShared(0).subscribe(x => {
@ -111,7 +111,7 @@ describe("connectFactoryObservable", () => {
nUpdates += 1
})
expect(latestValue2).toBe(4)
expect(nUpdates).toBe(2)
expect(nUpdates).toBe(5)
sub1.unsubscribe()
sub2.unsubscribe()
@ -122,7 +122,7 @@ describe("connectFactoryObservable", () => {
nUpdates += 1
})
expect(latestValue3).toBe(5)
expect(nUpdates).toBe(3)
expect(nUpdates).toBe(9)
sub3.unsubscribe()
})
})

View File

@ -3,7 +3,7 @@ import { BehaviorObservable } from "../../src/BehaviorObservable"
import { EMPTY_VALUE } from "../../src/operators/distinct-share-replay"
import { cold } from "jest-marbles"
import { TestScheduler } from "rxjs/testing"
import { Subject } from "rxjs"
import { Subject, from } from "rxjs"
const scheduler = () =>
new TestScheduler((actual, expected) => {
@ -77,6 +77,19 @@ describe("operators/distinctShareReplay", () => {
})
})
// prettier-ignore
it("should not skip values on a sync source", () => {
scheduler().run(({ expectObservable }) => {
const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)")
const sub1 = '^';
const expected1 = " (abcd|)"
const shared = source.pipe(distinctShareReplay());
expectObservable(shared, sub1).toBe(expected1);
})
})
describe("Returns a BehaviorObservable which exposes a getValue function", () => {
it("getValue returns the latest emitted value", () => {
const input = new Subject<string>()