From 01a535ab4c013b85340ad6de185e02b3c7623392 Mon Sep 17 00:00:00 2001 From: Steven Lindsay Date: Thu, 13 Feb 2025 10:34:24 +0000 Subject: [PATCH] feat: add presence set change listener and related types - Provides a way to listen for presence set changes, as well as the presence message that moved the set into its current state. - Also exposes the current sync state of the presence set in the change event. --- ably.d.ts | 59 +++++++++++++++++++++++ src/common/lib/client/realtimepresence.ts | 56 ++++++++++++++++++++- 2 files changed, 114 insertions(+), 1 deletion(-) diff --git a/ably.d.ts b/ably.d.ts index 089da9994..62433548e 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -1020,6 +1020,40 @@ export interface RealtimePresenceParams { connectionId?: string; } +export interface PresenceSetChange { + /** + * An array containing the members of the current presence set. + * + * Each member is represented by their latest {@link PresenceMessage}. + */ + members: PresenceMessage[], + /** + * The presence message that triggered the change. + * + * Represented as a {@link PresenceMessage} object. + */ + current: PresenceMessage, + /** + * The previous presence member before the change, if any. + * + * Represented as a {@link PresenceMessage} object. + */ + previous?: PresenceMessage, + /** + * Indicates if the presence members' synchronization is in progress. + * + * When `true`, the presence set is not fully synchronized, and the `members` list may not + * accurately reflect the current state of the channel. It often occurs when there is a + * disruption in the connection or during an initial presence sync. + * + * If the presence set was previously synchronized, but became desynchronized due to a + * connection issue, this value will remain `true` until re-synchronization completes. + * + * It is recommended to only rely on the `members` list when `syncInProgress` is `false`. + */ + syncInProgress: boolean +} + /** * The `RealtimeHistoryParams` interface describes the parameters accepted by the following methods: * @@ -1512,6 +1546,14 @@ export type messageCallback = (message: T) => void; * @param changeStateChange - The state change that occurred. */ export type channelEventCallback = (changeStateChange: ChannelStateChange) => void; +/** + * A callback invoked whenever there is a change in the presence set of a channel. + * This is used to listen to changes in the members and their states within a channel's presence set. + * + * @param presenceSetChange - The details of the presence set change event that occurred. + */ +export type PresenceSetChangeListener = (presenceSetChange: PresenceSetChange) => void; + /** * The callback used for the events emitted by {@link Connection}. * @@ -1903,6 +1945,23 @@ export declare interface RealtimePresence { * Indicates whether the presence set synchronization between Ably and the clients on the channel has been completed. Set to `true` when the sync is complete. */ syncComplete: boolean; + /** + * Registers a listener that is called each time the presence state for the channel changes. This includes when members join or leave the set, or when their metadata changes. + * The listener is provided with the current set of members, + * the current presence message that triggered the set update, and the previous presence message if applicable. + * The listener is also provided with a flag that indicates whether the presence set is synchronized. + * + * @param listener - A function of type {@link PresenceSetChangeListener}, which is invoked with details of the presence set change. + * @returns A promise which resolves upon success of the operation and rejects with an {@link ErrorInfo} object upon its failure. + */ + onPresenceSetChange(listener: PresenceSetChangeListener): Promise + /** + * Deregisters a previously registered listener for presence state changes on the channel. + * This ensures that the provided listener will no longer be invoked when the presence state of the channel changes. + * + * @param listener - A function of type {@link PresenceSetChangeListener} that was previously registered using {@link onPresenceSetChange}. + */ + offPresenceSetChange(listener: PresenceSetChangeListener): void; /** * Deregisters a specific listener that is registered to receive {@link PresenceMessage} on the channel for a given {@link PresenceAction}. * diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index 63b1d4b82..a2e0482a1 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -55,6 +55,13 @@ function waitAttached(channel: RealtimeChannel, callback: ErrCallback, action: ( } } +type PresenceSetChangeListener = (event: { + members: PresenceMessage[]; + current: PresenceMessage; + previous?: PresenceMessage; + syncInProgress: boolean; +}) => void; + class RealtimePresence extends EventEmitter { channel: RealtimeChannel; pendingPresence: { presence: WirePresenceMessage; callback: ErrCallback }[]; @@ -63,6 +70,9 @@ class RealtimePresence extends EventEmitter { _myMembers: PresenceMap; subscriptions: EventEmitter; name?: string; + private _current?: PresenceMessage; + private _previous?: PresenceMessage; + private _presenceSetChangeEventEmitter: EventEmitter constructor(channel: RealtimeChannel) { super(channel.logger); @@ -73,6 +83,10 @@ class RealtimePresence extends EventEmitter { this._myMembers = new PresenceMap(this, (item) => item.clientId!); this.subscriptions = new EventEmitter(this.logger); this.pendingPresence = []; + this._presenceSetChangeEventEmitter = new EventEmitter(this.logger); + + // Subscribe the internal listener to the presence set change event + this._internalPresenceSubscribe(); } async enter(data: unknown): Promise { @@ -101,7 +115,7 @@ class RealtimePresence extends EventEmitter { id: string | undefined, clientId: string | undefined, data: unknown, - action: string, + action: string ): Promise { const channel = this.channel; if (!channel.connectionManager.activeState()) { @@ -443,6 +457,42 @@ class RealtimePresence extends EventEmitter { }); } + private _internalPresenceSubscribe(): void { + this.subscriptions.on((...args: any[]) => { + this._previous = this._current; + this._current = args[0]; + this._presenceSetChangeEventEmitter.emit('internal',{ + members: this.members.values(), + current: this._current, + previous: this._previous, + syncInProgress: this.members.syncInProgress, + }); + }); + } + + async onPresenceSetChange(listener: PresenceSetChangeListener): Promise { + const channel = this.channel; + if (channel.state === 'failed') { + throw ErrorInfo.fromValues(channel.invalidStateError()); + } + + // Add the listener to the dedicated presence emitter + this._presenceSetChangeEventEmitter.on(listener); + + // TODO - Add spec point for this + if (channel.channelOptions.attachOnSubscribe !== false) { + await channel.attach(); + } + } + + /** + * Removes a previously subscribed listener. + */ + offPresenceSetChange(listener: PresenceSetChangeListener): void { + // Remove the listener from the dedicated presence emitter + this._presenceSetChangeEventEmitter.off(listener); + } + async subscribe(..._args: unknown[] /* [event], listener */): Promise { const args = RealtimeChannel.processListenerArgs(_args); const event = args[0]; @@ -466,6 +516,10 @@ class RealtimePresence extends EventEmitter { const event = args[0]; const listener = args[1]; this.subscriptions.off(event, listener); + if (this.subscriptions.listeners.length === 0) { + // Resubscribe the internal listener if this unsubscribe() call has removed all listeners + this._internalPresenceSubscribe(); + } } }