Skip to content

Commit

Permalink
Merge pull request #1950 from ably/DTP-1138/map-counter-creates
Browse files Browse the repository at this point in the history
[DTP-1138] Add object-level write API for LiveMap/LiveCounter creation
  • Loading branch information
VeskeR authored Jan 31, 2025
2 parents 8d6fb4e + 5cfbc9b commit 0e63215
Show file tree
Hide file tree
Showing 16 changed files with 990 additions and 175 deletions.
2 changes: 1 addition & 1 deletion scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { gzip } from 'zlib';
import Table from 'cli-table';

// The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel)
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 101, gzip: 31 };
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 102, gzip: 31 };

const baseClientNames = ['BaseRest', 'BaseRealtime'];

Expand Down
33 changes: 9 additions & 24 deletions src/common/lib/client/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ class Auth {
capability = tokenParams.capability || '';

if (!request.timestamp) {
request.timestamp = await this.getTimestamp(authOptions && authOptions.queryTime);
request.timestamp = await this._getTimestamp(authOptions && authOptions.queryTime);
}

/* nonce */
Expand Down Expand Up @@ -832,28 +832,6 @@ class Auth {
}
}

/**
* Get the current time based on the local clock,
* or if the option queryTime is true, return the server time.
* The server time offset from the local time is stored so that
* only one request to the server to get the time is ever needed
*/
async getTimestamp(queryTime: boolean): Promise<number> {
if (!this.isTimeOffsetSet() && (queryTime || this.authOptions.queryTime)) {
return this.client.time();
} else {
return this.getTimestampUsingOffset();
}
}

getTimestampUsingOffset() {
return Date.now() + (this.client.serverTimeOffset || 0);
}

isTimeOffsetSet() {
return this.client.serverTimeOffset !== null;
}

