From 8a50060e08133b4077d3224821003e71cc1ec921 Mon Sep 17 00:00:00 2001 From: Owen Pearson Date: Thu, 22 Jun 2023 10:50:39 +0100 Subject: [PATCH 1/2] feat: `RealtimeChannel.attach` returns `ChannelStateChange` Makes it so that `RealtimeChannel.attach` exposes the `ChannelStateChange` via whatever async api (invoke callback or fulfill promise). This makes it easy for users to access flags on the `ChannelStateChange` to access information about the attachment. --- ably.d.ts | 52 ++++++++++++-------- src/common/lib/client/realtimechannel.ts | 19 +++++--- test/realtime/channel.test.js | 61 ++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 26 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index bc92e8565e..f404077c14 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -2556,9 +2556,9 @@ declare namespace Types { /** * Attach to this channel ensuring the channel is created in the Ably system and all messages published on the channel are received by any channel listeners registered using {@link RealtimeChannelCallbacks.subscribe | `subscribe()`}. Any resulting channel state change will be emitted to any listeners registered using the {@link EventEmitter.on | `on()`} or {@link EventEmitter.once | `once()`} methods. As a convenience, `attach()` is called implicitly if {@link RealtimeChannelCallbacks.subscribe | `subscribe()`} for the channel is called, or {@link RealtimePresenceCallbacks.enter | `enter()`} or {@link RealtimePresenceCallbacks.subscribe | `subscribe()`} are called on the {@link RealtimePresenceCallbacks} object for this channel. * - * @param callback - A function which will be called upon completion of the operation. If the operation succeeded, then the function will be called with `null`. If it failed, the function will be called with information about the error. + * @param callback - A function which will be called upon completion of the operation. If the operation succeeded and the channel became attached, then the function will be called with a {@link ChannelStateChange} object. If the channel was already attached the function will be called with `null`. If it failed, the function will be called with information about the error. */ - attach(callback?: errorCallback): void; + attach(callback?: StandardCallback): void; /** * Detach from this channel. Any resulting channel state change is emitted to any listeners registered using the {@link EventEmitter.on | `on()`} or {@link EventEmitter.once | `once()`} methods. Once all clients globally have detached from the channel, the channel will be released in the Ably service within two minutes. * @@ -2590,32 +2590,44 @@ declare namespace Types { * * @param event - The event name. * @param listener - An event listener function. - * @param callbackWhenAttached - A function which will be called upon completion of the channel {@link RealtimeChannelCallbacks.attach | `attach()`} operation. If the operation succeeded, then the function will be called with `null`. If it failed, the function will be called with information about the error. + * @param callbackWhenAttached - A function which will be called upon completion of the channel {@link RealtimeChannelCallbacks.attach | `attach()`} operation. If the operation succeeded and the channel became attached, then the function will be called with a {@link ChannelStateChange} object. If the channel was already attached the function will be called with `null`. If it failed, the function will be called with information about the error. */ - subscribe(event: string, listener?: messageCallback, callbackWhenAttached?: errorCallback): void; + subscribe( + event: string, + listener?: messageCallback, + callbackWhenAttached?: StandardCallback + ): void; /** * Registers a listener for messages on this channel for multiple event name values. * * @param events - An array of event names. * @param listener - An event listener function. - * @param callbackWhenAttached - A function which will be called upon completion of the channel {@link RealtimeChannelCallbacks.attach | `attach()`} operation. If the operation succeeded, then the function will be called with `null`. If it failed, the function will be called with information about the error. + * @param callbackWhenAttached - A function which will be called upon completion of the channel {@link RealtimeChannelCallbacks.attach | `attach()`} operation. If the operation succeeded and the channel became attached, then the function will be called with a {@link ChannelStateChange} object. If the channel was already attached the function will be called with `null`. If it failed, the function will be called with information about the error. */ - subscribe(events: Array, listener?: messageCallback, callbackWhenAttached?: errorCallback): void; + subscribe( + events: Array, + listener?: messageCallback, + callbackWhenAttached?: StandardCallback + ): void; /** * Registers a listener for messages on this channel that match the supplied filter. * * @param filter - A {@link MessageFilter}. * @param listener - An event listener function. - * @param callbackWhenAttached - A function which will be called upon completion of the channel {@link RealtimeChannelCallbacks.attach | `attach()`} operation. If the operation succeeded, then the function will be called with `null`. If it failed, the function will be called with information about the error. + * @param callbackWhenAttached - A function which will be called upon completion of the channel {@link RealtimeChannelCallbacks.attach | `attach()`} operation. If the operation succeeded and the channel became attached, then the function will be called with a {@link ChannelStateChange} object. If the channel was already attached the function will be called with `null`. If it failed, the function will be called with information about the error. */ - subscribe(filter: MessageFilter, listener?: messageCallback, callbackWhenAttached?: errorCallback): void; + subscribe( + filter: MessageFilter, + listener?: messageCallback, + callbackWhenAttached?: StandardCallback + ): void; /** * Registers a listener for messages on this channel. The caller supplies a listener function, which is called each time one or more messages arrives on the channel. * * @param listener - An event listener function. - * @param callbackWhenAttached - A function which will be called upon completion of the channel {@link RealtimeChannelCallbacks.attach | `attach()`} operation. If the operation succeeded, then the function will be called with `null`. If it failed, the function will be called with information about the error. + * @param callbackWhenAttached - A function which will be called upon completion of the channel {@link RealtimeChannelCallbacks.attach | `attach()`} operation. If the operation succeeded and the channel became attached, then the function will be called with a {@link ChannelStateChange} object. If the channel was already attached the function will be called with `null`. If it failed, the function will be called with information about the error. */ - subscribe(listener: messageCallback, callbackWhenAttached?: errorCallback): void; + subscribe(listener: messageCallback, callbackWhenAttached?: StandardCallback): void; /** * Publishes a single message to the channel with the given event name and payload. When publish is called with this client library, it won't attempt to implicitly attach to the channel, so long as [transient publishing](https://ably.com/docs/realtime/channels#transient-publish) is available in the library. Otherwise, the client will implicitly attach. * @@ -2666,9 +2678,9 @@ declare namespace Types { /** * Attach to this channel ensuring the channel is created in the Ably system and all messages published on the channel are received by any channel listeners registered using {@link RealtimeChannelPromise.subscribe | `subscribe()`}. Any resulting channel state change will be emitted to any listeners registered using the {@link EventEmitter.on | `on()`} or {@link EventEmitter.once | `once()`} methods. As a convenience, `attach()` is called implicitly if {@link RealtimeChannelPromise.subscribe | `subscribe()`} for the channel is called, or {@link RealtimePresencePromise.enter | `enter()`} or {@link RealtimePresencePromise.subscribe | `subscribe()`} are called on the {@link RealtimePresencePromise} object for this channel. * - * @returns A promise which resolves upon success of the operation and rejects with an {@link ErrorInfo} object upon its failure. + * @returns A promise which, upon success, if the channel became attached will be fulfilled with a {@link ChannelStateChange} object. If the channel was already attached the promise will be fulfilled with `null`. Upon failure, the promise will be rejected with an {@link ErrorInfo} object. */ - attach(): Promise; + attach(): Promise; /** * Detach from this channel. Any resulting channel state change is emitted to any listeners registered using the {@link EventEmitter.on | `on()`} or {@link EventEmitter.once | `once()`} methods. Once all clients globally have detached from the channel, the channel will be released in the Ably service within two minutes. * @@ -2694,32 +2706,32 @@ declare namespace Types { * * @param event - The event name. * @param listener - An event listener function. - * @returns A promise which resolves upon success of the channel {@link RealtimeChannelPromise.attach | `attach()`} operation and rejects with an {@link ErrorInfo} object upon its failure. + * @returns A promise which, upon successful attachment to the channel, will be fulfilled with a {@link ChannelStateChange} object. If the channel was already attached the promise will be resolved with `null`. Upon failure, the promise will be rejected with an {@link ErrorInfo} object. */ - subscribe(event: string, listener?: messageCallback): Promise; + subscribe(event: string, listener?: messageCallback): Promise; /** * Registers a listener for messages on this channel for multiple event name values. * * @param events - An array of event names. * @param listener - An event listener function. - * @returns A promise which resolves upon success of the channel {@link RealtimeChannelPromise.attach | `attach()`} operation and rejects with an {@link ErrorInfo} object upon its failure. + * @returns A promise which, upon successful attachment to the channel, will be fulfilled with a {@link ChannelStateChange} object. If the channel was already attached the promise will be resolved with `null`. Upon failure, the promise will be rejected with an {@link ErrorInfo} object. */ - subscribe(events: Array, listener?: messageCallback): Promise; + subscribe(events: Array, listener?: messageCallback): Promise; /** * Registers a listener for messages on this channel that match the supplied filter. * * @param filter - A {@link MessageFilter}. * @param listener - An event listener function. - * @returns A promise which resolves upon success of the channel {@link RealtimeChannelPromise.attach | `attach()`} operation and rejects with an {@link ErrorInfo} object upon its failure. + * @returns A promise which, upon successful attachment to the channel, will be fulfilled with a {@link ChannelStateChange} object. If the channel was already attached the promise will be resolved with `null`. Upon failure, the promise will be rejected with an {@link ErrorInfo} object. */ - subscribe(filter: MessageFilter, listener?: messageCallback): Promise; + subscribe(filter: MessageFilter, listener?: messageCallback): Promise; /** * Registers a listener for messages on this channel. The caller supplies a listener function, which is called each time one or more messages arrives on the channel. * * @param callback - An event listener function. - * @returns A promise which resolves upon success of the channel {@link RealtimeChannelPromise.attach | `attach()`} operation and rejects with an {@link ErrorInfo} object upon its failure. + * @returns A promise which, upon successful attachment to the channel, will be fulfilled with a {@link ChannelStateChange} object. If the channel was already attached the promise will be resolved with `null`. Upon failure, the promise will be rejected with an {@link ErrorInfo} object. */ - subscribe(callback: messageCallback): Promise; + subscribe(callback: messageCallback): Promise; /** * Publishes a single message to the channel with the given event name and payload. When publish is called with this client library, it won't attempt to implicitly attach to the channel, so long as [transient publishing](https://ably.com/docs/realtime/channels#transient-publish) is available in the library. Otherwise, the client will implicitly attach. * diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 58a4abf595..538e49a7bf 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -12,7 +12,7 @@ import ConnectionErrors from '../transport/connectionerrors'; import * as API from '../../../../ably'; import ConnectionManager from '../transport/connectionmanager'; import ConnectionStateChange from './connectionstatechange'; -import { ErrCallback, PaginatedResultCallback } from '../../types/utils'; +import { ErrCallback, PaginatedResultCallback, StandardCallback } from '../../types/utils'; import Realtime from './realtime'; interface RealtimeHistoryParams { @@ -272,7 +272,10 @@ class RealtimeChannel extends Channel { } } - attach(flags?: API.Types.ChannelMode[] | ErrCallback, callback?: ErrCallback): void | Promise { + attach( + flags?: API.Types.ChannelMode[] | ErrCallback, + callback?: StandardCallback + ): void | Promise { let _flags: API.Types.ChannelMode[] | null | undefined; if (typeof flags === 'function') { callback = flags; @@ -296,14 +299,18 @@ class RealtimeChannel extends Channel { * current mode differs from requested mode */ this._requestedFlags = _flags as API.Types.ChannelMode[]; } else if (this.state === 'attached') { - callback(); + callback(null, null); return; } this._attach(false, null, callback); } - _attach(forceReattach: boolean, attachReason: ErrorInfo | null, callback?: ErrCallback): void { + _attach( + forceReattach: boolean, + attachReason: ErrorInfo | null, + callback?: StandardCallback + ): void { if (!callback) { callback = function (err?: ErrorInfo | null) { if (err) { @@ -325,7 +332,7 @@ class RealtimeChannel extends Channel { this.once(function (this: { event: string }, stateChange: ChannelStateChange) { switch (this.event) { case 'attached': - callback?.(); + callback?.(null, stateChange); break; case 'detached': case 'suspended': @@ -422,7 +429,7 @@ class RealtimeChannel extends Channel { this.sendMessage(msg, callback || noop); } - subscribe(...args: unknown[] /* [event], listener, [callback] */): void | Promise { + subscribe(...args: unknown[] /* [event], listener, [callback] */): void | Promise { const [event, listener, callback] = RealtimeChannel.processListenerArgs(args); if (!callback && this.realtime.options.promises) { diff --git a/test/realtime/channel.test.js b/test/realtime/channel.test.js index 13993e6f54..b12519bb2b 100644 --- a/test/realtime/channel.test.js +++ b/test/realtime/channel.test.js @@ -1569,5 +1569,66 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async channel.subscribe(subscriber); }); + + it('attach_returns_state_change', function (done) { + var realtime = helper.AblyRealtime(); + var channelName = 'attach_returns_state_chnage'; + var channel = realtime.channels.get(channelName); + channel.attach(function (err, stateChange) { + if (err) { + closeAndFinish(done, realtime, err); + return; + } + + try { + expect(stateChange.current).to.equal('attached'); + expect(stateChange.previous).to.equal('attaching'); + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + + // for an already-attached channel, null is returned + channel.attach(function (err, stateChange) { + if (err) { + closeAndFinish(done, realtime, err); + return; + } + + try { + expect(stateChange).to.equal(null); + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + closeAndFinish(done, realtime); + }); + }); + }); + + it('subscribe_returns_state_change', function (done) { + var realtime = helper.AblyRealtime(); + var channelName = 'subscribe_returns_state_chnage'; + var channel = realtime.channels.get(channelName); + channel.subscribe( + function () {}, // message listener + // attach callback + function (err, stateChange) { + if (err) { + closeAndFinish(done, realtime, err); + return; + } + + try { + expect(stateChange.current).to.equal('attached'); + expect(stateChange.previous).to.equal('attaching'); + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + closeAndFinish(done, realtime); + } + ); + }); }); }); From b75e0e46be5ed371d9e16a7aff1d3bf90e6058af Mon Sep 17 00:00:00 2001 From: Owen Pearson Date: Thu, 22 Jun 2023 11:09:38 +0100 Subject: [PATCH 2/2] feat: add `ChannelStateChange.hasBacklog` Exposes the `hasBacklog` flag on `ChannelStateChange`. This may be used in combination with rewind to check whether to expect a backlog of messages upon attachment. --- ably.d.ts | 4 ++ src/common/lib/client/channelstatechange.ts | 14 +++++- src/common/lib/client/realtimechannel.ts | 10 ++-- src/common/lib/client/realtimepresence.ts | 2 +- test/realtime/channel.test.js | 54 +++++++++++++++++++++ 5 files changed, 77 insertions(+), 7 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index f404077c14..331835ac36 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -1149,6 +1149,10 @@ declare namespace Types { * Indicates whether message continuity on this channel is preserved, see [Nonfatal channel errors](https://ably.com/docs/realtime/channels#nonfatal-errors) for more info. */ resumed: boolean; + /** + * Indicates whether the client can expect a backlog of messages from a rewind or resume. + */ + hasBacklog?: boolean; } /** diff --git a/src/common/lib/client/channelstatechange.ts b/src/common/lib/client/channelstatechange.ts index b778210032..09cf0a2c6f 100644 --- a/src/common/lib/client/channelstatechange.ts +++ b/src/common/lib/client/channelstatechange.ts @@ -5,11 +5,21 @@ class ChannelStateChange { current: string; resumed?: boolean; reason?: string | Error | ErrorInfo; + hasBacklog?: boolean; - constructor(previous: string, current: string, resumed?: boolean, reason?: string | Error | ErrorInfo | null) { + constructor( + previous: string, + current: string, + resumed?: boolean, + hasBacklog?: boolean, + reason?: string | Error | ErrorInfo | null + ) { this.previous = previous; this.current = current; - if (current === 'attached') this.resumed = resumed; + if (current === 'attached') { + this.resumed = resumed; + this.hasBacklog = hasBacklog; + } if (reason) this.reason = reason; } } diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 538e49a7bf..9b8829cea5 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -622,12 +622,13 @@ class RealtimeChannel extends Channel { this.modes = (modesFromFlags && Utils.allToLowerCase(modesFromFlags)) || undefined; const resumed = message.hasFlag('RESUMED'); const hasPresence = message.hasFlag('HAS_PRESENCE'); + const hasBacklog = message.hasFlag('HAS_BACKLOG'); if (this.state === 'attached') { if (!resumed) { /* On a loss of continuity, the presence set needs to be re-synced */ this.presence.onAttached(hasPresence); } - const change = new ChannelStateChange(this.state, this.state, resumed, message.error); + const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error); this._allChannelChanges.emit('update', change); if (!resumed || this.channelOptions.updateOnAttached) { this.emit('update', change); @@ -636,7 +637,7 @@ class RealtimeChannel extends Channel { /* RTL5i: re-send DETACH and remain in the 'detaching' state */ this.checkPendingState(); } else { - this.notifyState('attached', message.error, resumed, hasPresence); + this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog); } break; } @@ -797,7 +798,8 @@ class RealtimeChannel extends Channel { state: API.Types.ChannelState, reason?: ErrorInfo | null, resumed?: boolean, - hasPresence?: boolean + hasPresence?: boolean, + hasBacklog?: boolean ): void { Logger.logAction( Logger.LOG_MICRO, @@ -823,7 +825,7 @@ class RealtimeChannel extends Channel { if (reason) { this.errorReason = reason; } - const change = new ChannelStateChange(this.state, state, resumed, reason); + const change = new ChannelStateChange(this.state, state, resumed, hasBacklog, reason); const logLevel = state === 'failed' ? Logger.LOG_ERROR : Logger.LOG_MAJOR; Logger.logAction( logLevel, diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index 632bf49182..4cbc4148d5 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -477,7 +477,7 @@ class RealtimePresence extends Presence { const msg = 'Presence auto-re-enter failed: ' + err.toString(); const wrappedErr = new ErrorInfo(msg, 91004, 400); Logger.logAction(Logger.LOG_ERROR, 'RealtimePresence._ensureMyMembersPresent()', msg); - const change = new ChannelStateChange(this.channel.state, this.channel.state, true, wrappedErr); + const change = new ChannelStateChange(this.channel.state, this.channel.state, true, false, wrappedErr); this.channel.emit('update', change); } }; diff --git a/test/realtime/channel.test.js b/test/realtime/channel.test.js index b12519bb2b..886548bdf9 100644 --- a/test/realtime/channel.test.js +++ b/test/realtime/channel.test.js @@ -1630,5 +1630,59 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async } ); }); + + it('rewind_has_backlog_0', function (done) { + var realtime = helper.AblyRealtime(); + var channelName = 'rewind_has_backlog_0'; + var channelOpts = { params: { rewind: '1' } }; + var channel = realtime.channels.get(channelName, channelOpts); + + // attach with rewind but no channel history - hasBacklog should be false + channel.attach(function (err, stateChange) { + if (err) { + closeAndFinish(done, realtime, err); + return; + } + + try { + expect(!stateChange.hasBacklog).to.be.ok; + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + closeAndFinish(done, realtime); + }); + }); + + it('rewind_has_backlog_1', function (done) { + var realtime = helper.AblyRealtime(); + var rest = helper.AblyRest(); + var channelName = 'rewind_has_backlog_1'; + var channelOpts = { params: { rewind: '1' } }; + var rtChannel = realtime.channels.get(channelName, channelOpts); + var restChannel = rest.channels.get(channelName); + + // attach with rewind after publishing - hasBacklog should be true + restChannel.publish('foo', 'bar', function (err) { + if (err) { + closeAndFinish(done, realtime, err); + return; + } + rtChannel.attach(function (err, stateChange) { + if (err) { + closeAndFinish(done, realtime, err); + return; + } + + try { + expect(stateChange.hasBacklog).to.be.ok; + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + closeAndFinish(done, realtime); + }); + }); + }); }); });