mirror of
https://github.com/re-rxjs/react-rxjs.git
synced 2025-12-08 18:01:51 +00:00
fix: nodes can reference its siblings irregardless of definition order (#284)
* fix: nodes can reference its siblings irregardless of definition order * fix: avoid `forEach` when there can be re-entrant instances * fix: avoid possible mem-leak on delayed observable
This commit is contained in:
parent
0e48b72d30
commit
48c6a38d47
@ -13,12 +13,17 @@ export function createRoot<KeyValue, KeyName extends string>(
|
||||
export function createRoot<KeyValue = never, KeyName extends string = "">(
|
||||
keyName?: KeyName,
|
||||
): RootNode<KeyValue, KeyName> {
|
||||
const flushQueue = new Map<any, Array<() => void>>()
|
||||
const childRunners = new Array<RunFn>()
|
||||
|
||||
const runChildren: RunFn = (key, isActive) => {
|
||||
childRunners.forEach((cb) => {
|
||||
cb(key, isActive, true)
|
||||
})
|
||||
const [rootKey] = key
|
||||
const waiters: Array<() => void> = []
|
||||
flushQueue.set(rootKey, waiters)
|
||||
for (let i = 0; i < childRunners.length; i++)
|
||||
childRunners[i](key, isActive, true)
|
||||
flushQueue.delete(rootKey)
|
||||
for (let i = 0; i < waiters.length; i++) waiters[i]()
|
||||
}
|
||||
|
||||
const result: RootNode<KeyValue, KeyName> = {
|
||||
@ -40,6 +45,7 @@ export function createRoot<KeyValue = never, KeyName extends string = "">(
|
||||
run: runChildren,
|
||||
parents: [],
|
||||
childRunners,
|
||||
isRunning: ([key]) => flushQueue.get(key) ?? false,
|
||||
isActive: () => true,
|
||||
keysOrder: keyName ? [keyName] : [],
|
||||
public: result as any,
|
||||
|
||||
@ -25,6 +25,7 @@ export const createSignal = <T, K extends StringRecord<any>>(
|
||||
})
|
||||
|
||||
return {
|
||||
parent,
|
||||
getSignal$(keyObj: K = {} as K) {
|
||||
const sortedKey = parentInternals.keysOrder.map((key) => keyObj[key])
|
||||
const instance = instances.get(sortedKey)
|
||||
|
||||
@ -8,7 +8,7 @@ import {
|
||||
StatePromise,
|
||||
DeferredPromise,
|
||||
createDeferredPromise,
|
||||
RunFn,
|
||||
getInternals,
|
||||
addInternals,
|
||||
} from "./"
|
||||
import { NestedMap } from "./nested-map"
|
||||
@ -34,22 +34,22 @@ export const recursiveError = (
|
||||
return undefined
|
||||
}
|
||||
|
||||
interface Instance<T> {
|
||||
subject: ReplaySubject<T>
|
||||
onFlushQueue?: Array<() => void>
|
||||
subscription: Subscription | null
|
||||
currentValue: EMPTY_VALUE | T
|
||||
isParentLoaded: boolean
|
||||
promise: DeferredPromise<T> | null
|
||||
error: null | { e: any }
|
||||
}
|
||||
|
||||
export const detachedNode = <T, K extends StringRecord<any>>(
|
||||
keysOrder: string[],
|
||||
getState$: CtxFn<T, K>,
|
||||
equalityFn: (a: T, b: T) => boolean = Object.is,
|
||||
): InternalStateNode<T, K> => {
|
||||
const instances = new NestedMap<
|
||||
any,
|
||||
{
|
||||
subject: ReplaySubject<T>
|
||||
subscription: Subscription | null
|
||||
currentValue: EMPTY_VALUE | T
|
||||
isParentLoaded: boolean
|
||||
promise: DeferredPromise<T> | null
|
||||
error: null | { e: any }
|
||||
}
|
||||
>()
|
||||
const instances = new NestedMap<any, Instance<T>>()
|
||||
|
||||
const privateNode = {
|
||||
keysOrder,
|
||||
@ -95,10 +95,32 @@ export const detachedNode = <T, K extends StringRecord<any>>(
|
||||
return instance.error?.e ?? true
|
||||
}
|
||||
|
||||
const runChildren: RunFn = (...args) => {
|
||||
privateNode.childRunners.forEach((cb) => {
|
||||
cb(...args)
|
||||
})
|
||||
privateNode.isRunning = (key: any[]) => {
|
||||
const instance = instances.get(key)
|
||||
if (instance?.onFlushQueue) return instance.onFlushQueue!
|
||||
if (Array.isArray(privateNode.parents)) {
|
||||
for (let i = 0; i < privateNode.parents.length; i++) {
|
||||
const result = privateNode.parents[i].isRunning(key)
|
||||
if (result) return result
|
||||
}
|
||||
return false
|
||||
} else {
|
||||
return privateNode.parents.isRunning(key)
|
||||
}
|
||||
}
|
||||
|
||||
const runChildren: (
|
||||
instance: Instance<T>,
|
||||
key: any[],
|
||||
isActive: boolean,
|
||||
isParentLoaded?: boolean,
|
||||
) => void = (instance, ...args) => {
|
||||
const waiters: Array<() => void> = []
|
||||
instance.onFlushQueue = waiters
|
||||
for (let i = 0; i < privateNode.childRunners.length; i++)
|
||||
privateNode.childRunners[i](...args)
|
||||
delete instance.onFlushQueue
|
||||
for (let i = 0; i < waiters.length; i++) waiters[i]()
|
||||
}
|
||||
|
||||
privateNode.run = (
|
||||
@ -113,7 +135,7 @@ export const detachedNode = <T, K extends StringRecord<any>>(
|
||||
|
||||
instance.subscription?.unsubscribe()
|
||||
|
||||
runChildren(key, false)
|
||||
runChildren(instance, key, false)
|
||||
instance.promise?.rej(inactiveContext())
|
||||
instance.subject.complete()
|
||||
return
|
||||
@ -155,11 +177,63 @@ export const detachedNode = <T, K extends StringRecord<any>>(
|
||||
const ctxObservable = <V, CK extends StringRecord<any>>(
|
||||
node: StateNode<V, CK> | Signal<V, CK>,
|
||||
partialKey: Omit<CK, keyof K>,
|
||||
): Observable<V> =>
|
||||
("getSignal$" in node ? node.getSignal$ : node.getState$)({
|
||||
): Observable<V> => {
|
||||
const internalNode = getInternals(
|
||||
"getSignal$" in node ? node.parent : node,
|
||||
)
|
||||
|
||||
const keyObj = {
|
||||
...objKey,
|
||||
...partialKey,
|
||||
} as CK)
|
||||
} as CK
|
||||
const onFlushQueue = internalNode.isRunning(
|
||||
keysOrder.map((key) => keyObj[key]),
|
||||
)
|
||||
|
||||
if (!onFlushQueue) {
|
||||
return ("getSignal$" in node ? node.getSignal$ : node.getState$)(
|
||||
keyObj,
|
||||
)
|
||||
}
|
||||
|
||||
let observable: any = EMPTY_VALUE
|
||||
onFlushQueue.push(() => {
|
||||
try {
|
||||
observable = (
|
||||
"getSignal$" in node ? node.getSignal$ : node.getState$
|
||||
)(keyObj)
|
||||
} catch (e) {
|
||||
observable = e
|
||||
}
|
||||
})
|
||||
|
||||
return new Observable((observer) => {
|
||||
if (observable !== EMPTY_VALUE) {
|
||||
if (observable instanceof Observable) {
|
||||
return observable.subscribe(observer)
|
||||
} else {
|
||||
observer.error(observable)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
let isActive = true
|
||||
let subscription: Subscription | null = null
|
||||
onFlushQueue.push(() => {
|
||||
if (!isActive) return
|
||||
if (observable instanceof Observable) {
|
||||
subscription = observable.subscribe(observer)
|
||||
} else {
|
||||
observer.error(observable)
|
||||
}
|
||||
})
|
||||
|
||||
return () => {
|
||||
isActive = false
|
||||
subscription?.unsubscribe()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
const onError = (err: any) => {
|
||||
const prevPromise = actualInstance.promise
|
||||
@ -170,7 +244,7 @@ export const detachedNode = <T, K extends StringRecord<any>>(
|
||||
|
||||
actualInstance.currentValue = EMPTY_VALUE
|
||||
|
||||
runChildren(key, false)
|
||||
runChildren(actualInstance, key, false)
|
||||
prevPromise?.rej(err)
|
||||
actualInstance.subject.error(err)
|
||||
}
|
||||
@ -194,7 +268,7 @@ export const detachedNode = <T, K extends StringRecord<any>>(
|
||||
actualInstance.promise = null
|
||||
if (prevValue === EMPTY_VALUE || !equalityFn(prevValue, value)) {
|
||||
prevPromise?.res(value)
|
||||
runChildren(key, true, true)
|
||||
runChildren(actualInstance, key, true, true)
|
||||
actualInstance.subject!.next(value)
|
||||
}
|
||||
},
|
||||
@ -217,7 +291,7 @@ export const detachedNode = <T, K extends StringRecord<any>>(
|
||||
prevSubect = actualInstance.subject
|
||||
actualInstance.subject = new ReplaySubject<T>(1)
|
||||
}
|
||||
runChildren(key, true, false)
|
||||
runChildren(actualInstance, key, true, false)
|
||||
prevSubect?.complete()
|
||||
}
|
||||
|
||||
@ -245,7 +319,7 @@ export const detachedNode = <T, K extends StringRecord<any>>(
|
||||
}
|
||||
instances.set(key, instance)
|
||||
}
|
||||
runChildren(key, true, false)
|
||||
runChildren(instance, key, true, false)
|
||||
prevSubect?.complete()
|
||||
}
|
||||
|
||||
|
||||
@ -7,6 +7,7 @@ export interface RunFn {
|
||||
export interface InternalStateNode<T, K extends StringRecord<any>> {
|
||||
run: RunFn
|
||||
parents: InternalStateNode<any, any> | Array<InternalStateNode<any, any>>
|
||||
isRunning: (key: any[]) => false | Array<() => void>
|
||||
childRunners: Array<RunFn>
|
||||
isActive: (key: any[]) => boolean | Error
|
||||
keysOrder: Array<string>
|
||||
|
||||
@ -12,6 +12,8 @@ export class NestedMap<K extends any[], V extends Object> {
|
||||
for (let i = 0; i < keys.length; i++) {
|
||||
current = current.get(keys[i])
|
||||
if (!current) return undefined
|
||||
// a child instance could be checking for a parent instance with its (longer) key
|
||||
if (!(current instanceof Map)) return current
|
||||
}
|
||||
return current
|
||||
}
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { routeState } from "./route-state"
|
||||
import { EMPTY, NEVER, Observable, of, Subject, throwError } from "rxjs"
|
||||
import { EMPTY, map, NEVER, Observable, of, Subject, throwError } from "rxjs"
|
||||
import { createRoot } from "./create-root"
|
||||
import { substate } from "./substate"
|
||||
|
||||
@ -272,6 +272,19 @@ describe("subState", () => {
|
||||
|
||||
await expect(promise).rejects.toBe(error)
|
||||
})
|
||||
|
||||
it("can reference its siblings", () => {
|
||||
const root = createRoot()
|
||||
const nodeA = substate(root, (_, getState$) =>
|
||||
getState$(nodeB, {}).pipe(map((v) => v + "-a")),
|
||||
)
|
||||
const nodeB = substate(root, () => of("b"))
|
||||
|
||||
root.run()
|
||||
|
||||
expect(nodeB.getValue()).toBe("b")
|
||||
expect(nodeA.getValue()).toBe("b-a")
|
||||
})
|
||||
})
|
||||
|
||||
describe("state$", () => {
|
||||
|
||||
@ -14,6 +14,7 @@ export interface StateNode<T, K extends StringRecord<any>> {
|
||||
|
||||
export interface Signal<T, K extends StringRecord<any>> {
|
||||
push: {} extends K ? (value: T) => void : (key: K, value: T) => void
|
||||
parent: StateNode<any, K>
|
||||
getSignal$: {} extends K ? () => Observable<T> : (key: K) => Observable<T>
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user