Skip to content

Commit

Permalink
breaking(core): avoid continuable promise in store api (#2695)
Browse files Browse the repository at this point in the history
* add failing test for #2682

* wip: continuable promise in useAtomValue

* wip: abortable promise

* wip fix store test

* wip: fix dependency test

* wip: fix continuable promise

* fix unwrap test

* fix loadable tes

* fix with attachPromiseMeta

* refactor

* eliminate abort controller for promise

* small refactor

* refactor

* minor refactor

* improvement: cancel handler receives next value
  • Loading branch information
dai-shi authored Sep 20, 2024
1 parent b415826 commit e777f46
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 259 deletions.
88 changes: 75 additions & 13 deletions src/react/useAtomValue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ type Store = ReturnType<typeof useStore>
const isPromiseLike = (x: unknown): x is PromiseLike<unknown> =>
typeof (x as any)?.then === 'function'

const attachPromiseMeta = <T>(
promise: PromiseLike<T> & {
status?: 'pending' | 'fulfilled' | 'rejected'
value?: T
reason?: unknown
},
) => {
promise.status = 'pending'
promise.then(
(v) => {
promise.status = 'fulfilled'
promise.value = v
},
(e) => {
promise.status = 'rejected'
promise.reason = e
},
)
}

const use =
ReactExports.use ||
(<T>(
Expand All @@ -26,21 +46,56 @@ const use =
} else if (promise.status === 'rejected') {
throw promise.reason
} else {
promise.status = 'pending'
promise.then(
(v) => {
promise.status = 'fulfilled'
promise.value = v
},
(e) => {
promise.status = 'rejected'
promise.reason = e
},
)
attachPromiseMeta(promise)
throw promise
}
})

const continuablePromiseMap = new WeakMap<
PromiseLike<unknown>,
Promise<unknown>
>()

const createContinuablePromise = <T>(promise: PromiseLike<T>) => {
let continuablePromise = continuablePromiseMap.get(promise)
if (!continuablePromise) {
continuablePromise = new Promise<T>((resolve, reject) => {
let curr = promise
const onFulfilled = (me: PromiseLike<T>) => (v: T) => {
if (curr === me) {
resolve(v)
}
}
const onRejected = (me: PromiseLike<T>) => (e: unknown) => {
if (curr === me) {
reject(e)
}
}
const registerCancelHandler = (p: PromiseLike<T>) => {
if ('onCancel' in p && typeof p.onCancel === 'function') {
p.onCancel((nextValue: PromiseLike<T> | T) => {
if (import.meta.env?.MODE !== 'production' && nextValue === p) {
throw new Error('[Bug] p is not updated even after cancelation')
}
if (isPromiseLike(nextValue)) {
continuablePromiseMap.set(nextValue, continuablePromise!)
curr = nextValue
nextValue.then(onFulfilled(nextValue), onRejected(nextValue))
registerCancelHandler(nextValue)
} else {
resolve(nextValue)
}
})
}
}
promise.then(onFulfilled(promise), onRejected(promise))
registerCancelHandler(promise)
})
continuablePromiseMap.set(promise, continuablePromise)
}
return continuablePromise
}

