diff --git a/ably.d.ts b/ably.d.ts index 77ed8b98e..5028351ea 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -875,7 +875,7 @@ export type ChannelMode = | ChannelModes.PUBLISH | ChannelModes.SUBSCRIBE | ChannelModes.PRESENCE - | ChannelModes.PRESENCE_SUBSCRIBE + | ChannelModes.PRESENCE_SUBSCRIBE; /** * Passes additional properties to a {@link Channel} or {@link RealtimeChannel} object, such as encryption, {@link ChannelMode} and channel parameters. diff --git a/scripts/moduleReport.ts b/scripts/moduleReport.ts index 8117950a4..0697830bc 100644 --- a/scripts/moduleReport.ts +++ b/scripts/moduleReport.ts @@ -264,7 +264,10 @@ async function checkBaseRealtimeFiles() { 'src/common/lib/transport/transport.ts', 'src/common/lib/types/errorinfo.ts', 'src/common/lib/types/message.ts', + 'src/common/lib/types/basemessage.ts', + 'src/common/lib/types/presencemessage.ts', 'src/common/lib/types/protocolmessage.ts', + 'src/common/lib/types/protocolmessagecommon.ts', 'src/common/lib/types/pushchannelsubscription.ts', // TODO why? https://github.com/ably/ably-js/issues/1506 'src/common/lib/util/defaults.ts', 'src/common/lib/util/eventemitter.ts', diff --git a/src/common/lib/client/defaultrealtime.ts b/src/common/lib/client/defaultrealtime.ts index da67d5d4a..447b82ee6 100644 --- a/src/common/lib/client/defaultrealtime.ts +++ b/src/common/lib/client/defaultrealtime.ts @@ -12,11 +12,7 @@ import { DefaultPresenceMessage } from '../types/defaultpresencemessage'; import WebSocketTransport from '../transport/websockettransport'; import { FilteredSubscriptions } from './filteredsubscriptions'; import { PresenceMap } from './presencemap'; -import { - fromValues as presenceMessageFromValues, - fromValuesArray as presenceMessagesFromValuesArray, - fromWireProtocol as presenceMessageFromWireProtocol, -} from '../types/presencemessage'; +import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage'; import { Http } from 'common/types/http'; import Defaults from '../util/defaults'; import Logger from '../util/logger'; @@ -39,9 +35,8 @@ export class DefaultRealtime extends BaseRealtime { MsgPack, RealtimePresence: { RealtimePresence, - presenceMessageFromValues, - presenceMessagesFromValuesArray, - presenceMessageFromWireProtocol, + PresenceMessage, + WirePresenceMessage, }, WebSocketTransport, MessageInteractions: FilteredSubscriptions, diff --git a/src/common/lib/client/modularplugins.ts b/src/common/lib/client/modularplugins.ts index c3bdf1ea0..1729d59f9 100644 --- a/src/common/lib/client/modularplugins.ts +++ b/src/common/lib/client/modularplugins.ts @@ -5,18 +5,13 @@ import RealtimePresence from './realtimepresence'; import XHRRequest from 'platform/web/lib/http/request/xhrrequest'; import fetchRequest from 'platform/web/lib/http/request/fetchrequest'; import { FilteredSubscriptions } from './filteredsubscriptions'; -import { - fromValues as presenceMessageFromValues, - fromValuesArray as presenceMessagesFromValuesArray, - fromWireProtocol as presenceMessageFromWireProtocol, -} from '../types/presencemessage'; +import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage'; import { TransportCtor } from '../transport/transport'; import * as PushPlugin from 'plugins/push'; export interface PresenceMessagePlugin { - presenceMessageFromValues: typeof presenceMessageFromValues; - presenceMessagesFromValuesArray: typeof presenceMessagesFromValuesArray; - presenceMessageFromWireProtocol: typeof presenceMessageFromWireProtocol; + PresenceMessage: typeof PresenceMessage; + WirePresenceMessage: typeof WirePresenceMessage; } export type RealtimePresencePlugin = PresenceMessagePlugin & { diff --git a/src/common/lib/client/presencemap.ts b/src/common/lib/client/presencemap.ts index 793fde54b..0cace6c59 100644 --- a/src/common/lib/client/presencemap.ts +++ b/src/common/lib/client/presencemap.ts @@ -1,7 +1,7 @@ import * as Utils from '../util/utils'; import EventEmitter from '../util/eventemitter'; import Logger from '../util/logger'; -import PresenceMessage, { fromValues as presenceMessageFromValues } from '../types/presencemessage'; +import PresenceMessage from '../types/presencemessage'; import type RealtimePresence from './realtimepresence'; @@ -80,7 +80,7 @@ export class PresenceMap extends EventEmitter { put(item: PresenceMessage) { if (item.action === 'enter' || item.action === 'update') { - item = presenceMessageFromValues(item); + item = PresenceMessage.fromValues(item); item.action = 'present'; } const map = this.map, @@ -118,7 +118,7 @@ export class PresenceMap extends EventEmitter { /* RTP2f */ if (this.syncInProgress) { - item = presenceMessageFromValues(item); + item = PresenceMessage.fromValues(item); item.action = 'absent'; map[key] = item; } else { diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 988d7828f..5e67ab11e 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -1,24 +1,13 @@ -import ProtocolMessage, { - actions, - channelModes, - fromValues as protocolMessageFromValues, -} from '../types/protocolmessage'; +import { actions, channelModes } from '../types/protocolmessagecommon'; +import ProtocolMessage, { fromValues as protocolMessageFromValues } from '../types/protocolmessage'; import EventEmitter from '../util/eventemitter'; import * as Utils from '../util/utils'; import Logger from '../util/logger'; import RealtimePresence from './realtimepresence'; -import Message, { - fromValues as messageFromValues, - fromValuesArray as messagesFromValuesArray, - encodeArray as encodeMessagesArray, - decode as decodeMessage, - getMessagesSize, - CipherOptions, - EncodingDecodingContext, -} from '../types/message'; +import { EncodingDecodingContext, CipherOptions, populateFieldsFromParent } from '../types/basemessage'; +import Message, { WireMessage, getMessagesSize, encodeArray as encodeMessagesArray } from '../types/message'; import ChannelStateChange from './channelstatechange'; import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo'; -import PresenceMessage, { decode as decodePresenceMessage } from '../types/presencemessage'; import ConnectionErrors from '../transport/connectionerrors'; import * as API from '../../../../ably'; import ConnectionManager from '../transport/connectionmanager'; @@ -29,6 +18,7 @@ import { ChannelOptions } from '../../types/channel'; import { normaliseChannelOptions } from '../util/defaults'; import { PaginatedResult } from './paginatedresource'; import type { PushChannel } from 'plugins/push'; +import type { WirePresenceMessage } from '../types/presencemessage'; interface RealtimeHistoryParams { start?: number; @@ -225,28 +215,32 @@ class RealtimeChannel extends EventEmitter { } async publish(...args: any[]): Promise { - let messages = args[0]; + let messages: Message[]; let argCount = args.length; if (!this.connectionManager.activeState()) { throw this.connectionManager.getError(); } if (argCount == 1) { - if (Utils.isObject(messages)) messages = [messageFromValues(messages)]; - else if (Array.isArray(messages)) messages = messagesFromValuesArray(messages); - else + if (Utils.isObject(args[0])) { + messages = [Message.fromValues(args[0])]; + } else if (Array.isArray(args[0])) { + messages = Message.fromValuesArray(args[0]); + } else { throw new ErrorInfo( 'The single-argument form of publish() expects a message object or an array of message objects', 40013, 400, ); + } } else { - messages = [messageFromValues({ name: args[0], data: args[1] })]; + messages = [Message.fromValues({ name: args[0], data: args[1] })]; } const maxMessageSize = this.client.options.maxMessageSize; - await encodeMessagesArray(messages, this.channelOptions as CipherOptions); + // TODO get rid of CipherOptions type assertion, indicates channeloptions types are broken + const wireMessages = await encodeMessagesArray(messages, this.channelOptions as CipherOptions); /* RSL1i */ - const size = getMessagesSize(messages); + const size = getMessagesSize(wireMessages); if (size > maxMessageSize) { throw new ErrorInfo( 'Maximum size of messages that can be published at once exceeded ( was ' + @@ -259,11 +253,11 @@ class RealtimeChannel extends EventEmitter { ); } return new Promise((resolve, reject) => { - this._publish(messages, (err) => (err ? reject(err) : resolve())); + this._publish(wireMessages, (err) => (err ? reject(err) : resolve())); }); } - _publish(messages: Array, callback: ErrCallback) { + _publish(messages: Array, callback: ErrCallback) { Logger.logAction(this.logger, Logger.LOG_MICRO, 'RealtimeChannel.publish()', 'message count = ' + messages.length); const state = this.state; switch (state) { @@ -482,13 +476,11 @@ class RealtimeChannel extends EventEmitter { this.connectionManager.send(msg, this.client.options.queueMessages, callback); } - sendPresence(presence: PresenceMessage | PresenceMessage[], callback?: ErrCallback): void { + sendPresence(presence: WirePresenceMessage[], callback?: ErrCallback): void { const msg = protocolMessageFromValues({ action: actions.PRESENCE, channel: this.name, - presence: Array.isArray(presence) - ? this.client._RealtimePresence!.presenceMessagesFromValuesArray(presence) - : [this.client._RealtimePresence!.presenceMessageFromValues(presence)], + presence: presence, }); this.sendMessage(msg, callback); } @@ -565,16 +557,19 @@ class RealtimeChannel extends EventEmitter { if (!message.presence) break; // eslint-disable-next-line no-fallthrough case actions.PRESENCE: { - const presenceMessages = message.presence; - - if (!presenceMessages) { + if (!message.presence) { break; } + populateFieldsFromParent(message); const options = this.channelOptions; - await this._decodeAndPrepareMessages(message, presenceMessages, (msg) => decodePresenceMessage(msg, options)); - if (this._presence) { + const presenceMessages = await Promise.all( + message.presence.map((wpm) => { + return wpm.decode(options, this.logger); + }), + ); + this._presence.setPresence(presenceMessages, isSync, syncChannelSerial as any); } break; @@ -597,9 +592,11 @@ class RealtimeChannel extends EventEmitter { return; } - const messages = message.messages as Array, - firstMessage = messages[0], - lastMessage = messages[messages.length - 1], + populateFieldsFromParent(message); + + const encoded = message.messages!, + firstMessage = encoded[0], + lastMessage = encoded[encoded.length - 1], channelSerial = message.channelSerial; if ( @@ -618,44 +615,34 @@ class RealtimeChannel extends EventEmitter { break; } - const { unrecoverableError } = await this._decodeAndPrepareMessages( - message, - messages, - (msg) => decodeMessage(msg, this._decodingContext), - (e) => { - /* decrypt failed .. the most likely cause is that we have the wrong key */ - const errorInfo = e as ErrorInfo; + let messages: Message[] = []; + for (let i = 0; i < encoded.length; i++) { + const { decoded, err } = await encoded[i].decodeWithErr(this._decodingContext, this.logger); + messages[i] = decoded; - switch (errorInfo.code) { + if (err) { + switch (err.code) { case 40018: /* decode failure */ - this._startDecodeFailureRecovery(errorInfo); - return { unrecoverableError: true }; + this._startDecodeFailureRecovery(err); + return; - case 40019: - /* No vcdiff plugin passed in - no point recovering, give up */ - // eslint-disable-next-line no-fallthrough + case 40019: /* No vcdiff plugin passed in - no point recovering, give up */ case 40021: /* Browser does not support deltas, similarly no point recovering */ - this.notifyState('failed', errorInfo); - return { unrecoverableError: true }; + this.notifyState('failed', err); + return; default: - return { unrecoverableError: false }; + // do nothing, continue decoding } - }, - ); - if (unrecoverableError) { - return; + } } for (let i = 0; i < messages.length; i++) { const msg = messages[i]; if (channelSerial && !msg.version) { msg.version = channelSerial + ':' + i.toString().padStart(3, '0'); - // already done in fromWireProtocol -- but for realtime messages the source - // fields might be copied from the protocolmessage, so need to do it again - msg.expandFields(); } } @@ -688,51 +675,6 @@ class RealtimeChannel extends EventEmitter { } } - /** - * Mutates provided messages by adding `connectionId`, `timestamp` and `id` fields, and decoding message data. - * - * @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding - * and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided - */ - private async _decodeAndPrepareMessages( - protocolMessage: ProtocolMessage, - messages: T[], - decodeFn: (msg: T) => Promise, - decodeErrorRecoveryHandler?: (e: Error) => { unrecoverableError: boolean }, - ): Promise<{ unrecoverableError: boolean }> { - const { id, connectionId, timestamp } = protocolMessage; - - for (let i = 0; i < messages.length; i++) { - const msg = messages[i]; - - try { - // decode underlying data for a message - await decodeFn(msg); - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.decodeAndPrepareMessages()', - (e as Error).toString(), - ); - - if (decodeErrorRecoveryHandler) { - const { unrecoverableError } = decodeErrorRecoveryHandler(e as Error); - if (unrecoverableError) { - // break out of for loop by returning - return { unrecoverableError: true }; - } - } - } - - if (!msg.connectionId) msg.connectionId = connectionId; - if (!msg.timestamp) msg.timestamp = timestamp; - if (id && !msg.id) msg.id = id + ':' + i; - } - - return { unrecoverableError: false }; - } - _startDecodeFailureRecovery(reason: ErrorInfo): void { if (!this._lastPayload.decodeFailureRecoveryInProgress) { Logger.logAction( diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index 64ad344f6..7f1ce2d9d 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -1,16 +1,12 @@ import * as Utils from '../util/utils'; import EventEmitter from '../util/eventemitter'; import Logger from '../util/logger'; -import PresenceMessage, { - fromValues as presenceMessageFromValues, - fromData as presenceMessageFromData, - encode as encodePresenceMessage, -} from '../types/presencemessage'; +import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage'; +import type { CipherOptions } from '../types/basemessage'; import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo'; import RealtimeChannel from './realtimechannel'; import Multicaster from '../util/multicaster'; import ChannelStateChange from './channelstatechange'; -import { CipherOptions } from '../types/message'; import { ErrCallback } from '../../types/utils'; import { PaginatedResult } from './paginatedresource'; import { PresenceMap, RealtimePresenceParams } from './presencemap'; @@ -61,7 +57,7 @@ function waitAttached(channel: RealtimeChannel, callback: ErrCallback, action: ( class RealtimePresence extends EventEmitter { channel: RealtimeChannel; - pendingPresence: { presence: PresenceMessage; callback: ErrCallback }[]; + pendingPresence: { presence: WirePresenceMessage; callback: ErrCallback }[]; syncComplete: boolean; members: PresenceMap; _myMembers: PresenceMap; @@ -119,7 +115,7 @@ class RealtimePresence extends EventEmitter { 'channel = ' + channel.name + ', id = ' + id + ', client = ' + (clientId || '(implicit) ' + getClientId(this)), ); - const presence = presenceMessageFromData(data); + const presence = PresenceMessage.fromData(data); presence.action = action; if (id) { presence.id = id; @@ -127,12 +123,12 @@ class RealtimePresence extends EventEmitter { if (clientId) { presence.clientId = clientId; } + const wirePresMsg = await presence.encode(channel.channelOptions as CipherOptions); - await encodePresenceMessage(presence, channel.channelOptions as CipherOptions); switch (channel.state) { case 'attached': return new Promise((resolve, reject) => { - channel.sendPresence(presence, (err) => (err ? reject(err) : resolve())); + channel.sendPresence([wirePresMsg], (err) => (err ? reject(err) : resolve())); }); case 'initialized': case 'detached': @@ -141,7 +137,7 @@ class RealtimePresence extends EventEmitter { case 'attaching': return new Promise((resolve, reject) => { this.pendingPresence.push({ - presence: presence, + presence: wirePresMsg, callback: (err) => (err ? reject(err) : resolve()), }); }); @@ -175,20 +171,21 @@ class RealtimePresence extends EventEmitter { 'RealtimePresence.leaveClient()', 'leaving; channel = ' + this.channel.name + ', client = ' + clientId, ); - const presence = presenceMessageFromData(data); + const presence = PresenceMessage.fromData(data); presence.action = 'leave'; if (clientId) { presence.clientId = clientId; } + const wirePresMsg = await presence.encode(channel.channelOptions as CipherOptions); return new Promise((resolve, reject) => { switch (channel.state) { case 'attached': - channel.sendPresence(presence, (err) => (err ? reject(err) : resolve())); + channel.sendPresence([wirePresMsg], (err) => (err ? reject(err) : resolve())); break; case 'attaching': this.pendingPresence.push({ - presence: presence, + presence: wirePresMsg, callback: (err) => (err ? reject(err) : resolve()), }); break; @@ -288,8 +285,7 @@ class RealtimePresence extends EventEmitter { } } - for (let i = 0; i < presenceSet.length; i++) { - const presence = presenceMessageFromValues(presenceSet[i]); + for (let presence of presenceSet) { switch (presence.action) { case 'leave': if (members.remove(presence)) { @@ -320,7 +316,7 @@ class RealtimePresence extends EventEmitter { /* broadcast to listeners */ for (let i = 0; i < broadcastMessages.length; i++) { const presence = broadcastMessages[i]; - this.subscriptions.emit(presence.action as string, presence); + this.subscriptions.emit(presence.action!, presence); } } @@ -435,7 +431,7 @@ class RealtimePresence extends EventEmitter { _synthesizeLeaves(items: PresenceMessage[]): void { const subscriptions = this.subscriptions; items.forEach(function (item) { - const presence = presenceMessageFromValues({ + const presence = PresenceMessage.fromValues({ action: 'leave', connectionId: item.connectionId, clientId: item.clientId, diff --git a/src/common/lib/client/restchannel.ts b/src/common/lib/client/restchannel.ts index e27133c6e..0c14d120a 100644 --- a/src/common/lib/client/restchannel.ts +++ b/src/common/lib/client/restchannel.ts @@ -2,13 +2,11 @@ import * as Utils from '../util/utils'; import Logger from '../util/logger'; import RestPresence from './restpresence'; import Message, { - encodeArray as encodeMessagesArray, serialize as serializeMessage, getMessagesSize, - CipherOptions, - fromValues as messageFromValues, - fromValuesArray as messagesFromValuesArray, + encodeArray as encodeMessagesArray, } from '../types/message'; +import { CipherOptions } from '../types/basemessage'; import ErrorInfo from '../types/errorinfo'; import { PaginatedResult } from './paginatedresource'; import Resource from './resource'; @@ -74,13 +72,13 @@ class RestChannel { if (typeof first === 'string' || first === null) { /* (name, data, ...) */ - messages = [messageFromValues({ name: first, data: second })]; + messages = [Message.fromValues({ name: first, data: second })]; params = args[2]; } else if (Utils.isObject(first)) { - messages = [messageFromValues(first)]; + messages = [Message.fromValues(first)]; params = args[1]; } else if (Array.isArray(first)) { - messages = messagesFromValuesArray(first); + messages = Message.fromValuesArray(first); params = args[1]; } else { throw new ErrorInfo( @@ -110,10 +108,10 @@ class RestChannel { }); } - await encodeMessagesArray(messages, this.channelOptions as CipherOptions); + const wireMessages = await encodeMessagesArray(messages, this.channelOptions as CipherOptions); /* RSL1i */ - const size = getMessagesSize(messages), + const size = getMessagesSize(wireMessages), maxMessageSize = options.maxMessageSize; if (size > maxMessageSize) { throw new ErrorInfo( @@ -127,7 +125,7 @@ class RestChannel { ); } - await this._publish(serializeMessage(messages, client._MsgPack, format), headers, params); + await this._publish(serializeMessage(wireMessages, client._MsgPack, format), headers, params); } async _publish(requestBody: RequestBody | null, headers: Record, params: any): Promise { diff --git a/src/common/lib/client/restchannelmixin.ts b/src/common/lib/client/restchannelmixin.ts index 67fd701f3..b24ccb6bd 100644 --- a/src/common/lib/client/restchannelmixin.ts +++ b/src/common/lib/client/restchannelmixin.ts @@ -2,7 +2,7 @@ import * as API from '../../../../ably'; import RestChannel from './restchannel'; import RealtimeChannel from './realtimechannel'; import * as Utils from '../util/utils'; -import Message, { WireProtocolMessage, _fromEncodedArray } from '../types/message'; +import Message, { WireMessage, _fromEncodedArray } from '../types/message'; import Defaults from '../util/defaults'; import PaginatedResource, { PaginatedResult } from './paginatedresource'; import Resource from './resource'; @@ -35,9 +35,9 @@ export class RestChannelMixin { headers, unpacked, ) { - const decoded: WireProtocolMessage[] = unpacked - ? (body as WireProtocolMessage[]) - : Utils.decodeBody(body, client._MsgPack, format); + const decoded = ( + unpacked ? body : Utils.decodeBody(body, client._MsgPack, format) + ) as Utils.Properties[]; return _fromEncodedArray(decoded, channel); }).get(params as Record); diff --git a/src/common/lib/client/restpresence.ts b/src/common/lib/client/restpresence.ts index 6bf932746..0ac42a0c5 100644 --- a/src/common/lib/client/restpresence.ts +++ b/src/common/lib/client/restpresence.ts @@ -1,7 +1,7 @@ import * as Utils from '../util/utils'; import Logger from '../util/logger'; import PaginatedResource, { PaginatedResult } from './paginatedresource'; -import PresenceMessage, { WireProtocolPresenceMessage, _fromEncodedArray } from '../types/presencemessage'; +import PresenceMessage, { WirePresenceMessage, _fromEncodedArray } from '../types/presencemessage'; import RestChannel from './restchannel'; import Defaults from '../util/defaults'; @@ -31,9 +31,9 @@ class RestPresence { headers, envelope, async (body, headers, unpacked) => { - const decoded: WireProtocolPresenceMessage[] = unpacked - ? (body as WireProtocolPresenceMessage[]) - : Utils.decodeBody(body, client._MsgPack, format); + const decoded = ( + unpacked ? body : Utils.decodeBody(body, client._MsgPack, format) + ) as Utils.Properties[]; return _fromEncodedArray(decoded, this.channel); }, diff --git a/src/common/lib/client/restpresencemixin.ts b/src/common/lib/client/restpresencemixin.ts index e9d562120..5ca340fb2 100644 --- a/src/common/lib/client/restpresencemixin.ts +++ b/src/common/lib/client/restpresencemixin.ts @@ -3,7 +3,7 @@ import RealtimePresence from './realtimepresence'; import * as Utils from '../util/utils'; import Defaults from '../util/defaults'; import PaginatedResource, { PaginatedResult } from './paginatedresource'; -import PresenceMessage, { WireProtocolPresenceMessage, _fromEncodedArray } from '../types/presencemessage'; +import PresenceMessage, { WirePresenceMessage, _fromEncodedArray } from '../types/presencemessage'; import { RestChannelMixin } from './restchannelmixin'; export class RestPresenceMixin { @@ -28,9 +28,9 @@ export class RestPresenceMixin { headers, envelope, async (body, headers, unpacked) => { - const decoded: WireProtocolPresenceMessage[] = unpacked - ? (body as WireProtocolPresenceMessage[]) - : Utils.decodeBody(body, client._MsgPack, format); + const decoded = ( + unpacked ? body : Utils.decodeBody(body, client._MsgPack, format) + ) as Utils.Properties[]; return _fromEncodedArray(decoded, presence.channel); }, diff --git a/src/common/lib/transport/comettransport.ts b/src/common/lib/transport/comettransport.ts index ab468b6cc..77f0a75f6 100644 --- a/src/common/lib/transport/comettransport.ts +++ b/src/common/lib/transport/comettransport.ts @@ -1,6 +1,6 @@ import * as Utils from '../util/utils'; +import { actions } from '../types/protocolmessagecommon'; import ProtocolMessage, { - actions, fromValues as protocolMessageFromValues, fromDeserialized as protocolMessageFromDeserialized, } from '../types/protocolmessage'; diff --git a/src/common/lib/transport/connectionmanager.ts b/src/common/lib/transport/connectionmanager.ts index 8ef56dcc2..73e948379 100644 --- a/src/common/lib/transport/connectionmanager.ts +++ b/src/common/lib/transport/connectionmanager.ts @@ -1,5 +1,5 @@ +import { actions } from '../types/protocolmessagecommon'; import ProtocolMessage, { - actions, stringify as stringifyProtocolMessage, fromValues as protocolMessageFromValues, } from 'common/lib/types/protocolmessage'; diff --git a/src/common/lib/transport/protocol.ts b/src/common/lib/transport/protocol.ts index cb91a1db2..5f792b1a7 100644 --- a/src/common/lib/transport/protocol.ts +++ b/src/common/lib/transport/protocol.ts @@ -1,4 +1,5 @@ -import ProtocolMessage, { actions, stringify as stringifyProtocolMessage } from '../types/protocolmessage'; +import { actions } from '../types/protocolmessagecommon'; +import ProtocolMessage, { stringify as stringifyProtocolMessage } from '../types/protocolmessage'; import * as Utils from '../util/utils'; import EventEmitter from '../util/eventemitter'; import Logger from '../util/logger'; diff --git a/src/common/lib/transport/transport.ts b/src/common/lib/transport/transport.ts index 23ad1ec05..5172bd6d6 100644 --- a/src/common/lib/transport/transport.ts +++ b/src/common/lib/transport/transport.ts @@ -1,5 +1,5 @@ +import { actions } from '../types/protocolmessagecommon'; import ProtocolMessage, { - actions, fromValues as protocolMessageFromValues, stringify as stringifyProtocolMessage, } from '../types/protocolmessage'; diff --git a/src/common/lib/types/basemessage.ts b/src/common/lib/types/basemessage.ts new file mode 100644 index 000000000..749ca5b03 --- /dev/null +++ b/src/common/lib/types/basemessage.ts @@ -0,0 +1,273 @@ +import Platform from 'common/platform'; +import Logger from '../util/logger'; +import ErrorInfo from './errorinfo'; +import * as Utils from '../util/utils'; +import { Bufferlike as BrowserBufferlike } from '../../../platform/web/lib/util/bufferutils'; +import * as API from '../../../../ably'; +import { actions } from './protocolmessagecommon'; + +import type { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; +import type { ChannelOptions } from '../../types/channel'; +import type ProtocolMessage from './protocolmessage'; + +export type CipherOptions = { + channelCipher: { + encrypt: Function; + algorithm: 'aes'; + }; + cipher?: { + channelCipher: { + encrypt: Function; + algorithm: 'aes'; + }; + }; +}; + +export type EncodingDecodingContext = { + channelOptions: ChannelOptions; + plugins: { + vcdiff?: { + decode: (delta: Uint8Array, source: Uint8Array) => Uint8Array; + }; + }; + baseEncodedPreviousPayload?: Buffer | BrowserBufferlike; +}; + +function normaliseContext(context: CipherOptions | EncodingDecodingContext | ChannelOptions): EncodingDecodingContext { + if (!context || !(context as EncodingDecodingContext).channelOptions) { + return { + channelOptions: context as ChannelOptions, + plugins: {}, + baseEncodedPreviousPayload: undefined, + }; + } + return context as EncodingDecodingContext; +} + +export function normalizeCipherOptions( + Crypto: IUntypedCryptoStatic | null, + logger: Logger, + options: API.ChannelOptions | null, +): ChannelOptions { + if (options && options.cipher) { + if (!Crypto) Utils.throwMissingPluginError('Crypto'); + const cipher = Crypto.getCipher(options.cipher, logger); + return { + cipher: cipher.cipherParams, + channelCipher: cipher.cipher, + }; + } + return options ?? {}; +} + +async function encrypt(msg: T, options: CipherOptions): Promise { + let data = msg.data, + encoding = msg.encoding, + cipher = options.channelCipher; + + encoding = encoding ? encoding + '/' : ''; + if (!Platform.BufferUtils.isBuffer(data)) { + data = Platform.BufferUtils.utf8Encode(String(data)); + encoding = encoding + 'utf-8/'; + } + const ciphertext = await cipher.encrypt(data); + msg.data = ciphertext; + msg.encoding = encoding + 'cipher+' + cipher.algorithm; + return msg; +} + +export async function encode(msg: T, options: CipherOptions): Promise { + const data = msg.data; + const nativeDataType = + typeof data == 'string' || Platform.BufferUtils.isBuffer(data) || data === null || data === undefined; + + if (!nativeDataType) { + if (Utils.isObject(data) || Array.isArray(data)) { + msg.data = JSON.stringify(data); + msg.encoding = msg.encoding ? msg.encoding + '/json' : 'json'; + } else { + throw new ErrorInfo('Data type is unsupported', 40013, 400); + } + } + + if (options != null && options.cipher) { + return encrypt(msg, options); + } else { + return msg; + } +} + +export async function decode( + message: T, + inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, +): Promise { + const context = normaliseContext(inputContext); + + let lastPayload = message.data; + const encoding = message.encoding; + if (encoding) { + const xforms = encoding.split('/'); + let lastProcessedEncodingIndex, + encodingsToProcess = xforms.length, + data = message.data; + + let xform = ''; + try { + while ((lastProcessedEncodingIndex = encodingsToProcess) > 0) { + // eslint-disable-next-line security/detect-unsafe-regex + const match = xforms[--encodingsToProcess].match(/([-\w]+)(\+([\w-]+))?/); + if (!match) break; + xform = match[1]; + switch (xform) { + case 'base64': + data = Platform.BufferUtils.base64Decode(String(data)); + if (lastProcessedEncodingIndex == xforms.length) { + lastPayload = data; + } + continue; + case 'utf-8': + data = Platform.BufferUtils.utf8Decode(data); + continue; + case 'json': + data = JSON.parse(data); + continue; + case 'cipher': + if ( + context.channelOptions != null && + context.channelOptions.cipher && + context.channelOptions.channelCipher + ) { + const xformAlgorithm = match[3], + cipher = context.channelOptions.channelCipher; + /* don't attempt to decrypt unless the cipher params are compatible */ + if (xformAlgorithm != cipher.algorithm) { + throw new Error('Unable to decrypt message with given cipher; incompatible cipher params'); + } + data = await cipher.decrypt(data); + continue; + } else { + throw new Error('Unable to decrypt message; not an encrypted channel'); + } + case 'vcdiff': + if (!context.plugins || !context.plugins.vcdiff) { + throw new ErrorInfo('Missing Vcdiff decoder (https://github.com/ably-forks/vcdiff-decoder)', 40019, 400); + } + if (typeof Uint8Array === 'undefined') { + throw new ErrorInfo( + 'Delta decoding not supported on this browser (need ArrayBuffer & Uint8Array)', + 40020, + 400, + ); + } + try { + let deltaBase = context.baseEncodedPreviousPayload; + if (typeof deltaBase === 'string') { + deltaBase = Platform.BufferUtils.utf8Encode(deltaBase); + } + + // vcdiff expects Uint8Arrays, can't copy with ArrayBuffers. + const deltaBaseBuffer = Platform.BufferUtils.toBuffer(deltaBase as Buffer); + data = Platform.BufferUtils.toBuffer(data); + + data = Platform.BufferUtils.arrayBufferViewToBuffer(context.plugins.vcdiff.decode(data, deltaBaseBuffer)); + lastPayload = data; + } catch (e) { + throw new ErrorInfo('Vcdiff delta decode failed with ' + e, 40018, 400); + } + continue; + default: + throw new Error('Unknown encoding'); + } + } + } catch (e) { + const err = e as ErrorInfo; + throw new ErrorInfo( + 'Error processing the ' + xform + ' encoding, decoder returned ‘' + err.message + '’', + err.code || 40013, + 400, + ); + } finally { + message.encoding = + (lastProcessedEncodingIndex as number) <= 0 ? null : xforms.slice(0, lastProcessedEncodingIndex).join('/'); + message.data = data; + } + } + context.baseEncodedPreviousPayload = lastPayload; +} + +export function wireToJSON(this: BaseMessage, ...args: any[]): any { + /* encode data to base64 if present and we're returning real JSON; + * although msgpack calls toJSON(), we know it is a stringify() + * call if it has a non-empty arguments list */ + let encoding = this.encoding; + let data = this.data; + if (data && Platform.BufferUtils.isBuffer(data)) { + if (args.length > 0) { + /* stringify call */ + encoding = encoding ? encoding + '/base64' : 'base64'; + data = Platform.BufferUtils.base64Encode(data); + } else { + /* Called by msgpack. toBuffer returns a datatype understandable by + * that platform's msgpack implementation (Buffer in node, Uint8Array + * in browsers) */ + data = Platform.BufferUtils.toBuffer(data); + } + } + return Object.assign({}, this, { encoding, data }); +} + +// in-place, generally called on the protocol message before decoding +export function populateFieldsFromParent(parent: ProtocolMessage) { + let msgs: BaseMessage[]; + switch (parent.action) { + case actions.MESSAGE: + msgs = parent.messages!; + break; + case actions.PRESENCE: + case actions.SYNC: + msgs = parent.presence!; + break; + default: + throw new ErrorInfo('Unexpected action ' + parent.action, 40000, 400); + } + + const { id, connectionId, timestamp } = parent; + for (let i = 0; i < msgs.length; i++) { + const msg = msgs[i]; + if (!msg.connectionId) msg.connectionId = connectionId; + if (!msg.timestamp) msg.timestamp = timestamp; + if (id && !msg.id) msg.id = id + ':' + i; + } +} + +export function strMsg(m: any, cls: string) { + let result = '[' + cls; + for (const attr in m) { + if (attr === 'data') { + if (typeof m.data == 'string') { + result += '; data=' + m.data; + } else if (Platform.BufferUtils.isBuffer(m.data)) { + result += '; data (buffer)=' + Platform.BufferUtils.base64Encode(m.data); + } else { + result += '; data (json)=' + JSON.stringify(m.data); + } + } else if (attr && (attr === 'extras' || attr === 'operation')) { + result += '; ' + attr + '=' + JSON.stringify(m[attr]); + } else if (m[attr] !== undefined) { + result += '; ' + attr + '=' + m[attr]; + } + } + result += ']'; + return result; +} + +export abstract class BaseMessage { + id?: string; + timestamp?: number; + clientId?: string; + connectionId?: string; + data?: any; + encoding?: string | null; + extras?: any; + size?: number; +} diff --git a/src/common/lib/types/defaultmessage.ts b/src/common/lib/types/defaultmessage.ts index 1161e909c..41221f088 100644 --- a/src/common/lib/types/defaultmessage.ts +++ b/src/common/lib/types/defaultmessage.ts @@ -1,18 +1,6 @@ -import Message, { - WireProtocolMessage, - CipherOptions, - decode, - encode, - EncodingDecodingContext, - fromEncoded, - fromEncodedArray, - fromValues, - fromWireProtocol, -} from './message'; +import Message, { WireMessage, fromEncoded, fromEncodedArray } from './message'; import * as API from '../../../../ably'; import Platform from 'common/platform'; -import PresenceMessage from './presencemessage'; -import { ChannelOptions } from 'common/types/channel'; import Logger from '../util/logger'; import type { Properties } from '../util/utils'; @@ -21,33 +9,14 @@ import type { Properties } from '../util/utils'; */ export class DefaultMessage extends Message { static async fromEncoded(encoded: unknown, inputOptions?: API.ChannelOptions): Promise { - return fromEncoded(Logger.defaultLogger, Platform.Crypto, encoded as WireProtocolMessage, inputOptions); + return fromEncoded(Logger.defaultLogger, Platform.Crypto, encoded as WireMessage, inputOptions); } static async fromEncodedArray(encodedArray: Array, options?: API.ChannelOptions): Promise { - return fromEncodedArray(Logger.defaultLogger, Platform.Crypto, encodedArray as WireProtocolMessage[], options); + return fromEncodedArray(Logger.defaultLogger, Platform.Crypto, encodedArray as WireMessage[], options); } - // Used by tests static fromValues(values: Properties): Message { - return fromValues(values); - } - - // Used by tests - static fromWireProtocol(values: WireProtocolMessage): Message { - return fromWireProtocol(values); - } - - // Used by tests - static async encode(msg: T, options: CipherOptions): Promise { - return encode(msg, options); - } - - // Used by tests - static async decode( - message: Message | PresenceMessage, - inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, - ): Promise { - return decode(message, inputContext); + return Message.fromValues(values); } } diff --git a/src/common/lib/types/defaultpresencemessage.ts b/src/common/lib/types/defaultpresencemessage.ts index a5ef3877d..cc5a64424 100644 --- a/src/common/lib/types/defaultpresencemessage.ts +++ b/src/common/lib/types/defaultpresencemessage.ts @@ -1,11 +1,6 @@ import * as API from '../../../../ably'; import Logger from '../util/logger'; -import PresenceMessage, { - fromEncoded, - fromEncodedArray, - fromValues, - WireProtocolPresenceMessage, -} from './presencemessage'; +import PresenceMessage, { fromEncoded, fromEncodedArray, WirePresenceMessage } from './presencemessage'; import Platform from 'common/platform'; import type { Properties } from '../util/utils'; @@ -14,22 +9,17 @@ import type { Properties } from '../util/utils'; */ export class DefaultPresenceMessage extends PresenceMessage { static async fromEncoded(encoded: unknown, inputOptions?: API.ChannelOptions): Promise { - return fromEncoded(Logger.defaultLogger, Platform.Crypto, encoded as WireProtocolPresenceMessage, inputOptions); + return fromEncoded(Logger.defaultLogger, Platform.Crypto, encoded as WirePresenceMessage, inputOptions); } static async fromEncodedArray( encodedArray: Array, options?: API.ChannelOptions, ): Promise { - return fromEncodedArray( - Logger.defaultLogger, - Platform.Crypto, - encodedArray as WireProtocolPresenceMessage[], - options, - ); + return fromEncodedArray(Logger.defaultLogger, Platform.Crypto, encodedArray as WirePresenceMessage[], options); } static fromValues(values: Properties): PresenceMessage { - return fromValues(values); + return PresenceMessage.fromValues(values); } } diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index 4946f34d8..99b02efa7 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -1,9 +1,15 @@ -import Platform from 'common/platform'; import Logger from '../util/logger'; -import ErrorInfo from './errorinfo'; -import PresenceMessage from './presencemessage'; +import { + BaseMessage, + encode, + decode, + wireToJSON, + normalizeCipherOptions, + EncodingDecodingContext, + CipherOptions, + strMsg, +} from './basemessage'; import * as Utils from '../util/utils'; -import { Bufferlike as BrowserBufferlike } from '../../../platform/web/lib/util/bufferutils'; import * as API from '../../../../ably'; import type { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; @@ -11,83 +17,22 @@ import type { ChannelOptions } from '../../types/channel'; import type { Properties } from '../util/utils'; import type RestChannel from '../client/restchannel'; import type RealtimeChannel from '../client/realtimechannel'; +import type ErrorInfo from './errorinfo'; type Channel = RestChannel | RealtimeChannel; -const MessageActionArray: API.MessageAction[] = [ +const actions: API.MessageAction[] = [ 'message.create', 'message.update', 'message.delete', 'meta.occupancy', - 'message.summary' + 'message.summary', ]; -const MessageActionMap = new Map(MessageActionArray.map((action, index) => [action, index])); - -const ReverseMessageActionMap = new Map( - MessageActionArray.map((action, index) => [index, action]), -); - -function toMessageActionString(actionNumber: number): API.MessageAction | undefined { - return ReverseMessageActionMap.get(actionNumber); -} - -function toMessageActionNumber(messageAction?: API.MessageAction): number | undefined { - return messageAction ? MessageActionMap.get(messageAction) : undefined; -} - -export type CipherOptions = { - channelCipher: { - encrypt: Function; - algorithm: 'aes'; - }; - cipher?: { - channelCipher: { - encrypt: Function; - algorithm: 'aes'; - }; - }; -}; - -export type EncodingDecodingContext = { - channelOptions: ChannelOptions; - plugins: { - vcdiff?: { - decode: (delta: Uint8Array, source: Uint8Array) => Uint8Array; - }; - }; - baseEncodedPreviousPayload?: Buffer | BrowserBufferlike; -}; - -export type WireProtocolMessage = Omit & { action: number }; - -function normaliseContext(context: CipherOptions | EncodingDecodingContext | ChannelOptions): EncodingDecodingContext { - if (!context || !(context as EncodingDecodingContext).channelOptions) { - return { - channelOptions: context as ChannelOptions, - plugins: {}, - baseEncodedPreviousPayload: undefined, - }; - } - return context as EncodingDecodingContext; +function stringifyAction(action: number | undefined): string { + return actions[action || 0] || 'unknown'; } -export function normalizeCipherOptions( - Crypto: IUntypedCryptoStatic | null, - logger: Logger, - options: API.ChannelOptions | null, -): ChannelOptions { - if (options && options.cipher) { - if (!Crypto) Utils.throwMissingPluginError('Crypto'); - const cipher = Crypto.getCipher(options.cipher, logger); - return { - cipher: cipher.cipherParams, - channelCipher: cipher.cipher, - }; - } - return options ?? {}; -} - -function getMessageSize(msg: Message) { +function getMessageSize(msg: WireMessage) { let size = 0; if (msg.name) { size += msg.name.length; @@ -107,25 +52,18 @@ function getMessageSize(msg: Message) { export async function fromEncoded( logger: Logger, Crypto: IUntypedCryptoStatic | null, - encoded: WireProtocolMessage, + encoded: Properties, inputOptions?: API.ChannelOptions, ): Promise { - const msg = fromWireProtocol(encoded); const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null); - /* if decoding fails at any point, catch and return the message decoded to - * the fullest extent possible */ - try { - await decode(msg, options); - } catch (e) { - Logger.logAction(logger, Logger.LOG_ERROR, 'Message.fromEncoded()', (e as Error).toString()); - } - return msg; + const wm = WireMessage.fromValues(encoded); + return wm.decode(options, logger); } export async function fromEncodedArray( logger: Logger, Crypto: IUntypedCryptoStatic | null, - encodedArray: Array, + encodedArray: Array, options?: API.ChannelOptions, ): Promise { return Promise.all( @@ -137,17 +75,12 @@ export async function fromEncodedArray( // these forms of the functions are used internally when we have a channel instance // already, so don't need to normalise channel options -export async function _fromEncoded(encoded: WireProtocolMessage, channel: Channel): Promise { - const msg = fromWireProtocol(encoded); - try { - await decode(msg, channel.channelOptions); - } catch (e) { - Logger.logAction(channel.logger, Logger.LOG_ERROR, 'Message._fromEncoded()', (e as Error).toString()); - } - return msg; +export async function _fromEncoded(encoded: Properties, channel: Channel): Promise { + const wm = WireMessage.fromValues(encoded); + return wm.decode(channel.channelOptions, channel.logger); } -export async function _fromEncodedArray(encodedArray: WireProtocolMessage[], channel: Channel): Promise { +export async function _fromEncodedArray(encodedArray: Properties[], channel: Channel): Promise { return Promise.all( encodedArray.map(function (encoded) { return _fromEncoded(encoded, channel); @@ -155,165 +88,15 @@ export async function _fromEncodedArray(encodedArray: WireProtocolMessage[], cha ); } -async function encrypt(msg: T, options: CipherOptions): Promise { - let data = msg.data, - encoding = msg.encoding, - cipher = options.channelCipher; - - encoding = encoding ? encoding + '/' : ''; - if (!Platform.BufferUtils.isBuffer(data)) { - data = Platform.BufferUtils.utf8Encode(String(data)); - encoding = encoding + 'utf-8/'; - } - const ciphertext = await cipher.encrypt(data); - msg.data = ciphertext; - msg.encoding = encoding + 'cipher+' + cipher.algorithm; - return msg; -} - -export async function encode(msg: T, options: CipherOptions): Promise { - const data = msg.data; - const nativeDataType = - typeof data == 'string' || Platform.BufferUtils.isBuffer(data) || data === null || data === undefined; - - if (!nativeDataType) { - if (Utils.isObject(data) || Array.isArray(data)) { - msg.data = JSON.stringify(data); - msg.encoding = msg.encoding ? msg.encoding + '/json' : 'json'; - } else { - throw new ErrorInfo('Data type is unsupported', 40013, 400); - } - } - - if (options != null && options.cipher) { - return encrypt(msg, options); - } else { - return msg; - } -} - -export async function encodeArray(messages: Array, options: CipherOptions): Promise> { - return Promise.all(messages.map((message) => encode(message, options))); +export async function encodeArray(messages: Array, options: CipherOptions): Promise> { + return Promise.all(messages.map((message) => message.encode(options))); } export const serialize = Utils.encodeBody; -export async function decode( - message: Message | PresenceMessage, - inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, -): Promise { - const context = normaliseContext(inputContext); - - let lastPayload = message.data; - const encoding = message.encoding; - if (encoding) { - const xforms = encoding.split('/'); - let lastProcessedEncodingIndex, - encodingsToProcess = xforms.length, - data = message.data; - - let xform = ''; - try { - while ((lastProcessedEncodingIndex = encodingsToProcess) > 0) { - // eslint-disable-next-line security/detect-unsafe-regex - const match = xforms[--encodingsToProcess].match(/([-\w]+)(\+([\w-]+))?/); - if (!match) break; - xform = match[1]; - switch (xform) { - case 'base64': - data = Platform.BufferUtils.base64Decode(String(data)); - if (lastProcessedEncodingIndex == xforms.length) { - lastPayload = data; - } - continue; - case 'utf-8': - data = Platform.BufferUtils.utf8Decode(data); - continue; - case 'json': - data = JSON.parse(data); - continue; - case 'cipher': - if ( - context.channelOptions != null && - context.channelOptions.cipher && - context.channelOptions.channelCipher - ) { - const xformAlgorithm = match[3], - cipher = context.channelOptions.channelCipher; - /* don't attempt to decrypt unless the cipher params are compatible */ - if (xformAlgorithm != cipher.algorithm) { - throw new Error('Unable to decrypt message with given cipher; incompatible cipher params'); - } - data = await cipher.decrypt(data); - continue; - } else { - throw new Error('Unable to decrypt message; not an encrypted channel'); - } - case 'vcdiff': - if (!context.plugins || !context.plugins.vcdiff) { - throw new ErrorInfo('Missing Vcdiff decoder (https://github.com/ably-forks/vcdiff-decoder)', 40019, 400); - } - if (typeof Uint8Array === 'undefined') { - throw new ErrorInfo( - 'Delta decoding not supported on this browser (need ArrayBuffer & Uint8Array)', - 40020, - 400, - ); - } - try { - let deltaBase = context.baseEncodedPreviousPayload; - if (typeof deltaBase === 'string') { - deltaBase = Platform.BufferUtils.utf8Encode(deltaBase); - } - - // vcdiff expects Uint8Arrays, can't copy with ArrayBuffers. - const deltaBaseBuffer = Platform.BufferUtils.toBuffer(deltaBase as Buffer); - data = Platform.BufferUtils.toBuffer(data); - - data = Platform.BufferUtils.arrayBufferViewToBuffer(context.plugins.vcdiff.decode(data, deltaBaseBuffer)); - lastPayload = data; - } catch (e) { - throw new ErrorInfo('Vcdiff delta decode failed with ' + e, 40018, 400); - } - continue; - default: - throw new Error('Unknown encoding'); - } - } - } catch (e) { - const err = e as ErrorInfo; - throw new ErrorInfo( - 'Error processing the ' + xform + ' encoding, decoder returned ‘' + err.message + '’', - err.code || 40013, - 400, - ); - } finally { - message.encoding = - (lastProcessedEncodingIndex as number) <= 0 ? null : xforms.slice(0, lastProcessedEncodingIndex).join('/'); - message.data = data; - } - } - context.baseEncodedPreviousPayload = lastPayload; -} - -export function fromValues(values: Properties): Message { - return Object.assign(new Message(), values); -} - -export function fromWireProtocol(values: WireProtocolMessage): Message { - const action = toMessageActionString(values.action as number) || values.action; - const res = Object.assign(new Message(), { ...values, action }); - res.expandFields(); - return res; -} - -export function fromValuesArray(values: Properties[]): Message[] { - return values.map(fromValues); -} - /* This should be called on encode()d (and encrypt()d) Messages (as it * assumes the data is a string or buffer) */ -export function getMessagesSize(messages: Message[]): number { +export function getMessagesSize(messages: WireMessage[]): number { let msg, total = 0; for (let i = 0; i < messages.length; i++) { @@ -323,17 +106,9 @@ export function getMessagesSize(messages: Message[]): number { return total; } -class Message { +class Message extends BaseMessage { name?: string; - id?: string; - timestamp?: number; - clientId?: string; - connectionId?: string; connectionKey?: string; - data?: any; - encoding?: string | null; - extras?: any; - size?: number; action?: API.MessageAction; serial?: string; refSerial?: string; @@ -342,47 +117,6 @@ class Message { version?: string; operation?: API.Operation; - /** - * Overload toJSON() to intercept JSON.stringify() - * @return {*} - */ - toJSON() { - /* encode data to base64 if present and we're returning real JSON; - * although msgpack calls toJSON(), we know it is a stringify() - * call if it has a non-empty arguments list */ - let encoding = this.encoding; - let data = this.data; - if (data && Platform.BufferUtils.isBuffer(data)) { - if (arguments.length > 0) { - /* stringify call */ - encoding = encoding ? encoding + '/base64' : 'base64'; - data = Platform.BufferUtils.base64Encode(data); - } else { - /* Called by msgpack. toBuffer returns a datatype understandable by - * that platform's msgpack implementation (Buffer in node, Uint8Array - * in browsers) */ - data = Platform.BufferUtils.toBuffer(data); - } - } - return { - name: this.name, - id: this.id, - clientId: this.clientId, - connectionId: this.connectionId, - connectionKey: this.connectionKey, - extras: this.extras, - serial: this.serial, - action: toMessageActionNumber(this.action as API.MessageAction) || this.action, - refSerial: this.refSerial, - refType: this.refType, - createdAt: this.createdAt, - version: this.version, - operation: this.operation, - encoding, - data, - }; - } - expandFields() { if (this.action === 'message.create') { // TM2k @@ -396,32 +130,80 @@ class Message { } } - toString(): string { - let result = '[Message'; - if (this.name) result += '; name=' + this.name; - if (this.id) result += '; id=' + this.id; - if (this.timestamp) result += '; timestamp=' + this.timestamp; - if (this.clientId) result += '; clientId=' + this.clientId; - if (this.connectionId) result += '; connectionId=' + this.connectionId; - if (this.encoding) result += '; encoding=' + this.encoding; - if (this.extras) result += '; extras =' + JSON.stringify(this.extras); - if (this.data) { - if (typeof this.data == 'string') result += '; data=' + this.data; - else if (Platform.BufferUtils.isBuffer(this.data)) - result += '; data (buffer)=' + Platform.BufferUtils.base64Encode(this.data); - else result += '; data (json)=' + JSON.stringify(this.data); + async encode(options: CipherOptions): Promise { + const res = Object.assign(new WireMessage(), this, { + action: actions.indexOf(this.action || 'message.create'), + }); + return encode(res, options); + } + + static fromValues(values: Properties): Message { + return Object.assign(new Message(), values); + } + + static fromValuesArray(values: Properties[]): Message[] { + return values.map(Message.fromValues); + } + + toString() { + return strMsg(this, 'Message'); + } +} + +export class WireMessage extends BaseMessage { + name?: string; + connectionKey?: string; + action?: number; + serial?: string; + refSerial?: string; + refType?: string; + createdAt?: number; + version?: string; + operation?: API.Operation; + + // Overload toJSON() to intercept JSON.stringify() + toJSON(...args: any[]) { + return wireToJSON.call(this, ...args); + } + + static fromValues(values: Properties): WireMessage { + return Object.assign(new WireMessage(), values); + } + + static fromValuesArray(values: Properties[]): WireMessage[] { + return values.map(WireMessage.fromValues); + } + + // for contexts where some decoding errors need to be handled specially by the caller + async decodeWithErr( + inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, + logger: Logger, + ): Promise<{ decoded: Message; err: ErrorInfo | undefined }> { + const res: Message = Object.assign(new Message(), { + ...this, + action: stringifyAction(this.action), + }); + let err: ErrorInfo | undefined; + try { + await decode(res, inputContext); + } catch (e) { + Logger.logAction(logger, Logger.LOG_ERROR, 'WireMessage.decode()', Utils.inspectError(e)); + err = e as ErrorInfo; } - if (this.extras) result += '; extras=' + JSON.stringify(this.extras); + res.expandFields(); + return { decoded: res, err: err }; + } + + async decode( + inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, + logger: Logger, + ): Promise { + const { decoded } = await this.decodeWithErr(inputContext, logger); + return decoded; + } - if (this.action) result += '; action=' + this.action; - if (this.serial) result += '; serial=' + this.serial; - if (this.version) result += '; version=' + this.version; - if (this.refSerial) result += '; refSerial=' + this.refSerial; - if (this.refType) result += '; refType=' + this.refType; - if (this.createdAt) result += '; createdAt=' + this.createdAt; - if (this.operation) result += '; operation=' + JSON.stringify(this.operation); - result += ']'; - return result; + toString() { + return strMsg(this, 'WireMessage'); } } diff --git a/src/common/lib/types/presencemessage.ts b/src/common/lib/types/presencemessage.ts index 62ac6c073..f736d21e4 100644 --- a/src/common/lib/types/presencemessage.ts +++ b/src/common/lib/types/presencemessage.ts @@ -1,44 +1,32 @@ import Logger from '../util/logger'; -import Platform from 'common/platform'; -import { normalizeCipherOptions, encode as encodeMessage, decode as decodeMessage, getMessagesSize } from './message'; +import { BaseMessage, encode, decode, wireToJSON, normalizeCipherOptions, CipherOptions, strMsg } from './basemessage'; import * as API from '../../../../ably'; +import * as Utils from '../util/utils'; import type { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; import type { Properties } from '../util/utils'; import type RestChannel from '../client/restchannel'; import type RealtimeChannel from '../client/realtimechannel'; +import type { ChannelOptions } from '../../types/channel'; type Channel = RestChannel | RealtimeChannel; const actions = ['absent', 'present', 'enter', 'leave', 'update']; -export type WireProtocolPresenceMessage = Omit & { action: number }; - -function toActionValue(actionString: string) { - return actions.indexOf(actionString); -} - export async function fromEncoded( logger: Logger, Crypto: IUntypedCryptoStatic | null, - encoded: WireProtocolPresenceMessage, + encoded: WirePresenceMessage, inputOptions?: API.ChannelOptions, ): Promise { - const msg = fromWireProtocol(encoded); const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null); - /* if decoding fails at any point, catch and return the message decoded to - * the fullest extent possible */ - try { - await decode(msg, options ?? {}); - } catch (e) { - Logger.logAction(logger, Logger.LOG_ERROR, 'PresenceMessage.fromEncoded()', (e as Error).toString()); - } - return msg; + const wpm = WirePresenceMessage.fromValues(encoded); + return wpm.decode(options, logger); } export async function fromEncodedArray( logger: Logger, Crypto: IUntypedCryptoStatic | null, - encodedArray: WireProtocolPresenceMessage[], + encodedArray: WirePresenceMessage[], options?: API.ChannelOptions, ): Promise { return Promise.all( @@ -50,18 +38,15 @@ export async function fromEncodedArray( // these forms of the functions are used internally when we have a channel instance // already, so don't need to normalise channel options -export async function _fromEncoded(encoded: WireProtocolPresenceMessage, channel: Channel): Promise { - const msg = fromWireProtocol(encoded); - try { - await decode(msg, channel.channelOptions); - } catch (e) { - Logger.logAction(channel.logger, Logger.LOG_ERROR, 'PresenceMessage._fromEncoded()', (e as Error).toString()); - } - return msg; +export async function _fromEncoded( + encoded: Properties, + channel: Channel, +): Promise { + return WirePresenceMessage.fromValues(encoded).decode(channel.channelOptions, channel.logger); } export async function _fromEncodedArray( - encodedArray: WireProtocolPresenceMessage[], + encodedArray: Properties[], channel: Channel, ): Promise { return Promise.all( @@ -71,43 +56,8 @@ export async function _fromEncodedArray( ); } -export function fromValues(values: Properties): PresenceMessage { - return Object.assign(new PresenceMessage(), values); -} - -export function fromWireProtocol(values: WireProtocolPresenceMessage): PresenceMessage { - const action = actions[values.action]; - return Object.assign(new PresenceMessage(), { ...values, action }); -} - -export { encodeMessage as encode }; -export const decode = decodeMessage; - -export function fromValuesArray(values: Properties[]): PresenceMessage[] { - return values.map(fromValues); -} - -export function fromData(data: any): PresenceMessage { - if (data instanceof PresenceMessage) { - return data; - } - return fromValues({ - data, - }); -} - -export { getMessagesSize }; - -class PresenceMessage { +class PresenceMessage extends BaseMessage { action?: string; - id?: string; - timestamp?: number; - clientId?: string; - connectionId?: string; - data?: string | Buffer | Uint8Array; - encoding?: string; - extras?: any; - size?: number; /* Returns whether this presenceMessage is synthesized, i.e. was not actually * sent by the connection (usually means a leave event sent 15s after a @@ -132,65 +82,65 @@ class PresenceMessage { }; } - /** - * Overload toJSON() to intercept JSON.stringify() - * @return {*} - */ - toJSON(): { - id?: string; - clientId?: string; - action: number; - data: string | Buffer | Uint8Array; - encoding?: string; - extras?: any; - } { - /* encode data to base64 if present and we're returning real JSON; - * although msgpack calls toJSON(), we know it is a stringify() - * call if it has a non-empty arguments list */ - let data = this.data as string | Buffer | Uint8Array; - let encoding = this.encoding; - if (data && Platform.BufferUtils.isBuffer(data)) { - if (arguments.length > 0) { - /* stringify call */ - encoding = encoding ? encoding + '/base64' : 'base64'; - data = Platform.BufferUtils.base64Encode(data); - } else { - /* Called by msgpack. toBuffer returns a datatype understandable by - * that platform's msgpack implementation (Buffer in node, Uint8Array - * in browsers) */ - data = Platform.BufferUtils.toBuffer(data); - } - } - return { - id: this.id, - clientId: this.clientId, - /* Convert presence action back to an int for sending to Ably */ - action: toActionValue(this.action as string), - data: data, - encoding: encoding, - extras: this.extras, - }; + async encode(options: CipherOptions): Promise { + const res = Object.assign(new WirePresenceMessage(), this, { + action: actions.indexOf(this.action || 'present'), + }); + return encode(res, options); + } + + static fromValues(values: Properties): PresenceMessage { + return Object.assign(new PresenceMessage(), values); + } + + static fromValuesArray(values: Properties[]): PresenceMessage[] { + return values.map(PresenceMessage.fromValues); } - toString(): string { - let result = '[PresenceMessage'; - result += '; action=' + this.action; - if (this.id) result += '; id=' + this.id; - if (this.timestamp) result += '; timestamp=' + this.timestamp; - if (this.clientId) result += '; clientId=' + this.clientId; - if (this.connectionId) result += '; connectionId=' + this.connectionId; - if (this.encoding) result += '; encoding=' + this.encoding; - if (this.data) { - if (typeof this.data == 'string') result += '; data=' + this.data; - else if (Platform.BufferUtils.isBuffer(this.data)) - result += '; data (buffer)=' + Platform.BufferUtils.base64Encode(this.data); - else result += '; data (json)=' + JSON.stringify(this.data); + static fromData(data: any): PresenceMessage { + if (data instanceof PresenceMessage) { + return data; } - if (this.extras) { - result += '; extras=' + JSON.stringify(this.extras); + return PresenceMessage.fromValues({ + data, + }); + } + + toString() { + return strMsg(this, 'PresenceMessage'); + } +} + +export class WirePresenceMessage extends BaseMessage { + action?: number; + + toJSON(...args: any[]) { + return wireToJSON.call(this, ...args); + } + + static fromValues(values: Properties): WirePresenceMessage { + return Object.assign(new WirePresenceMessage(), values); + } + + static fromValuesArray(values: Properties[]): WirePresenceMessage[] { + return values.map(WirePresenceMessage.fromValues); + } + + async decode(channelOptions: ChannelOptions, logger: Logger): Promise { + const res = Object.assign(new PresenceMessage(), { + ...this, + action: actions[this.action!], + }); + try { + await decode(res, channelOptions); + } catch (e) { + Logger.logAction(logger, Logger.LOG_ERROR, 'WirePresenceMessage.decode()', Utils.inspectError(e)); } - result += ']'; - return result; + return res; + } + + toString() { + return strMsg(this, 'WirePresenceMessage'); } } diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index 1912d2002..5d6321777 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -3,60 +3,12 @@ import * as API from '../../../../ably'; import { PresenceMessagePlugin } from '../client/modularplugins'; import * as Utils from '../util/utils'; import ErrorInfo from './errorinfo'; -import Message, { - fromWireProtocol as messageFromWireProtocol, - fromValuesArray as messagesFromValuesArray, - WireProtocolMessage, -} from './message'; -import PresenceMessage, { - fromWireProtocol as presenceMessageFromWireProtocol, - fromValues as presenceMessageFromValues, - fromValuesArray as presenceMessagesFromValuesArray, - WireProtocolPresenceMessage, -} from './presencemessage'; - -export const actions = { - HEARTBEAT: 0, - ACK: 1, - NACK: 2, - CONNECT: 3, - CONNECTED: 4, - DISCONNECT: 5, - DISCONNECTED: 6, - CLOSE: 7, - CLOSED: 8, - ERROR: 9, - ATTACH: 10, - ATTACHED: 11, - DETACH: 12, - DETACHED: 13, - PRESENCE: 14, - MESSAGE: 15, - SYNC: 16, - AUTH: 17, - ACTIVATE: 18, -}; - -export const ActionName: string[] = []; -Object.keys(actions).forEach(function (name) { - ActionName[(actions as { [key: string]: number })[name]] = name; -}); - -const flags: { [key: string]: number } = { - /* Channel attach state flags */ - HAS_PRESENCE: 1 << 0, - HAS_BACKLOG: 1 << 1, - RESUMED: 1 << 2, - TRANSIENT: 1 << 4, - ATTACH_RESUME: 1 << 5, - /* Channel mode flags */ - PRESENCE: 1 << 16, - PUBLISH: 1 << 17, - SUBSCRIBE: 1 << 18, - PRESENCE_SUBSCRIBE: 1 << 19, -}; -const flagNames = Object.keys(flags); -flags.MODE_ALL = flags.PRESENCE | flags.PUBLISH | flags.SUBSCRIBE | flags.PRESENCE_SUBSCRIBE; +import { WireMessage } from './message'; +import PresenceMessage, { WirePresenceMessage } from './presencemessage'; +import { flags, flagNames, channelModes, ActionName } from './protocolmessagecommon'; +import type { Properties } from '../util/utils'; + +export const serialize = Utils.encodeBody; function toStringArray(array?: any[]): string { const result = []; @@ -68,10 +20,6 @@ function toStringArray(array?: any[]): string { return '[ ' + result.join(', ') + ' ]'; } -export const channelModes = ['PRESENCE', 'PUBLISH', 'SUBSCRIBE', 'PRESENCE_SUBSCRIBE']; - -export const serialize = Utils.encodeBody; - export function deserialize( serialized: unknown, MsgPack: MsgPack | null, @@ -86,35 +34,31 @@ export function fromDeserialized( deserialized: Record, presenceMessagePlugin: PresenceMessagePlugin | null, ): ProtocolMessage { - const error = deserialized.error; - if (error) { - deserialized.error = ErrorInfo.fromValues(error as ErrorInfo); + let error: ErrorInfo | undefined; + if (deserialized.error) { + error = ErrorInfo.fromValues(deserialized.error as ErrorInfo); } - let messages: Message[] | undefined; + let messages: WireMessage[] | undefined; if (deserialized.messages) { - const dm = deserialized.messages as WireProtocolMessage[]; - messages = dm.map((m) => messageFromWireProtocol(m)); + messages = WireMessage.fromValuesArray(deserialized.messages as Array>); } - let presence: PresenceMessage[] | undefined; + let presence: WirePresenceMessage[] | undefined; if (presenceMessagePlugin && deserialized.presence) { - const dp = deserialized.presence as WireProtocolPresenceMessage[]; - presence = dp.map((pm) => presenceMessagePlugin.presenceMessageFromWireProtocol(pm)); + presence = presenceMessagePlugin.WirePresenceMessage.fromValuesArray( + deserialized.presence as Array>, + ); } - return Object.assign(new ProtocolMessage(), { ...deserialized, presence, messages }); + return Object.assign(new ProtocolMessage(), { ...deserialized, presence, messages, error }); } /** * Used by the tests. */ export function fromDeserializedIncludingDependencies(deserialized: Record): ProtocolMessage { - return fromDeserialized(deserialized, { - presenceMessageFromValues, - presenceMessagesFromValuesArray, - presenceMessageFromWireProtocol, - }); + return fromDeserialized(deserialized, { PresenceMessage, WirePresenceMessage }); } export function fromValues(values: unknown): ProtocolMessage { @@ -132,9 +76,9 @@ export function stringify(msg: any, presenceMessagePlugin: PresenceMessagePlugin if (msg[attribute] !== undefined) result += '; ' + attribute + '=' + msg[attribute]; } - if (msg.messages) result += '; messages=' + toStringArray(messagesFromValuesArray(msg.messages)); + if (msg.messages) result += '; messages=' + toStringArray(WireMessage.fromValuesArray(msg.messages)); if (msg.presence && presenceMessagePlugin) - result += '; presence=' + toStringArray(presenceMessagePlugin.presenceMessagesFromValuesArray(msg.presence)); + result += '; presence=' + toStringArray(presenceMessagePlugin.WirePresenceMessage.fromValuesArray(msg.presence)); if (msg.error) result += '; error=' + ErrorInfo.fromValues(msg.error).toString(); if (msg.auth && msg.auth.accessToken) result += '; token=' + msg.auth.accessToken; if (msg.flags) result += '; flags=' + flagNames.filter(msg.hasFlag).join(','); @@ -165,9 +109,9 @@ class ProtocolMessage { channel?: string; channelSerial?: string | null; msgSerial?: number; - messages?: Message[]; + messages?: WireMessage[]; // This will be undefined if we skipped decoding this property due to user not requesting presence functionality — see `fromDeserialized` - presence?: PresenceMessage[]; + presence?: WirePresenceMessage[]; auth?: unknown; connectionDetails?: Record; diff --git a/src/common/lib/types/protocolmessagecommon.ts b/src/common/lib/types/protocolmessagecommon.ts new file mode 100644 index 000000000..d6131d8eb --- /dev/null +++ b/src/common/lib/types/protocolmessagecommon.ts @@ -0,0 +1,49 @@ +// constant definitions that can be imported by anyone without worrying about circular +// deps + +export const actions = { + HEARTBEAT: 0, + ACK: 1, + NACK: 2, + CONNECT: 3, + CONNECTED: 4, + DISCONNECT: 5, + DISCONNECTED: 6, + CLOSE: 7, + CLOSED: 8, + ERROR: 9, + ATTACH: 10, + ATTACHED: 11, + DETACH: 12, + DETACHED: 13, + PRESENCE: 14, + MESSAGE: 15, + SYNC: 16, + AUTH: 17, + ACTIVATE: 18, +}; + +export const ActionName: string[] = []; +Object.keys(actions).forEach(function (name) { + ActionName[(actions as { [key: string]: number })[name]] = name; +}); + +export const flags: { [key: string]: number } = { + /* Channel attach state flags */ + HAS_PRESENCE: 1 << 0, + HAS_BACKLOG: 1 << 1, + RESUMED: 1 << 2, + TRANSIENT: 1 << 4, + ATTACH_RESUME: 1 << 5, + /* Channel mode flags */ + PRESENCE: 1 << 16, + PUBLISH: 1 << 17, + SUBSCRIBE: 1 << 18, + PRESENCE_SUBSCRIBE: 1 << 19, +}; + +export const flagNames = Object.keys(flags); + +flags.MODE_ALL = flags.PRESENCE | flags.PUBLISH | flags.SUBSCRIBE | flags.PRESENCE_SUBSCRIBE; + +export const channelModes = ['PRESENCE', 'PUBLISH', 'SUBSCRIBE', 'PRESENCE_SUBSCRIBE']; diff --git a/src/platform/web/modular/presencemessage.ts b/src/platform/web/modular/presencemessage.ts index 40e90830e..1c1cdaeb5 100644 --- a/src/platform/web/modular/presencemessage.ts +++ b/src/platform/web/modular/presencemessage.ts @@ -1,5 +1,5 @@ import * as API from '../../../../ably'; -import { fromEncoded, fromEncodedArray, fromValues } from '../../../common/lib/types/presencemessage'; +import PresenceMessage, { fromEncoded, fromEncodedArray } from '../../../common/lib/types/presencemessage'; import { Crypto } from './crypto'; import Logger from '../../../common/lib/util/logger'; @@ -21,4 +21,4 @@ export const decodeEncryptedPresenceMessages = ((obj, options) => { return fromEncodedArray(Logger.defaultLogger, Crypto, obj, options); }) as API.PresenceMessageStatic['fromEncodedArray']; -export const constructPresenceMessage = fromValues as API.PresenceMessageStatic['fromValues']; +export const constructPresenceMessage = PresenceMessage.fromValues as API.PresenceMessageStatic['fromValues']; diff --git a/src/platform/web/modular/realtimepresence.ts b/src/platform/web/modular/realtimepresence.ts index 4d42fd932..1d3335ce3 100644 --- a/src/platform/web/modular/realtimepresence.ts +++ b/src/platform/web/modular/realtimepresence.ts @@ -1,16 +1,11 @@ import { RealtimePresencePlugin } from 'common/lib/client/modularplugins'; import { default as realtimePresenceClass } from '../../../common/lib/client/realtimepresence'; -import { - fromValues as presenceMessageFromValues, - fromValuesArray as presenceMessagesFromValuesArray, - fromWireProtocol as presenceMessageFromWireProtocol, -} from '../../../common/lib/types/presencemessage'; +import PresenceMessage, { WirePresenceMessage } from '../../../common/lib/types/presencemessage'; const RealtimePresence: RealtimePresencePlugin = { RealtimePresence: realtimePresenceClass, - presenceMessageFromValues, - presenceMessagesFromValuesArray, - presenceMessageFromWireProtocol, + PresenceMessage, + WirePresenceMessage, }; export { RealtimePresence }; diff --git a/test/realtime/crypto.test.js b/test/realtime/crypto.test.js index 9eeca0eef..a5bfea646 100644 --- a/test/realtime/crypto.test.js +++ b/test/realtime/crypto.test.js @@ -264,9 +264,8 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async function (channelOpts, testMessage, encryptedMessage) { /* encrypt plaintext message; encode() also to handle data that is not already string or buffer */ helper.recordPrivateApi('call.Message.encode'); - Helper.whenPromiseSettles(Message.encode(testMessage, channelOpts), function () { - /* compare */ - testMessageEquality(done, helper, testMessage, encryptedMessage); + Helper.whenPromiseSettles(testMessage.encode(channelOpts), function (_, encrypted) { + testMessageEquality(done, helper, encrypted, encryptedMessage); }); }, ); @@ -288,9 +287,8 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async function (channelOpts, testMessage, encryptedMessage) { /* encrypt plaintext message; encode() also to handle data that is not already string or buffer */ helper.recordPrivateApi('call.Message.encode'); - Helper.whenPromiseSettles(Message.encode(testMessage, channelOpts), function () { - /* compare */ - testMessageEquality(done, helper, testMessage, encryptedMessage); + Helper.whenPromiseSettles(testMessage.encode(channelOpts), function (_, encrypted) { + testMessageEquality(done, helper, encrypted, encryptedMessage); }); }, ); @@ -387,9 +385,9 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async false, function (channelOpts, testMessage, encryptedMessage, msgpackEncodedMessage) { helper.recordPrivateApi('call.Message.encode'); - Helper.whenPromiseSettles(Message.encode(testMessage, channelOpts), function () { + Helper.whenPromiseSettles(testMessage.encode(channelOpts), function (_, encrypted) { helper.recordPrivateApi('call.msgpack.encode'); - var msgpackFromEncoded = msgpack.encode(testMessage); + var msgpackFromEncoded = msgpack.encode(encrypted); var msgpackFromEncrypted = msgpack.encode(encryptedMessage); helper.recordPrivateApi('call.BufferUtils.base64Decode'); helper.recordPrivateApi('call.msgpack.decode'); @@ -431,9 +429,9 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async 2, false, function (channelOpts, testMessage, encryptedMessage, msgpackEncodedMessage) { - Helper.whenPromiseSettles(Message.encode(testMessage, channelOpts), function () { + Helper.whenPromiseSettles(testMessage.encode(channelOpts), function (_, encrypted) { helper.recordPrivateApi('call.msgpack.encode'); - var msgpackFromEncoded = msgpack.encode(testMessage); + var msgpackFromEncoded = msgpack.encode(encrypted); var msgpackFromEncrypted = msgpack.encode(encryptedMessage); helper.recordPrivateApi('call.BufferUtils.base64Decode'); helper.recordPrivateApi('call.msgpack.decode'); diff --git a/test/realtime/message.test.js b/test/realtime/message.test.js index 9e57327e5..12728b3da 100644 --- a/test/realtime/message.test.js +++ b/test/realtime/message.test.js @@ -1275,7 +1275,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /** * @spec TM2j */ - describe('DefaultMessage.fromWireProtocol', function () { + describe('DefaultMessage.fromEncoded', function () { const testCases = [ { description: 'should stringify the numeric action', @@ -1284,30 +1284,31 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async expectedJSON: { action: 0 }, }, { - description: 'should accept an already stringified action', - action: 'message.update', + description: 'should stringify the numeric action', + action: 1, expectedString: '[Message; action=message.update]', expectedJSON: { action: 1 }, }, { description: 'should handle no action provided', action: undefined, - expectedString: '[Message]', - expectedJSON: { action: undefined }, + expectedString: '[Message; action=message.create]', + expectedJSON: { action: 0 }, }, { description: 'should handle unknown action provided', action: 10, - expectedString: '[Message; action=10]', - expectedJSON: { action: 10 }, + expectedString: '[Message; action=unknown]', }, ]; testCases.forEach(({ description, action, options, expectedString, expectedJSON }) => { - it(description, function () { + it(description, async function () { const values = { action }; - const message = Message.fromWireProtocol(values); + const message = await Message.fromEncoded(values, {}); expect(message.toString()).to.equal(expectedString); - expect(message.toJSON()).to.deep.contains(expectedJSON); + if (expectedJSON) { + expect((await message.encode({})).toJSON()).to.deep.contains(expectedJSON); + } }); }); @@ -1315,17 +1316,17 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async * @spec TM2k * @spec TM2o */ - it('create message should fill out serial and createdAt from version/timestamp', function () { - const values = { action: 1, timestamp: 12345, version: 'foo' }; - const message = Message.fromWireProtocol(values); + it('create message should fill out serial and createdAt from version/timestamp', async function () { + const values = { action: 0, timestamp: 12345, version: 'foo' }; + const message = await Message.fromEncoded(values); expect(message.timestamp).to.equal(12345); expect(message.createdAt).to.equal(12345); expect(message.version).to.equal('foo'); expect(message.serial).to.equal('foo'); // should only apply to creates - const update = { action: 2, timestamp: 12345, version: 'foo' }; - const updateMessage = Message.fromWireProtocol(update); + const update = { action: 1, timestamp: 12345, version: 'foo' }; + const updateMessage = await Message.fromEncoded(update); expect(updateMessage.createdAt).to.equal(undefined); expect(updateMessage.serial).to.equal(undefined); }); diff --git a/test/realtime/presence.test.js b/test/realtime/presence.test.js index 231219ea3..4b4696bdb 100644 --- a/test/realtime/presence.test.js +++ b/test/realtime/presence.test.js @@ -89,7 +89,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }; describe('realtime/presence', function () { - this.timeout(60 * 1000); + this.timeout(20 * 1000); before(function (done) { const helper = Helper.forHook(this); helper.setupApp(function (err) { @@ -1633,13 +1633,15 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async const connId = realtime.connection.connectionManager.connectionId; helper.recordPrivateApi('call.presence._myMembers.put'); - channel.presence._myMembers.put({ - action: 'enter', - clientId: 'two', - connectionId: connId, - id: connId + ':0:0', - data: 'twodata', - }); + channel.presence._myMembers.put( + PresenceMessage.fromValues({ + action: 'present', + clientId: 'two', + connectionId: connId, + id: connId + ':0:0', + data: 'twodata', + }), + ); await helper.becomeSuspended(realtime); @@ -1787,12 +1789,14 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('read.connectionManager.connectionId'); var connId = realtime.connection.connectionManager.connectionId; helper.recordPrivateApi('call.presence._myMembers.put'); - channel.presence._myMembers.put({ - action: 'enter', - clientId: 'me', - connectionId: connId, - id: connId + ':0:0', - }); + channel.presence._myMembers.put( + PresenceMessage.fromValues({ + action: 2, + clientId: 'me', + connectionId: connId, + id: connId + ':0:0', + }), + ); helper.becomeSuspended(realtime, cb); }, function (cb) { @@ -1948,18 +1952,20 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* Inject an additional member locally */ helper.recordPrivateApi('call.channel.processMessage'); channel - .processMessage({ - action: 14, - id: 'messageid:0', - connectionId: 'connid', - timestamp: Date.now(), - presence: [ - { - clientId: goneClientId, - action: 'enter', - }, - ], - }) + .processMessage( + createPM({ + action: 14, + id: 'messageid:0', + connectionId: 'connid', + timestamp: Date.now(), + presence: [ + { + clientId: goneClientId, + action: 2, + }, + ], + }), + ) .then(function () { cb(null); }) @@ -2043,18 +2049,20 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* Inject a member locally */ helper.recordPrivateApi('call.channel.processMessage'); channel - .processMessage({ - action: 14, - id: 'messageid:0', - connectionId: 'connid', - timestamp: Date.now(), - presence: [ - { - clientId: fakeClientId, - action: 'enter', - }, - ], - }) + .processMessage( + createPM({ + action: 14, + id: 'messageid:0', + connectionId: 'connid', + timestamp: Date.now(), + presence: [ + { + clientId: fakeClientId, + action: 2, + }, + ], + }), + ) .then(function () { cb(); }) diff --git a/test/realtime/sync.test.js b/test/realtime/sync.test.js index dccbdeff5..40437ed84 100644 --- a/test/realtime/sync.test.js +++ b/test/realtime/sync.test.js @@ -70,26 +70,28 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async function (cb) { helper.recordPrivateApi('call.channel.processMessage'); channel - .processMessage({ - action: 16, - channel: channelName, - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - { - action: 'present', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }) + .processMessage( + createPM({ + action: 16, + channel: channelName, + presence: [ + { + action: 1, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + { + action: 1, + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ) .then(function () { cb(); }) @@ -114,26 +116,28 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* Trigger another sync. Two has gone without so much as a `leave` message! */ helper.recordPrivateApi('call.channel.processMessage'); channel - .processMessage({ - action: 16, - channel: channelName, - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - { - action: 'present', - clientId: 'three', - connectionId: 'three_connid', - id: 'three_connid:0:0', - timestamp: 1e12, - }, - ], - }) + .processMessage( + createPM({ + action: 16, + channel: channelName, + presence: [ + { + action: 1, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + { + action: 1, + clientId: 'three', + connectionId: 'three_connid', + id: 'three_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ) .then(function () { cb(); }) @@ -191,67 +195,75 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* First sync */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + presence: [ + { + action: 1, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); /* A second sync, this time in multiple parts, with a presence message in the middle */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:cursor', - presence: [ - { - action: 'present', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:cursor', + presence: [ + { + action: 1, + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - presence: [ - { - action: 'enter', - clientId: 'three', - connectionId: 'three_connid', - id: 'three_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + presence: [ + { + action: 2, + clientId: 'three', + connectionId: 'three_connid', + id: 'three_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:', - presence: [ - { - action: 'present', - clientId: 'four', - connectionId: 'four_connid', - id: 'four_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:', + presence: [ + { + action: 1, + clientId: 'four', + connectionId: 'four_connid', + id: 'four_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); await new Promise(function (resolve, reject) { var done = function (err) { @@ -302,51 +314,57 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:cursor', - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:cursor', + presence: [ + { + action: 1, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - presence: [ - { - action: 'enter', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + presence: [ + { + action: 2, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:', - presence: [ - { - action: 'present', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:', + presence: [ + { + action: 1, + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); await new Promise(function (resolve, reject) { var done = function (err) { @@ -394,51 +412,57 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:cursor', - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:cursor', + presence: [ + { + action: 1, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - presence: [ - { - action: 'enter', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + presence: [ + { + action: 2, + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:', - presence: [ - { - action: 'present', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:', + presence: [ + { + action: 1, + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); await new Promise(function (resolve, reject) { var done = function (err) { @@ -490,124 +514,138 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* One enters */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - id: 'one_connid:1', - connectionId: 'one_connid', - timestamp: 1e12, - presence: [ - { - action: 'enter', - clientId: 'one', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + id: 'one_connid:1', + connectionId: 'one_connid', + timestamp: 1e12, + presence: [ + { + action: 2, + clientId: 'one', + }, + ], + }), + ); /* An earlier leave from one (should be ignored) */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'one_connid', - id: 'one_connid:0', - timestamp: 1e12, - presence: [ - { - action: 'leave', - clientId: 'one', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'one_connid', + id: 'one_connid:0', + timestamp: 1e12, + presence: [ + { + action: 3, + clientId: 'one', + }, + ], + }), + ); /* One adds some data in a newer msgSerial */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'one_connid', - id: 'one_connid:2', - timestamp: 1e12, - presence: [ - { - action: 'update', - clientId: 'one', - data: 'onedata', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'one_connid', + id: 'one_connid:2', + timestamp: 1e12, + presence: [ + { + action: 4, + clientId: 'one', + data: 'onedata', + }, + ], + }), + ); /* Two enters */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'two_connid', - id: 'two_connid:0', - timestamp: 1e12, - presence: [ - { - action: 'enter', - clientId: 'two', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'two_connid', + id: 'two_connid:0', + timestamp: 1e12, + presence: [ + { + action: 2, + clientId: 'two', + }, + ], + }), + ); /* Two updates twice in the same message */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'two_connid', - id: 'two_connid:0', - timestamp: 1e12, - presence: [ - { - action: 'update', - clientId: 'two', - data: 'twowrongdata', - }, - { - action: 'update', - clientId: 'two', - data: 'twodata', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'two_connid', + id: 'two_connid:0', + timestamp: 1e12, + presence: [ + { + action: 4, + clientId: 'two', + data: 'twowrongdata', + }, + { + action: 4, + clientId: 'two', + data: 'twodata', + }, + ], + }), + ); /* Three enters */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'three_connid', - id: 'three_connid:99', - timestamp: 1e12, - presence: [ - { - action: 'enter', - clientId: 'three', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'three_connid', + id: 'three_connid:99', + timestamp: 1e12, + presence: [ + { + action: 2, + clientId: 'three', + }, + ], + }), + ); /* Synthesized leave for three (with earlier msgSerial, incompatible id, * and later timestamp) */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'synthesized', - id: 'synthesized:0', - timestamp: 1e12 + 1, - presence: [ - { - action: 'leave', - clientId: 'three', - connectionId: 'three_connid', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'synthesized', + id: 'synthesized:0', + timestamp: 1e12 + 1, + presence: [ + { + action: 3, + clientId: 'three', + connectionId: 'three_connid', + }, + ], + }), + ); await new Promise(function (resolve, reject) { var done = function (err) { @@ -690,18 +728,20 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('replace.channel.processMessage'); syncerChannel.processMessage = originalProcessMessage; helper.recordPrivateApi('call.channel.processMessage'); - await syncerChannel.processMessage({ - action: 14, - id: 'messageid:0', - connectionId: 'connid', - timestamp: 2000000000000, - presence: [ - { - clientId: interrupterClientId, - action: 'enter', - }, - ], - }); + await syncerChannel.processMessage( + createPM({ + action: 14, + id: 'messageid:0', + connectionId: 'connid', + timestamp: 2000000000000, + presence: [ + { + clientId: interrupterClientId, + action: 2, + }, + ], + }), + ); } }; Helper.whenPromiseSettles(syncerChannel.attach(), cb); diff --git a/test/rest/presence.test.js b/test/rest/presence.test.js index 4f2bfae46..8381fe3c5 100644 --- a/test/rest/presence.test.js +++ b/test/rest/presence.test.js @@ -89,9 +89,9 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async var presenceBool = presenceMessages.find(function (msg) { return msg.clientId == 'client_bool'; }); - expect(JSON.parse(JSON.stringify(presenceBool)).action).to.equal(1); // present + expect(JSON.parse(JSON.stringify(await presenceBool.encode({}))).action).to.equal(1); // present presenceBool.action = 'leave'; - expect(JSON.parse(JSON.stringify(presenceBool)).action).to.equal(3); // leave + expect(JSON.parse(JSON.stringify(await presenceBool.encode({}))).action).to.equal(3); // leave }); /**