_saveBasicOptions(authOptions: AuthOptions) {
this.method = 'basic';
this.key = authOptions.key;
Expand Down Expand Up @@ -913,7 +891,7 @@ class Auth {
/* RSA4b1 -- if we have a server time offset set already, we can
* automatically remove expired tokens. Else just use the cached token. If it is
* expired Ably will tell us and we'll discard it then. */
if (!this.isTimeOffsetSet() || !token.expires || token.expires >= this.getTimestampUsingOffset()) {
if (!this.client.isTimeOffsetSet() || !token.expires || token.expires >= this.client.getTimestampUsingOffset()) {
Logger.logAction(
this.logger,
Logger.LOG_MINOR,
Expand Down Expand Up @@ -1020,6 +998,13 @@ class Auth {
): Promise<TokenRevocationResult> {
return this.client.rest.revokeTokens(specifiers, options);
}

/**
* Same as {@link BaseClient.getTimestamp} but also takes into account {@link Auth.authOptions}
*/
private async _getTimestamp(queryTime: boolean): Promise<number> {
return this.client.getTimestamp(queryTime || !!this.authOptions.queryTime);
}
}

export default Auth;
22 changes: 22 additions & 0 deletions src/common/lib/client/baseclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,28 @@ class BaseClient {
this.logger.setLog(logOptions.level, logOptions.handler);
}

/**
* Get the current time based on the local clock,
* or if the option queryTime is true, return the server time.
* The server time offset from the local time is stored so that
* only one request to the server to get the time is ever needed
*/
async getTimestamp(queryTime: boolean): Promise<number> {
if (!this.isTimeOffsetSet() && queryTime) {
return this.time();
}

return this.getTimestampUsingOffset();
}

getTimestampUsingOffset(): number {
return Date.now() + (this.serverTimeOffset || 0);
}

isTimeOffsetSet(): boolean {
return this.serverTimeOffset !== null;
}

static Platform = Platform;

/**
Expand Down
3 changes: 3 additions & 0 deletions src/common/types/IBufferUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export default interface IBufferUtils<Bufferlike, Output, ToBufferOutput> {
toBuffer: (buffer: Bufferlike) => ToBufferOutput;
toArrayBuffer: (buffer: Bufferlike) => ArrayBuffer;
base64Encode: (buffer: Bufferlike) => string;
base64UrlEncode: (buffer: Bufferlike) => string;
base64Decode: (string: string) => Output;
hexEncode: (buffer: Bufferlike) => string;
hexDecode: (string: string) => Output;
Expand All @@ -19,5 +20,7 @@ export default interface IBufferUtils<Bufferlike, Output, ToBufferOutput> {
* Returns ArrayBuffer on browser and Buffer on Node.js
*/
arrayBufferViewToBuffer: (arrayBufferView: ArrayBufferView) => Bufferlike;
concat(buffers: Bufferlike[]): Output;
sha256(message: Bufferlike): Output;
hmacSha256(message: Bufferlike, key: Bufferlike): Output;
}
19 changes: 15 additions & 4 deletions src/platform/nodejs/lib/util/bufferutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class BufferUtils implements IBufferUtils<Bufferlike, Output, ToBufferOutput> {
return this.toBuffer(buffer).toString('base64');
}

base64UrlEncode(buffer: Bufferlike): string {
return this.toBuffer(buffer).toString('base64url');
}

areBuffersEqual(buffer1: Bufferlike, buffer2: Bufferlike): boolean {
if (!buffer1 || !buffer2) return false;
return this.toBuffer(buffer1).compare(this.toBuffer(buffer2)) == 0;
Expand Down Expand Up @@ -70,14 +74,21 @@ class BufferUtils implements IBufferUtils<Bufferlike, Output, ToBufferOutput> {
return Buffer.from(string, 'utf8');
}

concat(buffers: Bufferlike[]): Output {
return Buffer.concat(buffers.map((x) => this.toBuffer(x)));
}

sha256(message: Bufferlike): Output {
const messageBuffer = this.toBuffer(message);

return crypto.createHash('SHA256').update(messageBuffer).digest();
}

hmacSha256(message: Bufferlike, key: Bufferlike): Output {
const messageBuffer = this.toBuffer(message);
const keyBuffer = this.toBuffer(key);

const hmac = crypto.createHmac('SHA256', keyBuffer);
hmac.update(messageBuffer);

return hmac.digest();
return crypto.createHmac('SHA256', keyBuffer).update(messageBuffer).digest();
}
}

Expand Down
27 changes: 26 additions & 1 deletion src/platform/web/lib/util/bufferutils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Platform from 'common/platform';
import IBufferUtils from 'common/types/IBufferUtils';
import { hmac as hmacSha256 } from './hmac-sha256';
import { hmac as hmacSha256, sha256 } from './hmac-sha256';

/* Most BufferUtils methods that return a binary object return an ArrayBuffer
* The exception is toBuffer, which returns a Uint8Array */
Expand Down Expand Up @@ -116,6 +116,11 @@ class BufferUtils implements IBufferUtils<Bufferlike, Output, ToBufferOutput> {
return this.uint8ViewToBase64(this.toBuffer(buffer));
}

base64UrlEncode(buffer: Bufferlike): string {
// base64url encoding is based on regular base64 with following changes: https://base64.guru/standards/base64url
return this.base64Encode(buffer).replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, '');
}

base64Decode(str: string): Output {
if (ArrayBuffer && Platform.Config.atob) {
return this.base64ToArrayBuffer(str);
Expand Down Expand Up @@ -195,6 +200,26 @@ class BufferUtils implements IBufferUtils<Bufferlike, Output, ToBufferOutput> {
return this.toArrayBuffer(arrayBufferView);
}

concat(buffers: Bufferlike[]): Output {
const sumLength = buffers.reduce((acc, v) => acc + v.byteLength, 0);
const result = new Uint8Array(sumLength);
let offset = 0;

for (const buffer of buffers) {
const uint8Array = this.toBuffer(buffer);
// see TypedArray.set for TypedArray argument https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypedArray/set#typedarray
result.set(uint8Array, offset);
offset += uint8Array.byteLength;
}

return result.buffer;
}

sha256(message: Bufferlike): Output {
const hash = sha256(this.toBuffer(message));
return this.toArrayBuffer(hash);
}

hmacSha256(message: Bufferlike, key: Bufferlike): Output {
const hash = hmacSha256(this.toBuffer(key), this.toBuffer(message));
return this.toArrayBuffer(hash);
Expand Down
4 changes: 2 additions & 2 deletions src/platform/web/lib/util/hmac-sha256.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ function rightRotate(word: number, bits: number) {
return (word >>> bits) | (word << (32 - bits));
}

function sha256(data: Uint8Array) {
export function sha256(data: Uint8Array): Uint8Array {
// Copy default state
var STATE = DEFAULT_STATE.slice();

Expand Down Expand Up @@ -185,7 +185,7 @@ function sha256(data: Uint8Array) {
);
}

export function hmac(key: Uint8Array, data: Uint8Array) {
export function hmac(key: Uint8Array, data: Uint8Array): Uint8Array {
if (key.length > 64) key = sha256(key);

if (key.length < 64) {
Expand Down
107 changes: 87 additions & 20 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import { ObjectId } from './objectid';
import { StateCounterOp, StateMessage, StateObject, StateOperation, StateOperationAction } from './statemessage';

export interface LiveCounterData extends LiveObjectData {
Expand Down Expand Up @@ -32,47 +33,113 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
return obj;
}

value(): number {
return this._dataRef.data;
}

/**
* Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object.
*
* This does not modify the underlying data of this LiveCounter object. Instead, the change will be applied when
* the published COUNTER_INC operation is echoed back to the client and applied to the object following the regular
* operation application procedure.
* Returns a {@link LiveCounter} instance based on the provided COUNTER_CREATE state operation.
* The provided state operation must hold a valid counter object data.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
* @internal
*/
async increment(amount: number): Promise<void> {
const stateMessage = this.createCounterIncMessage(amount);
return this._liveObjects.publish([stateMessage]);
static fromStateOperation(liveobjects: LiveObjects, stateOperation: StateOperation): LiveCounter {
const obj = new LiveCounter(liveobjects, stateOperation.objectId);
obj._mergeInitialDataFromCreateOperation(stateOperation);
return obj;
}

/**
* @internal
*/
createCounterIncMessage(amount: number): StateMessage {
static createCounterIncMessage(liveObjects: LiveObjects, objectId: string, amount: number): StateMessage {
const client = liveObjects.getClient();

if (typeof amount !== 'number' || !isFinite(amount)) {
throw new this._client.ErrorInfo('Counter value increment should be a valid number', 40013, 400);
throw new client.ErrorInfo('Counter value increment should be a valid number', 40013, 400);
}

const stateMessage = StateMessage.fromValues(
{
operation: {
action: StateOperationAction.COUNTER_INC,
objectId: this.getObjectId(),
objectId,
counterOp: { amount },
},
} as StateOperation,
},
this._client.Utils,
this._client.MessageEncoding,
client.Utils,
client.MessageEncoding,
);

return stateMessage;
}

/**
* @internal
*/
static async createCounterCreateMessage(liveObjects: LiveObjects, count?: number): Promise<StateMessage> {
const client = liveObjects.getClient();

if (count !== undefined && (typeof count !== 'number' || !Number.isFinite(count))) {
throw new client.ErrorInfo('Counter value should be a valid number', 40013, 400);
}

const initialValueObj = LiveCounter.createInitialValueObject(count);
const { encodedInitialValue, format } = StateMessage.encodeInitialValue(initialValueObj, client);
const nonce = client.Utils.cheapRandStr();
const msTimestamp = await client.getTimestamp(true);

const objectId = ObjectId.fromInitialValue(
client.Platform,
'counter',
encodedInitialValue,
nonce,
msTimestamp,
).toString();

const stateMessage = StateMessage.fromValues(
{
operation: {
...initialValueObj,
action: StateOperationAction.COUNTER_CREATE,
objectId,
nonce,
initialValue: encodedInitialValue,
initialValueEncoding: format,
} as StateOperation,
},
client.Utils,
client.MessageEncoding,
);

return stateMessage;
}

/**
* @internal
*/
static createInitialValueObject(count?: number): Pick<StateOperation, 'counter'> {
return {
counter: {
count: count ?? 0,
},
};
}

value(): number {
return this._dataRef.data;
}

/**
* Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object.
*
* This does not modify the underlying data of this LiveCounter object. Instead, the change will be applied when
* the published COUNTER_INC operation is echoed back to the client and applied to the object following the regular
* operation application procedure.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
*/
async increment(amount: number): Promise<void> {
const stateMessage = LiveCounter.createCounterIncMessage(this._liveObjects, this.getObjectId(), amount);
return this._liveObjects.publish([stateMessage]);
}

/**
* Alias for calling {@link LiveCounter.increment | LiveCounter.increment(-amount)}
*/
Expand Down Expand Up @@ -185,7 +252,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
this._siteTimeserials = stateObject.siteTimeserials ?? {};

if (this.isTombstoned()) {
// this object is tombstoned. this is a terminal state which can't be overriden. skip the rest of state object message processing
// this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of state object message processing
return { noop: true };
}

Expand Down
Loading

0 comments on commit 0e63215

Please sign in to comment.