Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-1024] GC tombstoned map entries for LiveMap and objects in the global pool #1937

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/plugins/liveobjects/defaults.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export const DEFAULTS = {
gcInterval: 1000 * 60 * 5, // 5 minutes
/**
* Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation
* with an earlier origin timeserial that would not have been applied if the tombstone still existed.
*
* Applies both for map entries tombstones and object tombstones.
*/
gcGracePeriod: 1000 * 60 * 60 * 24, // 24 hours
};
8 changes: 8 additions & 0 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

/**
* @internal
*/
onGCInterval(): void {
// nothing to GC for a counter object
return;
}

protected _getZeroValueData(): LiveCounterData {
return { data: 0 };
}
Expand Down
30 changes: 28 additions & 2 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import deepEqual from 'deep-equal';

import type * as API from '../../../ably';
import { DEFAULTS } from './defaults';
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import {
Expand Down Expand Up @@ -33,6 +34,10 @@ export type StateData = ObjectIdStateData | ValueStateData;

export interface MapEntry {
tombstone: boolean;
/**
* Can't use timeserial from the operation that deleted the entry for the same reason as for {@link LiveObject} tombstones, see explanation there.
*/
tombstonedAt: number | undefined;
timeserial: string | undefined;
data: StateData | undefined;
}
Expand Down Expand Up @@ -295,6 +300,22 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

/**
* @internal
*/
onGCInterval(): void {
// should remove any tombstoned entries from the underlying map data that have exceeded the GC grace period

const keysToDelete: string[] = [];
for (const [key, value] of this._dataRef.data.entries()) {
if (value.tombstone === true && Date.now() - value.tombstonedAt! >= DEFAULTS.gcGracePeriod) {
keysToDelete.push(key);
}
}

keysToDelete.forEach((x) => this._dataRef.data.delete(x));
}

protected _getZeroValueData(): LiveMapData {
return { data: new Map<string, MapEntry>() };
}
Expand Down Expand Up @@ -459,11 +480,13 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

if (existingEntry) {
existingEntry.tombstone = false;
existingEntry.tombstonedAt = undefined;
existingEntry.timeserial = opOriginTimeserial;
existingEntry.data = liveData;
} else {
const newEntry: MapEntry = {
tombstone: false,
tombstonedAt: undefined,
timeserial: opOriginTimeserial,
data: liveData,
};
Expand All @@ -490,11 +513,13 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

if (existingEntry) {
existingEntry.tombstone = true;
existingEntry.tombstonedAt = Date.now();
existingEntry.timeserial = opOriginTimeserial;
existingEntry.data = undefined;
} else {
const newEntry: MapEntry = {
tombstone: true,
tombstonedAt: Date.now(),
timeserial: opOriginTimeserial,
data: undefined,
};
Expand Down Expand Up @@ -548,9 +573,10 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

const liveDataEntry: MapEntry = {
timeserial: entry.timeserial,
// true only if we received explicit true. otherwise always false
tombstone: entry.tombstone === true,
data: liveData,
// consider object as tombstoned only if we received an explicit flag stating that. otherwise it exists
tombstone: entry.tombstone === true,
tombstonedAt: entry.tombstone === true ? Date.now() : undefined,
};

liveMapData.data.set(key, liveDataEntry);
Expand Down
22 changes: 22 additions & 0 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ export abstract class LiveObject<
protected _siteTimeserials: Record<string, string>;
protected _createOperationIsMerged: boolean;
private _tombstone: boolean;
/**
* Even though the `timeserial` from the operation that deleted the object contains the timestamp value,
* the `timeserial` should be treated as an opaque string on the client, meaning we should not attempt to parse it.
*
* Therefore, we need to set our own timestamp using local clock when the object is deleted client-side.
* Strictly speaking, this does make an assumption about the client clock not being too heavily skewed behind the server,
* but it is an acceptable compromise for the time being, as the likelihood of encountering a race here is pretty low given the grace periods we use.
*/
private _tombstonedAt: number | undefined;

protected constructor(
protected _liveObjects: LiveObjects,
Expand Down Expand Up @@ -108,6 +117,7 @@ export abstract class LiveObject<
*/
tombstone(): void {
this._tombstone = true;
this._tombstonedAt = Date.now();
this._dataRef = this._getZeroValueData();
// TODO: emit "deleted" event so that end users get notified about this object getting deleted
}
Expand All @@ -119,6 +129,13 @@ export abstract class LiveObject<
return this._tombstone;
}

/**
* @internal
*/
tombstonedAt(): number | undefined {
return this._tombstonedAt;
}

/**
* Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object.
*
Expand Down Expand Up @@ -168,6 +185,11 @@ export abstract class LiveObject<
* @internal
*/
abstract overrideWithStateObject(stateObject: StateObject): TUpdate | LiveObjectUpdateNoop;
/**
* @internal
*/
abstract onGCInterval(): void;

protected abstract _getZeroValueData(): TData;
/**
* Calculate the update object based on the current Live Object data and incoming new data.
Expand Down
4 changes: 4 additions & 0 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type BaseClient from 'common/lib/client/baseclient';
import type RealtimeChannel from 'common/lib/client/realtimechannel';
import type EventEmitter from 'common/lib/util/eventemitter';
import type * as API from '../../../ably';
import { DEFAULTS } from './defaults';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
Expand All @@ -26,6 +27,9 @@ export class LiveObjects {
private _currentSyncCursor: string | undefined;
private _bufferedStateOperations: StateMessage[];

// Used by tests
static _DEFAULTS = DEFAULTS;

constructor(channel: RealtimeChannel) {
this._channel = channel;
this._client = channel.client;
Expand Down
24 changes: 24 additions & 0 deletions src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type BaseClient from 'common/lib/client/baseclient';
import { DEFAULTS } from './defaults';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
Expand All @@ -13,10 +14,16 @@ export const ROOT_OBJECT_ID = 'root';
export class LiveObjectsPool {
private _client: BaseClient;
private _pool: Map<string, LiveObject>;
private _gcInterval: ReturnType<typeof setInterval>;

constructor(private _liveObjects: LiveObjects) {
this._client = this._liveObjects.getClient();
this._pool = this._getInitialPool();
this._gcInterval = setInterval(() => {
this._onGCInterval();
}, DEFAULTS.gcInterval);
// call nodejs's Timeout.unref to not require Node.js event loop to remain active due to this interval. see https://nodejs.org/api/timers.html#timeoutunref
this._gcInterval.unref?.();
}

get(objectId: string): LiveObject | undefined {
Expand Down Expand Up @@ -68,4 +75,21 @@ export class LiveObjectsPool {
pool.set(root.getObjectId(), root);
return pool;
}

private _onGCInterval(): void {
const toDelete: string[] = [];
for (const [objectId, obj] of this._pool.entries()) {
// tombstoned objects should be removed from the pool if they have been tombstoned for longer than grace period.
// by removing them from the local pool, LiveObjects plugin no longer keeps a reference to those objects, allowing JS's
// Garbage Collection to eventually free the memory for those objects, provided the user no longer references them either.
if (obj.isTombstoned() && Date.now() - obj.tombstonedAt()! >= DEFAULTS.gcGracePeriod) {
toDelete.push(objectId);
continue;
}

obj.onGCInterval();
}

toDelete.forEach((x) => this._pool.delete(x));
}
}
7 changes: 7 additions & 0 deletions test/common/modules/private_api_recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.Defaults.getPort',
'call.Defaults.normaliseOptions',
'call.EventEmitter.emit',
'call.LiveObject.isTombstoned',
'call.LiveObjects._liveObjectsPool._onGCInterval',
'call.LiveObjects._liveObjectsPool.get',
'call.Message.decode',
'call.Message.encode',
'call.Platform.Config.push.storage.clear',
Expand Down Expand Up @@ -72,6 +75,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'pass.clientOption.webSocketSlowTimeout',
'pass.clientOption.wsConnectivityCheckUrl', // actually ably-js public API (i.e. it’s in the TypeScript typings) but no other SDK has it. At the same time it's not entirely clear if websocket connectivity check should be considered an ably-js-specific functionality (as for other params above), so for the time being we consider it as private API
'read.Defaults.version',
'read.LiveMap._dataRef.data',
'read.EventEmitter.events',
'read.Platform.Config.push',
'read.Realtime._transports',
Expand Down Expand Up @@ -112,6 +116,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'read.transport.params.mode',
'read.transport.recvRequest.recvUri',
'read.transport.uri',
'replace.LiveObjects._liveObjectsPool._onGCInterval',
'replace.channel.attachImpl',
'replace.channel.processMessage',
'replace.channel.sendMessage',
Expand All @@ -128,6 +133,8 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'serialize.recoveryKey',
'write.Defaults.ENVIRONMENT',
'write.Defaults.wsConnectivityCheckUrl',
'write.LiveObjects._DEFAULTS.gcGracePeriod',
'write.LiveObjects._DEFAULTS.gcInterval',
'write.Platform.Config.push', // This implies using a mock implementation of the internal IPlatformPushConfig interface. Our mock (in push_channel_transport.js) then interacts with internal objects and private APIs of public objects to implement this interface; I haven’t added annotations for that private API usage, since there wasn’t an easy way to pass test context information into the mock. I think that for now we can just say that if we wanted to get rid of this private API usage, then we’d need to remove this mock entirely.
'write.auth.authOptions.requestHeaders',
'write.auth.key',
Expand Down
Loading
Loading