mirror of
https://github.com/re-rxjs/react-rxjs.git
synced 2025-12-08 18:01:51 +00:00
v0.2.0-alpha.0
This commit is contained in:
parent
f719548d94
commit
81ce35d0de
@ -1,5 +1,6 @@
|
||||
{
|
||||
"version": "0.1.4",
|
||||
"version": "0.2.0-alpha.0",
|
||||
"sideEffects": false,
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git+https://github.com/josepot/react-rxjs.git"
|
||||
|
||||
@ -1,21 +1,7 @@
|
||||
import { useEffect, useState } from "react"
|
||||
import { Observable, of, race } from "rxjs"
|
||||
import { delay, take, mapTo } from "rxjs/operators"
|
||||
import {
|
||||
StaticObservableOptions,
|
||||
defaultStaticOptions,
|
||||
} from "./connectObservable"
|
||||
import { Observable } from "rxjs"
|
||||
import distinctShareReplay from "./operators/distinct-share-replay"
|
||||
import reactOptimizations from "./operators/react-optimizations"
|
||||
|
||||
interface FactoryObservableOptions<T> extends StaticObservableOptions<T> {
|
||||
suspenseTime: number
|
||||
}
|
||||
|
||||
const defaultOptions: FactoryObservableOptions<any> = {
|
||||
...defaultStaticOptions,
|
||||
suspenseTime: 200,
|
||||
}
|
||||
import { FactoryObservableOptions, defaultFactoryOptions } from "./options"
|
||||
import useSharedReplayableObservable from "./useSharedReplayableObservable"
|
||||
|
||||
export function connectFactoryObservable<
|
||||
I,
|
||||
@ -24,14 +10,13 @@ export function connectFactoryObservable<
|
||||
>(
|
||||
getObservable: (...args: A) => Observable<O>,
|
||||
initialValue: I,
|
||||
options?: Partial<FactoryObservableOptions<O>>,
|
||||
_options?: FactoryObservableOptions<O>,
|
||||
): [(...args: A) => O | I, (...args: A) => Observable<O>] {
|
||||
const { suspenseTime, unsubscribeGraceTime, compare } = {
|
||||
...defaultOptions,
|
||||
...options,
|
||||
const options = {
|
||||
...defaultFactoryOptions,
|
||||
..._options,
|
||||
}
|
||||
|
||||
const reactEnhander = reactOptimizations(unsubscribeGraceTime)
|
||||
const cache = new Map<string, Observable<O>>()
|
||||
|
||||
const getSharedObservable$ = (...input: A): Observable<O> => {
|
||||
@ -43,7 +28,7 @@ export function connectFactoryObservable<
|
||||
}
|
||||
|
||||
const reactObservable$ = getObservable(...input).pipe(
|
||||
distinctShareReplay(compare, () => {
|
||||
distinctShareReplay(options.compare, () => {
|
||||
cache.delete(key)
|
||||
}),
|
||||
)
|
||||
@ -53,34 +38,13 @@ export function connectFactoryObservable<
|
||||
}
|
||||
|
||||
return [
|
||||
(...input: A) => {
|
||||
const [value, setValue] = useState<I | O>(initialValue)
|
||||
(...input: A) =>
|
||||
useSharedReplayableObservable(
|
||||
getSharedObservable$(...input),
|
||||
initialValue,
|
||||
options,
|
||||
),
|
||||
|
||||
useEffect(() => {
|
||||
const sharedObservable$ = getSharedObservable$(...input)
|
||||
const subscription = reactEnhander(sharedObservable$).subscribe(
|
||||
setValue,
|
||||
)
|
||||
|
||||
if (suspenseTime === 0) {
|
||||
setValue(initialValue)
|
||||
} else if (suspenseTime < Infinity) {
|
||||
subscription.add(
|
||||
race(
|
||||
of(initialValue).pipe(delay(suspenseTime)),
|
||||
sharedObservable$.pipe(
|
||||
take(1),
|
||||
mapTo((x: I | O) => x),
|
||||
),
|
||||
).subscribe(setValue),
|
||||
)
|
||||
}
|
||||
|
||||
return () => subscription.unsubscribe()
|
||||
}, input)
|
||||
|
||||
return value
|
||||
},
|
||||
getSharedObservable$,
|
||||
]
|
||||
}
|
||||
|
||||
46
src/connectGroupedObservable.ts
Normal file
46
src/connectGroupedObservable.ts
Normal file
@ -0,0 +1,46 @@
|
||||
import { Observable, GroupedObservable } from "rxjs"
|
||||
import { map, filter, take, mergeMap } from "rxjs/operators"
|
||||
import distinctShareReplay from "./operators/distinct-share-replay"
|
||||
import { FactoryObservableOptions, defaultFactoryOptions } from "./options"
|
||||
import useSharedReplayableObservable from "./useSharedReplayableObservable"
|
||||
|
||||
const connectGroupedObservable = <K, O, I>(
|
||||
source$: Observable<GroupedObservable<K, O>>,
|
||||
initialValue: I,
|
||||
_options?: FactoryObservableOptions<O>,
|
||||
): [(key: K) => I | O, (key: K) => Observable<O>] => {
|
||||
const options = {
|
||||
...defaultFactoryOptions,
|
||||
..._options,
|
||||
}
|
||||
const observables = new Map<K, Observable<O>>()
|
||||
const activeObservables$ = source$.pipe(
|
||||
map(x => {
|
||||
observables.set(x.key, x)
|
||||
return observables
|
||||
}),
|
||||
distinctShareReplay(
|
||||
() => false,
|
||||
() => observables.clear(),
|
||||
),
|
||||
)
|
||||
|
||||
const getObservableByKey = (key: K) =>
|
||||
activeObservables$.pipe(
|
||||
filter(x => x.has(key)),
|
||||
take(1),
|
||||
mergeMap(x => x.get(key)!),
|
||||
distinctShareReplay(options.compare, () => observables.delete(key)),
|
||||
)
|
||||
|
||||
const hook = (key: K) =>
|
||||
useSharedReplayableObservable(
|
||||
getObservableByKey(key),
|
||||
initialValue,
|
||||
options,
|
||||
)
|
||||
|
||||
return [hook, getObservableByKey]
|
||||
}
|
||||
|
||||
export default connectGroupedObservable
|
||||
@ -1,38 +1,23 @@
|
||||
import { useEffect, useState } from "react"
|
||||
import { Observable } from "rxjs"
|
||||
import reactOptimizations from "./operators/react-optimizations"
|
||||
import distinctShareReplay from "./operators/distinct-share-replay"
|
||||
|
||||
export interface StaticObservableOptions<T> {
|
||||
unsubscribeGraceTime: number
|
||||
compare: (a: T, b: T) => boolean
|
||||
}
|
||||
export const defaultStaticOptions: StaticObservableOptions<any> = {
|
||||
unsubscribeGraceTime: 120,
|
||||
compare: (a, b) => a === b,
|
||||
}
|
||||
import { StaticObservableOptions, defaultStaticOptions } from "./options"
|
||||
import useSharedReplayableObservable from "./useSharedReplayableObservable"
|
||||
|
||||
export function connectObservable<O, IO>(
|
||||
observable: Observable<O>,
|
||||
initialValue: IO,
|
||||
options?: Partial<StaticObservableOptions<O>>,
|
||||
_options?: StaticObservableOptions<O>,
|
||||
) {
|
||||
const { unsubscribeGraceTime, compare } = {
|
||||
const options = {
|
||||
...defaultStaticOptions,
|
||||
...options,
|
||||
..._options,
|
||||
}
|
||||
const sharedObservable$ = observable.pipe(distinctShareReplay(compare))
|
||||
const reactObservable$ = sharedObservable$.pipe(
|
||||
reactOptimizations(unsubscribeGraceTime),
|
||||
const sharedObservable$ = observable.pipe(
|
||||
distinctShareReplay(options.compare),
|
||||
)
|
||||
|
||||
const useStaticObservable = () => {
|
||||
const [value, setValue] = useState<O | IO>(initialValue)
|
||||
useEffect(() => {
|
||||
const subscription = reactObservable$.subscribe(setValue)
|
||||
return () => subscription.unsubscribe()
|
||||
}, [])
|
||||
return value
|
||||
}
|
||||
const useStaticObservable = () =>
|
||||
useSharedReplayableObservable(sharedObservable$, initialValue, options)
|
||||
|
||||
return [useStaticObservable, sharedObservable$] as const
|
||||
}
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
export { connectObservable } from "./connectObservable"
|
||||
export { connectFactoryObservable } from "./connectFactoryObservable"
|
||||
export { default as connectGroupedObservable } from "./connectGroupedObservable"
|
||||
export { default as distinctShareReplay } from "./operators/distinct-share-replay"
|
||||
export { default as useSharedReplayableObservable } from "./useSharedReplayableObservable"
|
||||
|
||||
20
src/options.ts
Normal file
20
src/options.ts
Normal file
@ -0,0 +1,20 @@
|
||||
export interface StaticObservableOptions<T> {
|
||||
unsubscribeGraceTime?: number
|
||||
compare?: (a: T, b: T) => boolean
|
||||
}
|
||||
export const defaultStaticOptions = {
|
||||
unsubscribeGraceTime: 120,
|
||||
compare: (a: any, b: any) => a === b,
|
||||
}
|
||||
|
||||
export interface FactoryObservableOptions<T>
|
||||
extends StaticObservableOptions<T> {
|
||||
suspenseTime?: number
|
||||
}
|
||||
|
||||
export const defaultFactoryOptions = {
|
||||
...defaultStaticOptions,
|
||||
suspenseTime: 200,
|
||||
}
|
||||
|
||||
export type ObservableOptions = Omit<FactoryObservableOptions<any>, "compare">
|
||||
43
src/useSharedReplayableObservable.ts
Normal file
43
src/useSharedReplayableObservable.ts
Normal file
@ -0,0 +1,43 @@
|
||||
import { useState, useEffect } from "react"
|
||||
import { Observable, of, race } from "rxjs"
|
||||
import { take, mapTo, delay } from "rxjs/operators"
|
||||
import reactOptimizations from "./operators/react-optimizations"
|
||||
import { defaultFactoryOptions, ObservableOptions } from "./options"
|
||||
|
||||
const useSharedReplayableObservable = <O, I>(
|
||||
sharedReplayableObservable$: Observable<O>,
|
||||
initialValue: I,
|
||||
options?: ObservableOptions,
|
||||
) => {
|
||||
const [value, setValue] = useState<I | O>(initialValue)
|
||||
|
||||
const { suspenseTime, unsubscribeGraceTime } = {
|
||||
...defaultFactoryOptions,
|
||||
...options,
|
||||
}
|
||||
|
||||
useEffect(() => {
|
||||
const subscription = reactOptimizations(unsubscribeGraceTime)(
|
||||
sharedReplayableObservable$,
|
||||
).subscribe(setValue)
|
||||
|
||||
if (suspenseTime === 0) {
|
||||
setValue(initialValue)
|
||||
} else if (suspenseTime < Infinity) {
|
||||
subscription.add(
|
||||
race(
|
||||
of(initialValue).pipe(delay(suspenseTime)),
|
||||
sharedReplayableObservable$.pipe(
|
||||
take(1),
|
||||
mapTo((x: I | O) => x),
|
||||
),
|
||||
).subscribe(setValue),
|
||||
)
|
||||
}
|
||||
|
||||
return () => subscription.unsubscribe()
|
||||
}, [sharedReplayableObservable$, suspenseTime, unsubscribeGraceTime])
|
||||
return value
|
||||
}
|
||||
|
||||
export default useSharedReplayableObservable
|
||||
Loading…
x
Reference in New Issue
Block a user