import { Observable, ReplaySubject, Subscription } from "rxjs" import { distinctUntilChanged } from "rxjs/operators" export interface ReactObservable extends Observable { getCurrentValue: () => O | IO } const reactOperator = ( source$: Observable, initialValue: I, gracePeriod: number, compare: (a: T | I, b: T) => boolean, teardown?: () => void, ): ReactObservable => { let subject: ReplaySubject | undefined let subscription: Subscription | undefined let timeoutToken: NodeJS.Timeout | undefined = undefined let refCount = 0 let hasError = false let currentValue: T | I = initialValue const observable$ = new Observable(subscriber => { if (timeoutToken !== undefined) { clearTimeout(timeoutToken) } refCount++ if (!subject || hasError) { hasError = false subject = new ReplaySubject(1) subscription = distinctUntilChanged(compare)(source$).subscribe({ next(value) { currentValue = value subject!.next(value) }, error(err) { hasError = true subject!.error(err) }, complete() { subscription = undefined subject!.complete() }, }) } const innerSub = subject.subscribe(subscriber) const cleanup = () => { timeoutToken = undefined currentValue = initialValue teardown?.() if (subscription) { subscription.unsubscribe() subscription = undefined } subject = undefined } return () => { refCount-- innerSub.unsubscribe() if (refCount === 0) { if (gracePeriod > 0) { timeoutToken = setTimeout(cleanup, gracePeriod) } else { cleanup() } } } }) const result = observable$ as ReactObservable result.getCurrentValue = () => currentValue return result } export default reactOperator