mirror of
https://github.com/re-rxjs/react-rxjs.git
synced 2025-12-08 18:01:51 +00:00
wip
This commit is contained in:
parent
4511e4440c
commit
0c7b337d54
@ -14,6 +14,7 @@ export function trackParentChanges<T, K extends Record<string, any>, R>(
|
||||
onAdded: (key: K, isInitial: boolean) => R
|
||||
onActive: (key: K, storage: Storage<R>) => void
|
||||
onReset: (key: K, storage: Storage<R>) => void
|
||||
onAfterChange?: (key: K, storage: Storage<R>) => void
|
||||
onRemoved: (key: K, storage: Storage<R>) => void
|
||||
},
|
||||
): Subscription {
|
||||
@ -47,6 +48,8 @@ export function trackParentChanges<T, K extends Record<string, any>, R>(
|
||||
tracker.onActive(change.key, getStorage(change.key))
|
||||
} else if (change.type === "reset") {
|
||||
tracker.onReset(change.key, getStorage(change.key))
|
||||
} else if (change.type === "postchange") {
|
||||
tracker.onAfterChange?.(change.key, getStorage(change.key))
|
||||
} else if (change.type === "removed") {
|
||||
const storage = getStorage(change.key)
|
||||
tracker.onRemoved(change.key, storage)
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { BehaviorSubject, Observable, Subscription, filter } from "rxjs"
|
||||
import { Observable, Subject, Subscription, filter, startWith } from "rxjs"
|
||||
import type { KeysBaseType } from "../types"
|
||||
import { EMPTY_VALUE } from "./empty-value"
|
||||
import { StatePromise, createDeferredPromise } from "./promises"
|
||||
@ -18,7 +18,9 @@ export function createInstance<T, K extends KeysBaseType>(
|
||||
observable: Observable<T>,
|
||||
onAfterChange: () => void,
|
||||
): Instance<T, K> {
|
||||
let subject = new BehaviorSubject<T | EMPTY_VALUE>(EMPTY_VALUE)
|
||||
let currentValue: T | EMPTY_VALUE = EMPTY_VALUE
|
||||
let previousContextValue: T | EMPTY_VALUE = currentValue
|
||||
let subject = new Subject<T | EMPTY_VALUE>()
|
||||
|
||||
// TODO firehose
|
||||
let deferred = createDeferredPromise<T>()
|
||||
@ -30,10 +32,32 @@ export function createInstance<T, K extends KeysBaseType>(
|
||||
let error = EMPTY_VALUE
|
||||
|
||||
let subscription: Subscription | null = null
|
||||
const restart = () => {
|
||||
subscription?.unsubscribe()
|
||||
const start = () => {
|
||||
let isSynchronous = true
|
||||
let emitted = false
|
||||
if (subscription) {
|
||||
const err = new Error("Instance already active")
|
||||
console.error(err)
|
||||
throw err
|
||||
}
|
||||
subscription = observable.subscribe({
|
||||
next: (v) => {
|
||||
currentValue = v
|
||||
|
||||
// TODO equality function
|
||||
if (
|
||||
isSynchronous &&
|
||||
!emitted &&
|
||||
previousContextValue !== EMPTY_VALUE &&
|
||||
!Object.is(previousContextValue, v)
|
||||
) {
|
||||
previousContextValue = EMPTY_VALUE
|
||||
const oldSubject = subject
|
||||
subject = new Subject<T | EMPTY_VALUE>()
|
||||
oldSubject.complete()
|
||||
}
|
||||
emitted = true
|
||||
|
||||
deferred.res(v)
|
||||
subject.next(v)
|
||||
onAfterChange()
|
||||
@ -52,75 +76,69 @@ export function createInstance<T, K extends KeysBaseType>(
|
||||
// }
|
||||
},
|
||||
})
|
||||
isSynchronous = false
|
||||
|
||||
if (!emitted && previousContextValue !== EMPTY_VALUE) {
|
||||
previousContextValue = EMPTY_VALUE
|
||||
const oldSubject = subject
|
||||
subject = new Subject<T | EMPTY_VALUE>()
|
||||
oldSubject.complete()
|
||||
}
|
||||
}
|
||||
|
||||
const instance: Instance<T, K> = {
|
||||
key,
|
||||
activate() {
|
||||
// TODO just call it activate
|
||||
if (subscription) {
|
||||
throw new Error("Instance already active")
|
||||
return
|
||||
}
|
||||
|
||||
restart()
|
||||
start()
|
||||
},
|
||||
kill() {
|
||||
subscription?.unsubscribe()
|
||||
subscription = null
|
||||
subject.complete()
|
||||
if (subject.getValue() === EMPTY_VALUE) {
|
||||
if (currentValue === EMPTY_VALUE) {
|
||||
deferred.rej("TODO What kind of error? Test doesn't say")
|
||||
}
|
||||
},
|
||||
reset() {
|
||||
// TODO how to reset without activating straight away with this flow?
|
||||
if (error !== EMPTY_VALUE || subject.getValue() !== EMPTY_VALUE) {
|
||||
// If the new subscription returns the same value synchronously, do not complete the previous result.
|
||||
// TODO the child nodes should also reset... are they resetting?
|
||||
error = EMPTY_VALUE
|
||||
error = EMPTY_VALUE
|
||||
if (currentValue !== EMPTY_VALUE) {
|
||||
deferred = createDeferredPromise()
|
||||
subscription?.unsubscribe()
|
||||
let isSynchronous = true
|
||||
let passed = false
|
||||
subscription = observable.subscribe({
|
||||
next: (v) => {
|
||||
deferred.res(v)
|
||||
// TODO equality function
|
||||
if (isSynchronous && !Object.is(subject.getValue(), v)) {
|
||||
const oldSubject = subject
|
||||
subject = new BehaviorSubject<T | EMPTY_VALUE>(EMPTY_VALUE)
|
||||
oldSubject.complete()
|
||||
}
|
||||
passed = true
|
||||
subject.next(v)
|
||||
},
|
||||
error: (e) => {
|
||||
deferred.rej(e)
|
||||
error = e
|
||||
subject.error(e)
|
||||
},
|
||||
})
|
||||
isSynchronous = false
|
||||
|
||||
if (!passed) {
|
||||
const oldSubject = subject
|
||||
subject = new BehaviorSubject<T | EMPTY_VALUE>(EMPTY_VALUE)
|
||||
oldSubject.complete()
|
||||
}
|
||||
}
|
||||
restart()
|
||||
previousContextValue = currentValue
|
||||
currentValue = EMPTY_VALUE
|
||||
subscription?.unsubscribe()
|
||||
subscription = null
|
||||
},
|
||||
getValue() {
|
||||
if (error !== EMPTY_VALUE) {
|
||||
throw error
|
||||
}
|
||||
|
||||
const value = subject.getValue()
|
||||
if (value === EMPTY_VALUE) {
|
||||
if (currentValue === EMPTY_VALUE) {
|
||||
return deferred.promise
|
||||
}
|
||||
return value
|
||||
return currentValue
|
||||
},
|
||||
getState$() {
|
||||
return subject.pipe(filter((v) => v !== EMPTY_VALUE)) as Observable<T>
|
||||
// In case someone tries to grab the state while we're switching context
|
||||
// we can't return the current subject because that might complete straight away
|
||||
// after the context switch, so we just swap it now.
|
||||
if (previousContextValue !== EMPTY_VALUE) {
|
||||
previousContextValue = EMPTY_VALUE
|
||||
const oldSubject = subject
|
||||
subject = new Subject<T | EMPTY_VALUE>()
|
||||
oldSubject.complete()
|
||||
}
|
||||
|
||||
return subject.pipe(
|
||||
startWith(currentValue),
|
||||
filter((v) => v !== EMPTY_VALUE),
|
||||
) as Observable<T>
|
||||
},
|
||||
}
|
||||
return instance
|
||||
|
||||
@ -301,7 +301,7 @@ describe("subState", () => {
|
||||
expect(nodeA.getValue()).toBe("b-a")
|
||||
})
|
||||
|
||||
it.only("can reference its siblings after a change", () => {
|
||||
it("can reference its siblings after a change", () => {
|
||||
const root = createRoot()
|
||||
const source$ = new Subject<string>()
|
||||
const subNode = substate(root, () => source$)
|
||||
|
||||
@ -40,9 +40,6 @@ export const substate = <T, K extends KeysBaseType>(
|
||||
.subscribe({
|
||||
next: () => {
|
||||
stateNode.resetInstance(instanceKey)
|
||||
// TODO shouldn't re-activation of instances happen after all subscribers have restarted? how to do it?
|
||||
// Yes. I would need a special kind of observable so that I can first reset the instances without activating them
|
||||
// and then synchronously activate them
|
||||
},
|
||||
error: () => {
|
||||
// TODO
|
||||
@ -62,6 +59,10 @@ export const substate = <T, K extends KeysBaseType>(
|
||||
},
|
||||
onReset(key) {
|
||||
stateNode.resetInstance(key)
|
||||
stateNode.activateInstance(key)
|
||||
},
|
||||
onAfterChange(key) {
|
||||
stateNode.activateInstance(key)
|
||||
},
|
||||
onRemoved(key, storage) {
|
||||
stateNode.removeInstance(key)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user