type Options = Parameters<typeof useStore>[0] & {
delay?: number
}
Expand Down Expand Up @@ -88,6 +143,10 @@ export function useAtomValue<Value>(atom: Atom<Value>, options?: Options) {
useEffect(() => {
const unsub = store.sub(atom, () => {
if (typeof delay === 'number') {
const value = store.get(atom)
if (isPromiseLike(value)) {
attachPromiseMeta(createContinuablePromise(value))
}
// delay rerendering to wait a promise possibly to resolve
setTimeout(rerender, delay)
return
Expand All @@ -99,8 +158,11 @@ export function useAtomValue<Value>(atom: Atom<Value>, options?: Options) {
}, [store, atom, delay])

useDebugValue(value)
// TS doesn't allow using `use` always.
// The use of isPromiseLike is to be consistent with `use` type.
// `instanceof Promise` actually works fine in this case.
return isPromiseLike(value) ? use(value) : (value as Awaited<Value>)
if (isPromiseLike(value)) {
const promise = createContinuablePromise(value)
return use(promise)
}
return value as Awaited<Value>
}
196 changes: 58 additions & 138 deletions src/vanilla/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,92 +20,46 @@ const isActuallyWritableAtom = (atom: AnyAtom): atom is AnyWritableAtom =>
!!(atom as AnyWritableAtom).write

//
// Continuable Promise
// Cancelable Promise
//

const CONTINUE_PROMISE = Symbol(
import.meta.env?.MODE !== 'production' ? 'CONTINUE_PROMISE' : '',
)

const PENDING = 'pending'
const FULFILLED = 'fulfilled'
const REJECTED = 'rejected'

type ContinuePromise<T> = (
nextPromise: PromiseLike<T> | undefined,
nextAbort: () => void,
) => void

type ContinuablePromise<T> = Promise<T> &
(
| { status: typeof PENDING }
| { status: typeof FULFILLED; value?: T }
| { status: typeof REJECTED; reason?: AnyError }
) & {
[CONTINUE_PROMISE]: ContinuePromise<T>
}
type CancelHandler = (nextValue: unknown) => void
type PromiseState = [cancelHandlers: Set<CancelHandler>, settled: boolean]

const isContinuablePromise = (
promise: unknown,
): promise is ContinuablePromise<AnyValue> =>
typeof promise === 'object' && promise !== null && CONTINUE_PROMISE in promise
const cancelablePromiseMap = new WeakMap<PromiseLike<unknown>, PromiseState>()

const continuablePromiseMap: WeakMap<
PromiseLike<AnyValue>,
ContinuablePromise<AnyValue>
> = new WeakMap()
const isPendingPromise = (value: unknown): value is PromiseLike<unknown> =>
isPromiseLike(value) && !cancelablePromiseMap.get(value)?.[1]

/**
* Create a continuable promise from a regular promise.
*/
const createContinuablePromise = <T>(
promise: PromiseLike<T>,
abort: () => void,
complete: () => void,
): ContinuablePromise<T> => {
if (!continuablePromiseMap.has(promise)) {
let continuePromise: ContinuePromise<T>
const p: any = new Promise((resolve, reject) => {
let curr = promise
const onFulfilled = (me: PromiseLike<T>) => (v: T) => {
if (curr === me) {
p.status = FULFILLED
p.value = v
resolve(v)
complete()
}
}
const onRejected = (me: PromiseLike<T>) => (e: AnyError) => {
if (curr === me) {
p.status = REJECTED
p.reason = e
reject(e)
complete()
}
}
promise.then(onFulfilled(promise), onRejected(promise))
continuePromise = (nextPromise, nextAbort) => {
if (nextPromise) {
continuablePromiseMap.set(nextPromise, p)
curr = nextPromise
nextPromise.then(onFulfilled(nextPromise), onRejected(nextPromise))

// Only abort promises that aren't user-facing. When nextPromise is set,
// we can replace the current promise with the next one, so we don't
// see any abort-related errors.
abort()
abort = nextAbort
}
}
})
p.status = PENDING
p[CONTINUE_PROMISE] = continuePromise!
continuablePromiseMap.set(promise, p)
const cancelPromise = <T>(promise: PromiseLike<T>, nextValue: unknown) => {
const promiseState = cancelablePromiseMap.get(promise)
if (promiseState) {
promiseState[1] = true
promiseState[0].forEach((fn) => fn(nextValue))
} else if (import.meta.env?.MODE !== 'production') {
throw new Error('[Bug] cancelable promise not found')
}
return continuablePromiseMap.get(promise) as ContinuablePromise<T>
}

const isPromiseLike = (x: unknown): x is PromiseLike<unknown> =>
const patchPromiseForCancelability = <T>(promise: PromiseLike<T>) => {
if (cancelablePromiseMap.has(promise)) {
// already patched
return
}
const promiseState: PromiseState = [new Set(), false]
cancelablePromiseMap.set(promise, promiseState)
const settle = () => {
promiseState[1] = true
}
promise.then(settle, settle)
;(promise as { onCancel?: (fn: CancelHandler) => void }).onCancel = (fn) => {
promiseState[0].add(fn)
}
}

const isPromiseLike = (
x: unknown,
): x is PromiseLike<unknown> & { onCancel?: (fn: CancelHandler) => void } =>
typeof (x as any)?.then === 'function'

/**
Expand Down Expand Up @@ -165,17 +119,9 @@ const returnAtomValue = <Value>(atomState: AtomState<Value>): Value => {
return atomState.v!
}

const getPendingContinuablePromise = (atomState: AtomState) => {
const value: unknown = atomState.v
if (isContinuablePromise(value) && value.status === PENDING) {
return value
}
return null
}

const addPendingContinuablePromiseToDependency = (
const addPendingPromiseToDependency = (
atom: AnyAtom,
promise: ContinuablePromise<AnyValue> & { status: typeof PENDING },
promise: PromiseLike<AnyValue>,
dependencyAtomState: AtomState,
) => {
if (!dependencyAtomState.p.has(atom)) {
Expand All @@ -202,9 +148,8 @@ const addDependency = <Value>(
throw new Error('[Bug] atom cannot depend on itself')
}
atomState.d.set(a, aState.n)
const continuablePromise = getPendingContinuablePromise(atomState)
if (continuablePromise) {
addPendingContinuablePromiseToDependency(atom, continuablePromise, aState)
if (isPendingPromise(atomState.v)) {
addPendingPromiseToDependency(atom, atomState.v, aState)
}
aState.m?.t.add(atom)
if (pending) {
Expand Down Expand Up @@ -312,48 +257,30 @@ const buildStore = (getAtomState: StoreArgs[0]): Store => {
atom: AnyAtom,
atomState: AtomState,
valueOrPromise: unknown,
abortPromise = () => {},
completePromise = () => {},
) => {
const hasPrevValue = 'v' in atomState
const prevValue = atomState.v
const pendingPromise = getPendingContinuablePromise(atomState)
const pendingPromise = isPendingPromise(atomState.v) ? atomState.v : null
if (isPromiseLike(valueOrPromise)) {
if (pendingPromise) {
if (pendingPromise !== valueOrPromise) {
pendingPromise[CONTINUE_PROMISE](valueOrPromise, abortPromise)
++atomState.n
}
} else {
const continuablePromise = createContinuablePromise(
patchPromiseForCancelability(valueOrPromise)
for (const a of atomState.d.keys()) {
addPendingPromiseToDependency(
atom,
valueOrPromise,
abortPromise,
completePromise,
getAtomState(a, atomState),
)
if (continuablePromise.status === PENDING) {
for (const a of atomState.d.keys()) {
addPendingContinuablePromiseToDependency(
atom,
continuablePromise,
getAtomState(a, atomState),
)
}
}
atomState.v = continuablePromise
delete atomState.e
}
atomState.v = valueOrPromise
delete atomState.e
} else {
if (pendingPromise) {
pendingPromise[CONTINUE_PROMISE](
Promise.resolve(valueOrPromise),
abortPromise,
)
}
atomState.v = valueOrPromise
delete atomState.e
}
if (!hasPrevValue || !Object.is(prevValue, atomState.v)) {
++atomState.n
if (pendingPromise) {
cancelPromise(pendingPromise, valueOrPromise)
}
}
}

Expand Down Expand Up @@ -448,19 +375,18 @@ const buildStore = (getAtomState: StoreArgs[0]): Store => {
}
try {
const valueOrPromise = atom.read(getter, options as never)
setAtomStateValueOrPromise(
atom,
atomState,
valueOrPromise,
() => controller?.abort(),
() => {
setAtomStateValueOrPromise(atom, atomState, valueOrPromise)
if (isPromiseLike(valueOrPromise)) {
valueOrPromise.onCancel?.(() => controller?.abort())
const complete = () => {
if (atomState.m) {
const pending = createPending()
mountDependencies(pending, atom, atomState)
flushPending(pending)
}
},
)
}
valueOrPromise.then(complete, complete)
}
return atomState
} catch (error) {
delete atomState.v
Expand All @@ -484,10 +410,10 @@ const buildStore = (getAtomState: StoreArgs[0]): Store => {
for (const a of atomState.m?.t || []) {
dependents.set(a, getAtomState(a, atomState))
}
for (const atomWithPendingContinuablePromise of atomState.p) {
for (const atomWithPendingPromise of atomState.p) {
dependents.set(
atomWithPendingContinuablePromise,
getAtomState(atomWithPendingContinuablePromise, atomState),
atomWithPendingPromise,
getAtomState(atomWithPendingPromise, atomState),
)
}
getPendingDependents(pending, atom)?.forEach((dependent) => {
Expand Down Expand Up @@ -609,7 +535,7 @@ const buildStore = (getAtomState: StoreArgs[0]): Store => {
atom: AnyAtom,
atomState: AtomState,
) => {
if (atomState.m && !getPendingContinuablePromise(atomState)) {
if (atomState.m && !isPendingPromise(atomState.v)) {
for (const a of atomState.d.keys()) {
if (!atomState.m.d.has(a)) {
const aMounted = mountAtom(pending, a, getAtomState(a, atomState))
Expand Down Expand Up @@ -691,12 +617,6 @@ const buildStore = (getAtomState: StoreArgs[0]): Store => {
const aMounted = unmountAtom(pending, a, getAtomState(a, atomState))
aMounted?.t.delete(atom)
}
// abort pending promise
const pendingPromise = getPendingContinuablePromise(atomState)
if (pendingPromise) {
// FIXME using `undefined` is kind of a hack.
pendingPromise[CONTINUE_PROMISE](undefined, () => {})
}
return undefined
}
return atomState.m
Expand Down
Loading

0 comments on commit e777f46

Please sign in to comment.