Skip to content

Commit

Permalink
Add object-level write API for LiveMap and LiveCounter creation
Browse files Browse the repository at this point in the history
Resolves DTP-1138
  • Loading branch information
VeskeR committed Jan 22, 2025
1 parent 5d5874d commit d2f44d0
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 0 deletions.
65 changes: 65 additions & 0 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import { ObjectId } from './objectid';
import { StateCounterOp, StateMessage, StateObject, StateOperation, StateOperationAction } from './statemessage';

export interface LiveCounterData extends LiveObjectData {
Expand Down Expand Up @@ -32,6 +33,18 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
return obj;
}

/**
* Returns a {@link LiveCounter} instance based on the provided state operation.
* The provided state operation must hold a valid counter object data.
*
* @internal
*/
static fromStateOperation(liveobjects: LiveObjects, stateOperation: StateOperation): LiveCounter {
const obj = new LiveCounter(liveobjects, stateOperation.objectId);
obj._mergeInitialDataFromCreateOperation(stateOperation);
return obj;
}

/**
* @internal
*/
Expand All @@ -57,6 +70,58 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
return stateMessage;
}

/**
* @internal
*/
static async createCounterCreateMessage(liveObjects: LiveObjects, count?: number): Promise<StateMessage> {
const client = liveObjects.getClient();

if (count !== undefined && (typeof count !== 'number' || !Number.isFinite(count))) {
throw new client.ErrorInfo('Counter value should be a valid number', 40013, 400);
}

const initialValueObj = LiveCounter.createInitialValueObject(count);
const { encodedInitialValue, format } = StateMessage.encodeInitialValue(client.Utils, initialValueObj);
const nonce = client.Utils.cheapRandStr();
const msTimestamp = await client.getTimestamp(true);

const objectId = ObjectId.fromInitialValue(
client.Platform,
'counter',
encodedInitialValue,
nonce,
msTimestamp,
).toString();

const stateMessage = StateMessage.fromValues(
{
operation: {
...initialValueObj,
action: StateOperationAction.COUNTER_CREATE,
objectId,
nonce,
initialValue: encodedInitialValue,
initialValueEncoding: format,
},
},
client.Utils,
client.MessageEncoding,
);

return stateMessage;
}

/**
* @internal
*/
static createInitialValueObject(count?: number): Pick<StateOperation, 'counter'> {
return {
counter: {
count: count ?? 0,
},
};
}

value(): number {
return this._dataRef.data;
}
Expand Down
84 changes: 84 additions & 0 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type * as API from '../../../ably';
import { DEFAULTS } from './defaults';
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import { ObjectId } from './objectid';
import {
MapSemantics,
StateMapEntry,
Expand Down Expand Up @@ -80,6 +81,21 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return obj;
}

/**
* Returns a {@link LiveMap} instance based on the provided state operation.
* The provided state operation must hold a valid map object data.
*
* @internal
*/
static fromStateOperation<T extends API.LiveMapType>(
liveobjects: LiveObjects,
stateOperation: StateOperation,
): LiveMap<T> {
const obj = new LiveMap<T>(liveobjects, stateOperation.map?.semantics!, stateOperation.objectId);
obj._mergeInitialDataFromCreateOperation(stateOperation);
return obj;
}

/**
* @internal
*/
Expand Down Expand Up @@ -170,6 +186,74 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
}
}

/**
* @internal
*/
static async createMapCreateMessage(liveObjects: LiveObjects, entries?: API.LiveMapType): Promise<StateMessage> {
const client = liveObjects.getClient();

if (entries !== undefined && (entries === null || typeof entries !== 'object')) {
throw new client.ErrorInfo('Map entries should be a key/value object', 40013, 400);
}

Object.entries(entries ?? {}).forEach(([key, value]) => LiveMap.validateKeyValue(liveObjects, key, value));

const initialValueObj = LiveMap.createInitialValueObject(entries);
const { encodedInitialValue, format } = StateMessage.encodeInitialValue(client.Utils, initialValueObj);
const nonce = client.Utils.cheapRandStr();
const msTimestamp = await client.getTimestamp(true);

const objectId = ObjectId.fromInitialValue(
client.Platform,
'map',
encodedInitialValue,
nonce,
msTimestamp,
).toString();

const stateMessage = StateMessage.fromValues(
{
operation: {
...initialValueObj,
action: StateOperationAction.MAP_CREATE,
objectId,
nonce,
initialValue: encodedInitialValue,
initialValueEncoding: format,
},
},
client.Utils,
client.MessageEncoding,
);

return stateMessage;
}

/**
* @internal
*/
static createInitialValueObject(entries?: API.LiveMapType): Pick<StateOperation, 'map'> {
const stateMapEntries: Record<string, StateMapEntry> = {};

Object.entries(entries ?? {}).forEach(([key, value]) => {
const stateData: StateData =
value instanceof LiveObject
? ({ objectId: value.getObjectId() } as ObjectIdStateData)
: ({ value } as ValueStateData);

stateMapEntries[key] = {
data: stateData,
};
});

return {
map: {
semantics: MapSemantics.LWW,
entries: stateMapEntries,
},
};
}

/**
* Returns the value associated with the specified key in the underlying Map object.
*
Expand Down
48 changes: 48 additions & 0 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,54 @@ export class LiveObjects {
return this._liveObjectsPool.get(ROOT_OBJECT_ID) as LiveMap<T>;
}

/**
* Send a MAP_CREATE operation to the realtime system to create a new map object in the pool.
*
* Locally on the client it creates a zero-value object with the corresponding id and returns it.
* The object initialization with the initial value is expected to happen when the corresponding MAP_CREATE operation is echoed
* back to the client and applied to the object following the regular operation application procedure.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message. A promise is resolved with a zero-value object created in the local pool.
*/
async createMap<T extends API.LiveMapType>(entries?: T): Promise<LiveMap<T>> {
const stateMessage = await LiveMap.createMapCreateMessage(this, entries);
const objectId = stateMessage.operation?.objectId!;

// must create an object locally before publishing an op, so we don't override an object created by applying echoed operation if we received it before ACK for a publish operation.
// new map object can be created using locally constructed state operation, even though it is missing timeserials for map entries.
// CREATE operation is only applied once, and all map entries will have an "earliest possible" timeserial so that any subsequent operation can be applied to them.
const map = LiveMap.fromStateOperation<T>(this, stateMessage.operation!);
this._liveObjectsPool.set(objectId, map);

await this.publish([stateMessage]);

return map;
}

/**
* Send a COUNTER_CREATE operation to the realtime system to create a new counter object in the pool.
*
* Locally on the client it creates a zero-value object with the corresponding id and returns it.
* The object initialization with the initial value is expected to happen when the corresponding COUNTER_CREATE operation is echoed
* back to the client and applied to the object following the regular operation application procedure.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message. A promise is resolved with a zero-value object created in the local pool.
*/
async createCounter(count?: number): Promise<LiveCounter> {
const stateMessage = await LiveCounter.createCounterCreateMessage(this, count);
const objectId = stateMessage.operation?.objectId!;

// must create an object locally before publishing an op, so we don't override an object created by applying echoed operation if we received it before ACK for a publish operation.
// new counter object can be created using locally constructed state operation.
// CREATE operation is only applied once, so the initial counter value won't be double counted when we eventually receive an echoed CREATE operation
const counter = LiveCounter.fromStateOperation(this, stateMessage.operation!);
this._liveObjectsPool.set(objectId, counter);

await this.publish([stateMessage]);

return counter;
}

/**
* @internal
*/
Expand Down

0 comments on commit d2f44d0

Please sign in to comment.