Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add presence set change listener and related types #1970

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,40 @@ export interface RealtimePresenceParams {
connectionId?: string;
}

export interface PresenceSetChange {
/**
* An array containing the members of the current presence set.
*
* Each member is represented by their latest {@link PresenceMessage}.
*/
members: PresenceMessage[],
/**
* The presence message that triggered the change.
*
* Represented as a {@link PresenceMessage} object.
*/
current: PresenceMessage,
/**
* The previous presence member before the change, if any.
*
* Represented as a {@link PresenceMessage} object.
*/
previous?: PresenceMessage,
/**
* Indicates if the presence members' synchronization is in progress.
*
* When `true`, the presence set is not fully synchronized, and the `members` list may not
* accurately reflect the current state of the channel. It often occurs when there is a
* disruption in the connection or during an initial presence sync.
*
* If the presence set was previously synchronized, but became desynchronized due to a
* connection issue, this value will remain `true` until re-synchronization completes.
*
* It is recommended to only rely on the `members` list when `syncInProgress` is `false`.
*/
syncInProgress: boolean
}

/**
* The `RealtimeHistoryParams` interface describes the parameters accepted by the following methods:
*
Expand Down Expand Up @@ -1512,6 +1546,14 @@ export type messageCallback<T> = (message: T) => void;
* @param changeStateChange - The state change that occurred.
*/
export type channelEventCallback = (changeStateChange: ChannelStateChange) => void;
/**
* A callback invoked whenever there is a change in the presence set of a channel.
* This is used to listen to changes in the members and their states within a channel's presence set.
*
* @param presenceSetChange - The details of the presence set change event that occurred.
*/
export type PresenceSetChangeListener = (presenceSetChange: PresenceSetChange) => void;

/**
* The callback used for the events emitted by {@link Connection}.
*
Expand Down Expand Up @@ -1903,6 +1945,23 @@ export declare interface RealtimePresence {
* Indicates whether the presence set synchronization between Ably and the clients on the channel has been completed. Set to `true` when the sync is complete.
*/
syncComplete: boolean;
/**
* Registers a listener that is called each time the presence state for the channel changes. This includes when members join or leave the set, or when their metadata changes.
* The listener is provided with the current set of members,
* the current presence message that triggered the set update, and the previous presence message if applicable.
* The listener is also provided with a flag that indicates whether the presence set is synchronized.
*
* @param listener - A function of type {@link PresenceSetChangeListener}, which is invoked with details of the presence set change.
* @returns A promise which resolves upon success of the operation and rejects with an {@link ErrorInfo} object upon its failure.
*/
onPresenceSetChange(listener: PresenceSetChangeListener): Promise<void>
/**
* Deregisters a previously registered listener for presence state changes on the channel.
* This ensures that the provided listener will no longer be invoked when the presence state of the channel changes.
*
* @param listener - A function of type {@link PresenceSetChangeListener} that was previously registered using {@link onPresenceSetChange}.
*/
offPresenceSetChange(listener: PresenceSetChangeListener): void;
/**
* Deregisters a specific listener that is registered to receive {@link PresenceMessage} on the channel for a given {@link PresenceAction}.
*
Expand Down
56 changes: 55 additions & 1 deletion src/common/lib/client/realtimepresence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ function waitAttached(channel: RealtimeChannel, callback: ErrCallback, action: (
}
}

type PresenceSetChangeListener = (event: {
members: PresenceMessage[];
current: PresenceMessage;
previous?: PresenceMessage;
syncInProgress: boolean;
}) => void;

class RealtimePresence extends EventEmitter {
channel: RealtimeChannel;
pendingPresence: { presence: WirePresenceMessage; callback: ErrCallback }[];
Expand All @@ -63,6 +70,9 @@ class RealtimePresence extends EventEmitter {
_myMembers: PresenceMap;
subscriptions: EventEmitter;
name?: string;
private _current?: PresenceMessage;
private _previous?: PresenceMessage;
private _presenceSetChangeEventEmitter: EventEmitter

constructor(channel: RealtimeChannel) {
super(channel.logger);
Expand All @@ -73,6 +83,10 @@ class RealtimePresence extends EventEmitter {
this._myMembers = new PresenceMap(this, (item) => item.clientId!);
this.subscriptions = new EventEmitter(this.logger);
this.pendingPresence = [];
this._presenceSetChangeEventEmitter = new EventEmitter(this.logger);

// Subscribe the internal listener to the presence set change event
this._internalPresenceSubscribe();
}

async enter(data: unknown): Promise<void> {
Expand Down Expand Up @@ -101,7 +115,7 @@ class RealtimePresence extends EventEmitter {
id: string | undefined,
clientId: string | undefined,
data: unknown,
action: string,
action: string
): Promise<void> {
const channel = this.channel;
if (!channel.connectionManager.activeState()) {
Expand Down Expand Up @@ -443,6 +457,42 @@ class RealtimePresence extends EventEmitter {
});
}

private _internalPresenceSubscribe(): void {
this.subscriptions.on((...args: any[]) => {
this._previous = this._current;
this._current = args[0];
this._presenceSetChangeEventEmitter.emit('internal',{
members: this.members.values(),
current: this._current,
previous: this._previous,
syncInProgress: this.members.syncInProgress,
});
});
}

async onPresenceSetChange(listener: PresenceSetChangeListener): Promise<void> {
const channel = this.channel;
if (channel.state === 'failed') {
throw ErrorInfo.fromValues(channel.invalidStateError());
}

// Add the listener to the dedicated presence emitter
this._presenceSetChangeEventEmitter.on(listener);

// TODO - Add spec point for this
if (channel.channelOptions.attachOnSubscribe !== false) {
await channel.attach();
}
}

/**
* Removes a previously subscribed listener.
*/
offPresenceSetChange(listener: PresenceSetChangeListener): void {
// Remove the listener from the dedicated presence emitter
this._presenceSetChangeEventEmitter.off(listener);
}

async subscribe(..._args: unknown[] /* [event], listener */): Promise<void> {
const args = RealtimeChannel.processListenerArgs(_args);
const event = args[0];
Expand All @@ -466,6 +516,10 @@ class RealtimePresence extends EventEmitter {
const event = args[0];
const listener = args[1];
this.subscriptions.off(event, listener);
if (this.subscriptions.listeners.length === 0) {
// Resubscribe the internal listener if this unsubscribe() call has removed all listeners
this._internalPresenceSubscribe();
}
}
}

Expand Down
Loading