diff --git a/package-lock.json b/package-lock.json index cf7fb77..eef813a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,6 +8,9 @@ "name": "@amadeus-it-group/tansu", "version": "1.0.0", "license": "MIT", + "dependencies": { + "signal-polyfill": "divdavem/signal-polyfill#ef64ffc15daa7553615b4cf8d136a54540423863" + }, "devDependencies": { "@angular/common": "^18.0.1", "@angular/compiler": "^18.0.1", @@ -5341,6 +5344,11 @@ "url": "https://github.com/sponsors/isaacs" } }, + "node_modules/signal-polyfill": { + "version": "0.1.1", + "resolved": "git+ssh://git@github.com/divdavem/signal-polyfill.git#ef64ffc15daa7553615b4cf8d136a54540423863", + "integrity": "sha512-EC4owVvU7c9csxUCL99G7D4Gf33OQNaxzHEbSu7VE+ULuif2JKD2zFc+cBLwBONsfEf0eZ6Yh8rvKAJhXvfdbA==" + }, "node_modules/sirv": { "version": "2.0.4", "resolved": "https://registry.npmjs.org/sirv/-/sirv-2.0.4.tgz", diff --git a/package.json b/package.json index 1b27efe..726e86e 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,9 @@ "url": "https://github.com/AmadeusITGroup/tansu/issues" }, "homepage": "https://github.com/AmadeusITGroup/tansu#readme", + "dependencies": { + "signal-polyfill": "divdavem/signal-polyfill#ef64ffc15daa7553615b4cf8d136a54540423863" + }, "private": true, "devDependencies": { "@angular/common": "^18.0.1", diff --git a/rollup.config.mjs b/rollup.config.mjs index ab99715..37ce3ac 100644 --- a/rollup.config.mjs +++ b/rollup.config.mjs @@ -53,4 +53,5 @@ export default defineConfig({ }, }, ], + external: ['signal-polyfill'], }); diff --git a/src/index.spec.ts b/src/index.spec.ts index 833ec75..cd48646 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -444,34 +444,6 @@ describe('stores', () => { unsubscribe(); }); - it('should not call again listeners when only resuming subscribers', () => { - class BasicStore extends Store { - public override pauseSubscribers(): void { - super.pauseSubscribers(); - } - public override resumeSubscribers(): void { - super.resumeSubscribers(); - } - public override set(value: object): void { - super.set(value); - } - } - const initialValue = {}; - const newValue = {}; - const store = new BasicStore(initialValue); - const calls: object[] = []; - const unsubscribe = store.subscribe((v) => calls.push(v)); - expect(calls.length).toBe(1); - expect(calls[0]).toBe(initialValue); - store.pauseSubscribers(); - store.resumeSubscribers(); - expect(calls.length).toBe(1); - store.set(newValue); - expect(calls.length).toBe(2); - expect(calls[1]).toBe(newValue); - unsubscribe(); - }); - it('asReadable should be compatible with rxjs (BehaviorSubject)', () => { const behaviorSubject = new BehaviorSubject(0); const store = asReadable(behaviorSubject); @@ -1601,7 +1573,7 @@ describe('stores', () => { wrongDerivedStore.subscribe(() => { reachedSubscriber = true; }); - }).toThrowError('reached maximum number of store changes in one shot'); + }).toThrowError('Could not stabilize the computation.'); expect(reachedSubscriber).toBe(false); }); @@ -1620,7 +1592,7 @@ describe('stores', () => { expect(values).toEqual([0]); expect(() => { store.set(-1); - }).toThrowError('reached maximum number of store changes in one shot'); + }).toThrowError('Could not stabilize the computation.'); unsubscribe(); expect(values).toEqual([0]); }); @@ -2986,7 +2958,7 @@ describe('stores', () => { const values: number[] = []; expect(() => { myValue.subscribe((value) => values.push(value)); - }).toThrowError('recursive computed'); + }).toThrowError('Detected cycle in computations.'); expect(values).toEqual([]); }); @@ -2998,7 +2970,7 @@ describe('stores', () => { expect(values).toEqual([0]); expect(() => { recursive.set(true); - }).toThrowError('recursive computed'); + }).toThrowError('Detected cycle in computations.'); }); it('should throw when changing a value from computed would result in an infinite loop (on subscribe)', () => { @@ -3013,7 +2985,7 @@ describe('stores', () => { wrongComputed.subscribe(() => { reachedSubscriber = true; }); - }).toThrowError('reached maximum number of store changes in one shot'); + }).toThrowError('Could not stabilize the computation.'); expect(reachedSubscriber).toBe(false); }); @@ -3032,7 +3004,7 @@ describe('stores', () => { }); expect(() => { store.set(11); - }).toThrowError('reached maximum number of store changes in one shot'); + }).toThrowError('Could not stabilize the computation.'); expect(values).toEqual([0]); }); diff --git a/src/index.ts b/src/index.ts index e76bbf8..4a52455 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,6 +11,8 @@ declare global { } } +import { Signal } from 'signal-polyfill'; + /** * Symbol used in {@link InteropObservable} allowing any object to expose an observable. */ @@ -61,9 +63,9 @@ export interface SubscriberObject { } interface PrivateSubscriberObject extends Omit, typeof oldSubscription> { - _value: T; - _valueIndex: number; + _new: boolean; _paused: boolean; + _changeChecker: () => { value: T; changed: boolean }; } /** @@ -114,17 +116,28 @@ export interface InteropObservable { [Symbol.observable]: () => SubscribableStore; } +/** + * Represents a store whose value can be retrieved with a get function that tracks its usage as specified in the Signal proposal specification. + */ +export interface SignalStore { + get(): T; +} + /** * Valid types that can be considered as a store. */ -export type StoreInput = SubscribableStore | InteropObservable; +export type StoreInput = + | (() => T) + | SignalStore + | SubscribableStore + | InteropObservable; /** * This interface augments the base {@link SubscribableStore} interface by requiring the return value of the subscribe method to be both a function and an object with the `unsubscribe` method. * * For {@link https://rxjs.dev/api/index/interface/InteropObservable | interoperability with rxjs}, it also implements the `[Symbol.observable]` method. */ -export interface Readable extends SubscribableStore, InteropObservable { +export interface Readable extends SignalStore, SubscribableStore, InteropObservable { subscribe(subscriber: Subscriber): UnsubscribeFunction & UnsubscribeObject; [Symbol.observable](): Readable; } @@ -187,13 +200,30 @@ const bind = (object: T | null | undefined, fnName: keyof T) => { return typeof fn === 'function' ? fn.bind(object) : noop; }; -const toSubscriberObject = (subscriber: Subscriber): PrivateSubscriberObject => ({ +const changeChecker = (store: SignalStore): PrivateSubscriberObject['_changeChecker'] => { + let changed = false; + let value: T = undefined as T; + const computedSignal = new Signal.Computed(() => { + changed = true; + value = store.get(); + }); + return () => { + changed = false; + computedSignal.get(); + return { changed, value }; + }; +}; + +const toSubscriberObject = ( + subscriber: Subscriber, + _changeChecker: PrivateSubscriberObject['_changeChecker'] +): PrivateSubscriberObject => ({ next: typeof subscriber === 'function' ? subscriber.bind(null) : bind(subscriber, 'next'), pause: bind(subscriber, 'pause'), resume: bind(subscriber, 'resume'), - _value: undefined as any, - _valueIndex: 0, + _new: true, _paused: false, + _changeChecker, }); const returnThis = function (this: T): T { @@ -215,7 +245,6 @@ const normalizeUnsubscribe = ( return res; }; -const normalizedSubscribe = new WeakSet['subscribe']>(); const normalizeSubscribe = (store: SubscribableStore): Readable['subscribe'] => { let res: Readable['subscribe'] = store.subscribe as any; if (!normalizedSubscribe.has(res)) { @@ -225,17 +254,6 @@ const normalizeSubscribe = (store: SubscribableStore): Readable['subscr return res; }; -const getNormalizedSubscribe = (input: StoreInput) => { - const store = 'subscribe' in input ? input : input[symbolObservable](); - return normalizeSubscribe(store); -}; - -const getValue = (subscribe: Readable['subscribe']): T => { - let value: T; - subscribe((v) => (value = v))(); - return value!; -}; - /** * Returns a wrapper (for the given store) which only exposes the {@link ReadableSignal} interface. * This converts any {@link StoreInput} to a {@link ReadableSignal} and exposes the store as read-only. @@ -260,12 +278,12 @@ export function asReadable( store: StoreInput, extraProp?: U ): ReadableSignal & Omit> { - const subscribe = getNormalizedSubscribe(store); - const res = Object.assign(() => get(res), extraProp, { - subscribe, + const storeGetter = toSignalFn(store); + return Object.assign(() => storeGetter(), extraProp, { + get: () => storeGetter(), + subscribe: toSubscribe(store), [symbolObservable]: returnThis, }); - return res; } const defaultUpdate: any = function (this: Writable, updater: Updater) { @@ -323,9 +341,221 @@ export function asWritable( } const triggerUpdate = Symbol(); -const queueProcess = Symbol(); -let willProcessQueue = false; -const queue = new Set<{ [queueProcess](): void }>(); + +let willProcessQueueSync = false; +let willProcessQueueAsync = false; +let asyncStoresToNotifyQueue = new Set<() => void>(); +let syncStoresToNotifyQueue = new Set<() => void>(); + +const planNotifyStore = (storeNotifier: () => void) => { + if (willProcessQueueSync) { + syncStoresToNotifyQueue.add(storeNotifier); + } else { + asyncStoresToNotifyQueue.add(storeNotifier); + if (!willProcessQueueAsync) { + willProcessQueueAsync = true; + (async () => { + await 0; + batch(() => { + willProcessQueueAsync = false; + syncStoresToNotifyQueue = asyncStoresToNotifyQueue; + asyncStoresToNotifyQueue = new Set(); + }); + })(); + } + } +}; +const unplanNotifyStore = (storeNotifier: () => void) => { + asyncStoresToNotifyQueue.delete(storeNotifier); + syncStoresToNotifyQueue.delete(storeNotifier); +}; + +const returnFalse = () => false; +const returnTrue = () => true; +const normalizedSubscribe = new WeakSet(); +const createSubscribeFromSignal = (store: Signal.State | Signal.Computed) => { + let changeNotification = false; + const watcher = new Signal.subtle.Watcher(() => { + changeNotification = true; + unplanNotifyStore(storeNotifier); + planNotifyStore(storeNotifier); + for (const subscriber of [...subscribers]) { + // FIXME: skip new subscriber? + pauseSubscriber(subscriber); + } + watcher.watch(); // re-enable watch + }); + const subscribers = new Set>(); + const storeNotifier = () => { + changeNotification = false; + for (const subscriber of [...subscribers]) { + if (changeNotification) { + // the value of the store can change while notifying subscribers + // in that case, let's just stop notifying subscribers + // they will be called later through another notifier call + // with the correct final value and in the right order + return; + } + if (!subscriber._new) { + notifySubscriber(subscriber); + } + } + }; + const pauseSubscriber = (subscriber: PrivateSubscriberObject) => { + if (!subscriber._paused && !subscriber._new) { + subscriber._paused = true; + subscriber.pause(); + } + }; + const notifySubscriber = (subscriber: PrivateSubscriberObject) => + untrack(() => { + const { changed, value } = subscriber._changeChecker(); + const wasPaused = subscriber._paused; + subscriber._paused = false; + subscriber._new = false; + if (changed) { + // the value really changed + subscriber.next(value); + } else if (wasPaused) { + // the value did not change + subscriber.resume(); + } + }); + + const oldSubscriptionsMap = new WeakMap['_changeChecker']>(); + + const res = (subscriber: Subscriber): UnsubscribeFunction & UnsubscribeObject => { + const oldSubscriptionRef = subscriber?.[oldSubscription]; + const oldSubscriptionChecker = oldSubscriptionRef + ? oldSubscriptionsMap.get(oldSubscriptionRef) + : null; + const subscriberObject = toSubscriberObject( + subscriber, + oldSubscriptionChecker ?? changeChecker(store) + ); + subscribers.add(subscriberObject); + if (subscribers.size === 1) { + watcher.watch(store); + } + notifySubscriber(subscriberObject); + const unsubscribe = () => { + const removed = subscribers.delete(subscriberObject); + subscriberObject.next = noop; + subscriberObject.pause = noop; + subscriberObject.resume = noop; + if (removed) { + oldSubscriptionsMap.set(unsubscribe, subscriberObject._changeChecker); + if (subscribers.size === 0) { + watcher.unwatch(store); + unplanNotifyStore(storeNotifier); + } + } + }; + (unsubscribe as any)[triggerUpdate] = () => notifySubscriber(subscriberObject); + unsubscribe.unsubscribe = unsubscribe; + return unsubscribe; + }; + + normalizedSubscribe.add(res); + return res; +}; + +const onUseLogic = (onUseFn: () => Unsubscriber | void) => { + let counter = 0; + let cleanupFn: UnsubscribeFunction | null; + const startUse = () => { + if (++counter === 1) { + cleanupFn = untrack(() => normalizeUnsubscribe(onUseFn())); + } + }; + const endUse = () => { + if (--counter === 0) { + const fn = cleanupFn; + cleanupFn = null; + untrack(() => fn?.()); + } + }; + const wrapCall = (fn: () => T): T => { + try { + startUse(); + return fn(); + } finally { + endUse(); + } + }; + return { startUse, endUse, wrapCall }; +}; + +class ListenerState extends Signal.State { + #wrapCall: (fn: () => T) => T; + + constructor(store: SubscribableStore) { + const subscriber: SubscriberFunction & Partial> = (value: T) => + batch(() => super.set(value)); + + subscriber.next = subscriber; + + let unsubscribe: any; + const { startUse, endUse, wrapCall } = onUseLogic(() => { + unsubscribe = normalizeUnsubscribe(store.subscribe(subscriber)); + return () => { + unsubscribe(); + (subscriber as any)[oldSubscription] = unsubscribe; + }; + }); + super(undefined as T, { + equals: returnFalse, + [Signal.subtle.watched]: startUse, + [Signal.subtle.unwatched]: endUse, + }); + this.#wrapCall = wrapCall; + } + + override get(): T { + return this.#wrapCall(() => super.get()); + } + override set() { + throw new Error('Read-only!'); + } +} + +const createSignalFromSubscribableStore = (store: SubscribableStore) => { + const signal = new ListenerState(store); + return () => signal.get(); +}; + +const toSignalFnCache = new Map, () => any>(); +const toSignalFn = (store: StoreInput): (() => T) => { + if (typeof store === 'function') { + return () => store(); + } else if ('get' in store) { + return () => store.get(); + } + const subscribable = 'subscribe' in store ? store : store[symbolObservable](); + let res = toSignalFnCache.get(subscribable); + if (!res) { + res = createSignalFromSubscribableStore(subscribable); + toSignalFnCache.set(subscribable, res); + } + return res; +}; + +const toSubscribeCache = new Map(); +const toSubscribe = (store: StoreInput): Readable['subscribe'] => { + if ('subscribe' in store) { + return normalizeSubscribe(store); + } else if (symbolObservable in store) { + return normalizeSubscribe(store[symbolObservable]()); + } + let res = toSubscribeCache.get(store); + if (!res) { + const storeFn = toSignalFn(store); + // FIXME: check if it is ok to create a computed here!! + res = createSubscribeFromSignal(new Signal.Computed(() => storeFn(), { equals: returnFalse })); + toSubscribeCache.set(store, res); + } + return res; +}; const MAX_STORE_PROCESSING_IN_QUEUE = 1000; const checkIterations = (iterations: number) => { @@ -375,34 +605,33 @@ const checkIterations = (iterations: number) => { * ``` */ export const batch = (fn: () => T): T => { - const needsProcessQueue = !willProcessQueue; + const needsProcessQueue = !willProcessQueueSync; if (needsProcessQueue) { - willProcessQueue = true; + willProcessQueueSync = true; } try { return fn(); } finally { if (needsProcessQueue) { try { - const storePasses = new Map<{ [queueProcess](): void }, number>(); - for (const store of queue) { - const storeCount = storePasses.get(store) ?? 0; + const storePasses = new Map<() => void, number>(); + for (const storeNotifier of syncStoresToNotifyQueue) { + const storeCount = storePasses.get(storeNotifier) ?? 0; checkIterations(storeCount); - storePasses.set(store, storeCount + 1); - queue.delete(store); - store[queueProcess](); + storePasses.set(storeNotifier, storeCount + 1); + syncStoresToNotifyQueue.delete(storeNotifier); + // FIXME: should we catch exceptions and run remaining storeNotifiers? + storeNotifier(); } } finally { - queue.clear(); - willProcessQueue = false; + // FIXME: should we log an error if there are remaining items in the queue? + syncStoresToNotifyQueue.clear(); + willProcessQueueSync = false; } } } }; -const defaultReactiveContext = (store: StoreInput) => getValue(getNormalizedSubscribe(store)); -let reactiveContext = defaultReactiveContext; - /** * A utility function to get the current value from a given store. * It works by subscribing to a store, capturing the value (synchronously) and unsubscribing just after. @@ -415,16 +644,9 @@ let reactiveContext = defaultReactiveContext; * console.log(get(myStore)); // logs 1 * ``` */ -export const get = (store: StoreInput): T => reactiveContext(store); - -const createEqualCache = (valueIndex: number): Record => ({ - [valueIndex]: true, // the subscriber already has the last value - [valueIndex - 1]: false, // the subscriber had the previous value, - // which is known to be different because equal is called in the set method - 0: false, // the subscriber never received any value -}); +export const get = (store: StoreInput): T => toSignalFn(store)(); -const skipEqualInSet = Symbol(); +const skipEqual = Symbol(); /** * Default implementation of the equal function used by tansu when a store @@ -471,75 +693,37 @@ export const equal = (a: T, b: T): boolean => * ``` */ export abstract class Store implements Readable { - #subscribers = new Set>(); - #cleanupFn: null | UnsubscribeFunction = null; - #subscribersPaused = false; - #valueIndex = 1; - #value: T; - #equalCache = createEqualCache(1); - #oldSubscriptions = new WeakMap>(); - - private [skipEqualInSet] = false; + // FIXME: should we avoid exposing this with a symbol? + protected readonly _onUseLogic = onUseLogic(() => this.onUse()); + // FIXME: should we avoid exposing this with a symbol? + protected readonly _signal: Signal.State; + [skipEqual] = false; /** * * @param value - Initial value of the store */ constructor(value: T) { - this.#value = value; - } - - #start() { - this.#cleanupFn = normalizeUnsubscribe(this.onUse()); + this._signal = this._createSignal(value); + this.subscribe = createSubscribeFromSignal(this._signal); } - #stop() { - const cleanupFn = this.#cleanupFn; - if (cleanupFn) { - this.#cleanupFn = null; - cleanupFn(); - } - } - - private [queueProcess](): void { - const valueIndex = this.#valueIndex; - for (const subscriber of [...this.#subscribers]) { - if (this.#subscribersPaused || this.#valueIndex !== valueIndex) { - // the value of the store can change while notifying subscribers - // in that case, let's just stop notifying subscribers - // they will be called later through another queueProcess call - // with the correct final value and in the right order - return; - } - if (subscriber._valueIndex === 0) { - // ignore subscribers which were not yet called synchronously - continue; - } - this.#notifySubscriber(subscriber); - } + // FIXME: should we avoid exposing this with a private symbol? + protected _createSignal(value: T): Signal.State { + return new Signal.State(value, { + equals: (a, b) => { + if (this[skipEqual]) { + return false; + } + return this.equal(a, b); + }, + [Signal.subtle.watched]: this._onUseLogic.startUse, + [Signal.subtle.unwatched]: this._onUseLogic.endUse, + }); } - /** @internal */ - protected [triggerUpdate](): void {} - - #notifySubscriber(subscriber: PrivateSubscriberObject): void { - const equalCache = this.#equalCache; - const valueIndex = this.#valueIndex; - const value = this.#value; - let equal = equalCache[subscriber._valueIndex]; - if (equal == null) { - equal = !!this.equal(subscriber._value, value); - equalCache[subscriber._valueIndex] = equal; - } - subscriber._valueIndex = valueIndex; - if (!equal) { - subscriber._value = value; - subscriber._paused = false; - subscriber.next(value); - } else if (!this.#subscribersPaused && subscriber._paused) { - subscriber._paused = false; - subscriber.resume(); - } + get(): T { + return this._onUseLogic.wrapCall(() => this._signal.get()); } /** @@ -585,54 +769,6 @@ export abstract class Store implements Readable { return !equal(a, b); } - /** - * Puts the store in the paused state, which means it will soon update its value. - * - * @remarks - * - * The paused state prevents derived or computed stores (both direct and transitive) from recomputing their value - * using the current value of this store. - * - * There are two ways to put a store back into its normal state: calling {@link Store.set | set} to set a new - * value or calling {@link Store.resumeSubscribers | resumeSubscribers} to declare that finally the value does not need to be - * changed. - * - * Note that a store should not stay in the paused state for a long time, and most of the time - * it is not needed to call pauseSubscribers or resumeSubscribers manually. - * - */ - protected pauseSubscribers(): void { - if (!this.#subscribersPaused) { - this.#subscribersPaused = true; - queue.delete(this as any); - for (const subscriber of [...this.#subscribers]) { - if (subscriber._valueIndex === 0 || subscriber._paused) { - // ignore subscribers which were not yet called synchronously or are already paused - continue; - } - subscriber._paused = true; - subscriber.pause(); - } - } - } - - /** - * Puts the store back to the normal state without changing its value, if it was in the paused state - * (cf {@link Store.pauseSubscribers | pauseSubscribers}). - * - * @remarks - * - * Does nothing if the store was not in the paused state. - */ - protected resumeSubscribers(): void { - if (this.#subscribersPaused) { - this.#subscribersPaused = false; - batch(() => { - queue.add(this as any); - }); - } - } - /** * Replaces store's state with the provided value. * Equivalent of {@link Writable.set}, but internal to the store. @@ -640,18 +776,9 @@ export abstract class Store implements Readable { * @param value - value to be used as the new state of a store. */ protected set(value: T): void { - const skipEqual = this[skipEqualInSet]; - if (skipEqual || !this.equal(this.#value, value)) { - const valueIndex = this.#valueIndex + 1; - this.#valueIndex = valueIndex; - this.#value = value; - this.#equalCache = createEqualCache(valueIndex); - if (skipEqual) { - delete this.#equalCache[valueIndex - 1]; - } - this.pauseSubscribers(); - } - this.resumeSubscribers(); + batch(() => { + this._signal.set(value); + }); } /** @@ -661,7 +788,7 @@ export abstract class Store implements Readable { * @param updater - a function that takes the current state as an argument and returns the new state. */ protected update(updater: Updater): void { - this.set(updater(this.#value)); + this.set(updater(untrack(() => this._signal.get()))); } /** @@ -694,45 +821,7 @@ export abstract class Store implements Readable { * Default Implementation of the {@link SubscribableStore.subscribe}, not meant to be overridden. * @param subscriber - see {@link SubscribableStore.subscribe} */ - subscribe(subscriber: Subscriber): UnsubscribeFunction & UnsubscribeObject { - const subscriberObject = toSubscriberObject(subscriber); - const oldSubscriptionParam = subscriber?.[oldSubscription]; - if (oldSubscriptionParam) { - const oldSubscriberObject = this.#oldSubscriptions.get(oldSubscriptionParam); - if (oldSubscriberObject) { - subscriberObject._value = oldSubscriberObject._value; - subscriberObject._valueIndex = oldSubscriberObject._valueIndex; - } - } - this.#subscribers.add(subscriberObject); - batch(() => { - if (this.#subscribers.size == 1) { - this.#start(); - } else { - this[triggerUpdate](); - } - }); - this.#notifySubscriber(subscriberObject); - - const unsubscribe = () => { - const removed = this.#subscribers.delete(subscriberObject); - subscriberObject.next = noop; - subscriberObject.pause = noop; - subscriberObject.resume = noop; - if (removed) { - this.#oldSubscriptions.set(unsubscribe, subscriberObject); - if (this.#subscribers.size === 0) { - this.#stop(); - } - } - }; - (unsubscribe as any)[triggerUpdate] = () => { - this[triggerUpdate](); - this.#notifySubscriber(subscriberObject); - }; - unsubscribe.unsubscribe = unsubscribe; - return unsubscribe; - } + subscribe: Readable['subscribe']; [symbolObservable](): this { return this; @@ -815,13 +904,16 @@ export interface StoreOptions { */ function constStore(value: T): ReadableSignal { const subscribe = (subscriber: Subscriber) => { - if (!subscriber?.[oldSubscription]) { - toSubscriberObject(subscriber).next(value); - } + const next = typeof subscriber === 'function' ? subscriber : bind(subscriber, 'next'); + next(value); return noopUnsubscribe; }; normalizedSubscribe.add(subscribe); - return Object.assign(() => value, { subscribe, [symbolObservable]: returnThis }); + return Object.assign(() => value, { + subscribe, + get: () => value, + [symbolObservable]: returnThis, + }); } class WritableStore extends Store implements Writable { @@ -969,107 +1061,69 @@ function isSyncDeriveFn(fn: DeriveFn): fn is SyncDeriveFn { return fn.length <= 1; } -const callFn = (fn: () => void) => fn(); - export abstract class DerivedStore extends Store { - readonly #isArray: boolean; - readonly #storesSubscribeFn: Readable['subscribe'][]; - #pending = 0; + readonly #computedSignal: Signal.Computed; + // computed for the function param + // with as many stores and an extra one constructor(stores: S, initialValue: T) { super(initialValue); const isArray = Array.isArray(stores); - this.#isArray = isArray; - this.#storesSubscribeFn = (isArray ? [...stores] : [stores]).map(getNormalizedSubscribe); - } - - protected override resumeSubscribers(): void { - if (!this.#pending) { - // only resume subscribers if we know that the values of the stores with which - // the derived function was called were the correct ones - super.resumeSubscribers(); - } - } - - protected override onUse(): Unsubscriber | void { - let initDone = false; - let changed = 0; - - const isArray = this.#isArray; - const storesSubscribeFn = this.#storesSubscribeFn; - const dependantValues = new Array(storesSubscribeFn.length); - + const storesSignalFn = (isArray ? [...stores] : [stores]).map(toSignalFn); + const derivedArg = new Signal.Computed( + isArray ? () => storesSignalFn.map((fn) => fn()) : () => storesSignalFn[0](), + { equals: returnFalse } + ); let cleanupFn: null | UnsubscribeFunction = null; - const callCleanup = () => { - const fn = cleanupFn; - if (fn) { - cleanupFn = null; - fn(); - } + const previousCleanup = cleanupFn; + cleanupFn = null; + previousCleanup?.(); }; - - const callDerive = (setInitDone = false) => { - if (setInitDone) { - initDone = true; - } - if (initDone && !this.#pending) { - if (changed) { - changed = 0; + const callDerived = new Signal.Computed( + () => { + const derivedArgValue = derivedArg.get(); + untrack(() => { callCleanup(); - cleanupFn = normalizeUnsubscribe( - this.derive(isArray ? dependantValues : dependantValues[0]) - ); - } - this.resumeSubscribers(); + cleanupFn = normalizeUnsubscribe(this.derive(derivedArgValue as any)); + }); + }, + { + equals: returnTrue, + [Signal.subtle.unwatched]: callCleanup, } - }; + ); + this.#computedSignal = new Signal.Computed( + () => { + callDerived.get(); + return this._signal.get(); + }, + { + equals: (a, b) => { + if (this[skipEqual]) { + return false; + } + return this.equal(a, b); + }, + } + ); + // FIXME: avoid assigning subscribe twice (both in parent class Store and here) + this.subscribe = createSubscribeFromSignal(this.#computedSignal); + } - const unsubscribers = storesSubscribeFn.map((subscribe, idx) => { - const subscriber = (v: any) => { - dependantValues[idx] = v; - changed |= 1 << idx; - this.#pending &= ~(1 << idx); - callDerive(); - }; - subscriber.next = subscriber; - subscriber.pause = () => { - this.#pending |= 1 << idx; - this.pauseSubscribers(); - }; - subscriber.resume = () => { - this.#pending &= ~(1 << idx); - callDerive(); - }; - return subscribe(subscriber); + protected override _createSignal(value: T): Signal.State { + return new Signal.State(value, { + // TODO: improve this because if the value does not change, + // it still notifies all watchers depending on this signal that the value has to be checked + // but it would be better not to do anything at all + equals: returnFalse, + [Signal.subtle.watched]: this._onUseLogic.startUse, + [Signal.subtle.unwatched]: this._onUseLogic.endUse, }); + } - const triggerSubscriberPendingUpdate = (unsubscriber: any, idx: number) => { - if (this.#pending & (1 << idx)) { - unsubscriber[triggerUpdate]?.(); - } - }; - this[triggerUpdate] = () => { - let iterations = 0; - while (this.#pending) { - checkIterations(++iterations); - initDone = false; - unsubscribers.forEach(triggerSubscriberPendingUpdate); - if (this.#pending) { - // safety check: if pending is not 0 after calling triggerUpdate, - // it will never be and this is an endless loop - break; - } - callDerive(true); - } - }; - callDerive(true); - this[triggerUpdate](); - return () => { - this[triggerUpdate] = noop; - callCleanup(); - unsubscribers.forEach(callFn); - }; + override get(): T { + return this._onUseLogic.wrapCall(() => this.#computedSignal.get()); } protected abstract derive(values: StoresInputValues): Unsubscriber | void; @@ -1142,11 +1196,11 @@ export function derived( ? class extends DerivedStore { constructor(stores: S, initialValue: T) { super(stores, initialValue); - this[skipEqualInSet] = true; // skip call to equal in set until the first value is set + this[skipEqual] = true; // skip call to equal until the first value is set } protected override derive(values: StoresInputValues) { this.set(derive(values)); - this[skipEqualInSet] = false; + this[skipEqual] = false; } } : class extends DerivedStore { @@ -1172,189 +1226,7 @@ export function derived( * @param fn - function to be called * @returns the value returned by the given function */ -export const untrack = (fn: () => T): T => { - const previousReactiveContext = reactiveContext; - try { - reactiveContext = defaultReactiveContext; - return fn(); - } finally { - reactiveContext = previousReactiveContext; - } -}; - -interface ComputedStoreSubscription { - versionIndex: number; - resubscribe: () => void; - unsubscribe: UnsubscribeFunction; - pending: boolean; - usedValueIndex: number; - valueIndex: number; - value: T; -} - -const callUnsubscribe = ({ unsubscribe }: ComputedStoreSubscription) => unsubscribe(); -const callResubscribe = ({ resubscribe }: ComputedStoreSubscription) => resubscribe(); - -abstract class ComputedStore extends Store { - #computing = false; - #skipCallCompute = false; - #versionIndex = 0; - #subscriptions = new Map, ComputedStoreSubscription>(); - - #reactiveContext = (storeInput: StoreInput): U => - untrack(() => this.#getSubscriptionValue(storeInput)); - - constructor() { - super(undefined as T); - this[skipEqualInSet] = true; // skip call to equal in set until the first value is set - } - - #createSubscription(subscribe: Readable['subscribe']) { - const res: ComputedStoreSubscription = { - versionIndex: this.#versionIndex, - unsubscribe: noop, - resubscribe: noop, - pending: false, - usedValueIndex: 0, - value: undefined as T, - valueIndex: 0, - }; - const subscriber: SubscriberFunction & Partial> = (value: T) => { - res.value = value; - res.valueIndex++; - res.pending = false; - if (!this.#skipCallCompute && !this.#isPending()) { - batch(() => this.#callCompute()); - } - }; - subscriber.next = subscriber; - subscriber.pause = () => { - res.pending = true; - this.pauseSubscribers(); - }; - subscriber.resume = () => { - res.pending = false; - if (!this.#skipCallCompute && !this.#isPending()) { - batch(() => this.#callCompute()); - } - }; - res.resubscribe = () => { - res.unsubscribe = subscribe(subscriber); - subscriber[oldSubscription] = res.unsubscribe; - }; - res.resubscribe(); - return res; - } - - #getSubscriptionValue(storeInput: StoreInput) { - let res = this.#subscriptions.get(storeInput); - if (res) { - res.versionIndex = this.#versionIndex; - (res.unsubscribe as any)[triggerUpdate]?.(); - } else { - res = this.#createSubscription(getNormalizedSubscribe(storeInput)); - this.#subscriptions.set(storeInput, res); - } - res.usedValueIndex = res.valueIndex; - return res.value; - } - - #callCompute(resubscribe = false) { - this.#computing = true; - this.#skipCallCompute = true; - try { - if (this.#versionIndex > 0) { - if (resubscribe) { - this.#subscriptions.forEach(callResubscribe); - } - if (!this.#hasChange()) { - this.resumeSubscribers(); - return; - } - } - this.#versionIndex++; - const versionIndex = this.#versionIndex; - const previousReactiveContext = reactiveContext; - let value: T; - try { - reactiveContext = this.#reactiveContext; - value = this.compute(); - } finally { - reactiveContext = previousReactiveContext; - } - this.set(value); - this[skipEqualInSet] = false; - for (const [store, info] of this.#subscriptions) { - if (info.versionIndex !== versionIndex) { - this.#subscriptions.delete(store); - info.unsubscribe(); - } - } - } finally { - this.#skipCallCompute = false; - this.#computing = false; - } - } - - #isPending() { - for (const [, { pending }] of this.#subscriptions) { - if (pending) { - return true; - } - } - return false; - } - - #hasChange() { - for (const [, { valueIndex, usedValueIndex }] of this.#subscriptions) { - if (valueIndex != usedValueIndex) { - return true; - } - } - return false; - } - - protected override resumeSubscribers(): void { - if (!this.#isPending()) { - super.resumeSubscribers(); - } - } - - /** @internal */ - protected override [triggerUpdate](): void { - if (this.#computing) { - throw new Error('recursive computed'); - } - let iterations = 0; - while (this.#isPending()) { - checkIterations(++iterations); - this.#skipCallCompute = true; - try { - for (const [, { pending, unsubscribe }] of this.#subscriptions) { - if (pending) { - (unsubscribe as any)[triggerUpdate]?.(); - } - } - } finally { - this.#skipCallCompute = false; - } - if (this.#isPending()) { - // safety check: if it is still pending after calling triggerUpdate, - // it will always be and this is an endless loop - break; - } - this.#callCompute(); - } - } - - protected abstract compute(): T; - - protected override onUse(): Unsubscriber { - this.#callCompute(true); - this[triggerUpdate](); - return () => this.#subscriptions.forEach(callUnsubscribe); - } -} +export const untrack = Signal.subtle.untrack; /** * Creates a store whose value is computed by the provided function. @@ -1380,17 +1252,15 @@ abstract class ComputedStore extends Store { */ export function computed( fn: () => T, - options: Omit, 'onUse'> = {} + { equal: optEqual, notEqual: optNotEqual }: Omit, 'onUse'> = {} ): ReadableSignal { - const Computed = class extends ComputedStore { - protected override compute(): T { - return fn(); - } - }; return asReadable( - applyStoreOptions(new Computed(), { - ...options, - onUse: undefined /* setting onUse is not allowed from computed */, + new Signal.Computed(fn, { + equals: optEqual + ? (a, b) => optEqual(a, b) + : optNotEqual + ? (a, b) => !optNotEqual(a, b) + : equal, }) ); }