Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task: Implement the resume event for tasks. #140

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/cicd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ jobs:
- name: Run tests
env:
RESONATE_STORE_URL: http://localhost:8001
RESONATE_TASK_SOURCE_URL: http://localhost:3000/recv
run: npm test -- --verbose

- name: Upload coverage report to Codecov
Expand Down
7 changes: 7 additions & 0 deletions lib/core/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,15 @@ export type ResonateOptions = {
/**
* The remote promise store url. If not provided, an in-memory
* promise store will be used.
* Must be set if `tasksUrl` is set.
*/
url: string;

/**
* Tasks Url to listen for tasks from the server, must be a valid http url.
* Default port 3000. Must be set if `url` is set.
*/
tasksUrl: string;
};

/**
Expand Down
13 changes: 9 additions & 4 deletions lib/core/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export function isRetryPolicy(value: unknown): value is RetryPolicy {
export function exponential(
initialDelayMs: number = 100,
backoffFactor: number = 2,
maxAttempts: number = Infinity,
maxAttempts: number = -1,
maxDelayMs: number = 60000,
): Exponential {
return {
Expand All @@ -71,7 +71,7 @@ export function exponential(
};
}

export function linear(delayMs: number = 1000, maxAttempts: number = Infinity): Linear {
export function linear(delayMs: number = 1000, maxAttempts: number = -1): Linear {
return {
kind: "linear",
delayMs,
Expand Down Expand Up @@ -157,25 +157,30 @@ export async function runWithRetry<T>(
throw error;
}

// Maps every of the supported retry policies to have the same
// fields so we can reuse the same function for retries
function retryDefaults(retryPolicy: RetryPolicy): {
initialDelay: number;
backoffFactor: number;
maxAttempts: number;
maxDelay: number;
} {
let maxAttemps;
switch (retryPolicy.kind) {
case "exponential":
maxAttemps = retryPolicy.maxAttempts === -1 ? Infinity : retryPolicy.maxAttempts;
return {
initialDelay: retryPolicy.initialDelayMs,
backoffFactor: retryPolicy.backoffFactor,
maxAttempts: retryPolicy.maxAttempts,
maxAttempts: maxAttemps,
maxDelay: retryPolicy.maxDelayMs,
};
case "linear":
maxAttemps = retryPolicy.maxAttempts === -1 ? Infinity : retryPolicy.maxAttempts;
return {
initialDelay: retryPolicy.delayMs,
backoffFactor: 1,
maxAttempts: retryPolicy.maxAttempts,
maxAttempts: maxAttemps,
maxDelay: retryPolicy.delayMs,
};
case "never":
Expand Down
34 changes: 34 additions & 0 deletions lib/core/store.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { DurablePromiseRecord } from "./promises/types";

import { Schedule } from "./schedules/types";
import { CallbackRecord, ResumeBody } from "./tasks";

/**
* Store Interface
Expand All @@ -9,6 +10,8 @@ export interface IStore {
readonly promises: IPromiseStore;
readonly schedules: IScheduleStore;
readonly locks: ILockStore;
readonly callbacks: ICallbackStore;
readonly tasks: ITaskStore;
}

/**
Expand Down Expand Up @@ -195,3 +198,34 @@ export interface ILockStore {
*/
release(id: string, eid: string): Promise<boolean>;
}

/**
* Task Store API
*/
export interface ITaskStore {
/**
* Claims a task.
*
* @param tasId Id of the task to claim.
* @param count Count of the task to claim.
* @returns A boolean indicating whether or not the task was claim.
*
*/
claim(taskId: string, count: number): Promise<ResumeBody>;

/**
* Completes the task.
*
* @param taskId Id of the task to complete.
* @param count Count of the task to claim.
*
*/
complete(taskId: string, count: number): Promise<boolean>;
}

/**
* Callback Store API
*/
export interface ICallbackStore {
create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<CallbackRecord>;
}
93 changes: 91 additions & 2 deletions lib/core/stores/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ import { Schedule } from "../schedules/types";
import { IStorage } from "../storage";
import { MemoryStorage } from "../storages/memory";
import { WithTimeout } from "../storages/withTimeout";
import { IStore, IPromiseStore, IScheduleStore, ILockStore } from "../store";
import { IStore, IPromiseStore, IScheduleStore, ILockStore, ICallbackStore, ITaskStore } from "../store";
import { CallbackRecord, ResumeBody, isResumeBody } from "../tasks";

export class LocalStore implements IStore {
public promises: LocalPromiseStore;
public schedules: LocalScheduleStore;
public locks: LocalLockStore;
public callbacks: LocalCallbackStore;
public tasks: LocalTaskStore;

public readonly logger: ILogger;

Expand All @@ -32,10 +35,13 @@ export class LocalStore implements IStore {
promiseStorage: IStorage<DurablePromiseRecord> = new WithTimeout(new MemoryStorage<DurablePromiseRecord>()),
scheduleStorage: IStorage<Schedule> = new MemoryStorage<Schedule>(),
lockStorage: IStorage<{ id: string; eid: string }> = new MemoryStorage<{ id: string; eid: string }>(),
callbacksStorage: IStorage<CallbackRecord> = new MemoryStorage<CallbackRecord>(),
) {
this.callbacks = new LocalCallbackStore(this, callbacksStorage);
this.promises = new LocalPromiseStore(this, promiseStorage);
this.schedules = new LocalScheduleStore(this, scheduleStorage);
this.locks = new LocalLockStore(this, lockStorage);
this.tasks = new LocalTaskStore(this);

this.logger = opts.logger ?? new Logger();

Expand Down Expand Up @@ -170,7 +176,7 @@ export class LocalPromiseStore implements IPromiseStore {
headers: Record<string, string> | undefined,
data: string | undefined,
): Promise<DurablePromiseRecord> {
return this.storage.rmw(id, (promise) => {
return await this.storage.rmw(id, (promise) => {
if (!promise) {
throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND);
}
Expand Down Expand Up @@ -492,6 +498,89 @@ export class LocalLockStore implements ILockStore {
}
}

export class LocalCallbackStore implements ICallbackStore {
constructor(
private store: LocalStore,
private storage: IStorage<CallbackRecord>,
) {}

async create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<CallbackRecord> {
const promise = await this.store.promises.get(promiseId);
const callbackRecord = {
callback: {
id: promiseId,
promiseId,
message: {
data: data,
recv: recv,
},
timeout: timeout,
createdOn: Date.now(),
},
promise: promise,
};

if (promise.state !== "PENDING") {
// Returns a mock callback with the resolved promise, doesn't actually creates the callback
return callbackRecord;
}

// If the promise is pending creates the callback for it.
return await this.storage.rmw(promiseId, (callback) => {
if (!callback) {
return callbackRecord;
}
return callback;
});
}

async get(promiseId: string): Promise<CallbackRecord> {
return this.storage.rmw(promiseId, (callback) => {
if (!callback) {
throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND);
}
return callback;
});
}

async getAll(): Promise<CallbackRecord[]> {
// TODO(avillega): Migrate this loop to `Array.fromAsync` once we make node 22 our minimum version
const result = [];
for await (const callbacks of this.storage.all()) {
result.push(...callbacks);
}
return result;
}

async delete(callbackId: string): Promise<boolean> {
return await this.storage.rmd(callbackId, (callback) => callback.callback.id === callbackId);
}
}

export class LocalTaskStore implements ITaskStore {
constructor(private store: LocalStore) {}

// NOTE: Just for the Local Store the taskId === callbackId which allows us to
// claim the task by getting the data from the callback
async claim(taskId: string, count: number): Promise<ResumeBody> {
const callback = await this.store.callbacks.get(taskId);
if (!callback) {
throw new ResonateError("Task not found", ErrorCodes.STORE_NOT_FOUND);
}

const resumeBody = JSON.parse(callback.callback.message.data);
if (!isResumeBody(resumeBody)) {
throw new ResonateError("Invalid response", ErrorCodes.STORE_PAYLOAD, resumeBody);
}
return resumeBody;
}

async complete(taskId: string, count: number): Promise<boolean> {
await this.store.callbacks.delete(taskId);
return true;
}
}

// Utils

function searchStates(state: string | undefined): string[] {
Expand Down
101 changes: 99 additions & 2 deletions lib/core/stores/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import { Logger } from "../loggers/logger";
import { StoreOptions } from "../options";
import { DurablePromiseRecord, isDurablePromiseRecord, isCompletedPromise } from "../promises/types";
import { Schedule, isSchedule } from "../schedules/types";
import { IStore, IPromiseStore, IScheduleStore, ILockStore } from "../store";
import { IStore, IPromiseStore, IScheduleStore, ILockStore, ICallbackStore, ITaskStore } from "../store";
import { CallbackRecord, ResumeBody, isCallbackRecord, isResumeBody } from "../tasks";
import * as utils from "../utils";

export class RemoteStore implements IStore {
Expand All @@ -18,6 +19,8 @@ export class RemoteStore implements IStore {
public readonly promises: RemotePromiseStore;
public readonly schedules: RemoteScheduleStore;
public readonly locks: RemoteLockStore;
public readonly callbacks: ICallbackStore;
public readonly tasks: ITaskStore;

public readonly encoder: IEncoder<string, string>;
public readonly heartbeat: number;
Expand All @@ -33,6 +36,8 @@ export class RemoteStore implements IStore {
this.promises = new RemotePromiseStore(this);
this.schedules = new RemoteScheduleStore(this);
this.locks = new RemoteLockStore(this);
this.callbacks = new RemoteCallbackStore(this);
this.tasks = new RemoteTasksStore(this);

// store options
this.encoder = opts.encoder ?? new Base64Encoder();
Expand All @@ -56,7 +61,7 @@ export class RemoteStore implements IStore {
for (let i = 0; i < this.retries + 1; i++) {
try {
this.logger.debug("store:req", {
url: this.url,
url: `${this.url}/${path}`,
method: options.method,
headers: options.headers,
body: options.body,
Expand All @@ -79,6 +84,7 @@ export class RemoteStore implements IStore {
const body: unknown = r.status !== 204 ? await r.json() : undefined;

this.logger.debug("store:res", {
url: `${this.url}/${path}`,
status: r.status,
body: body,
});
Expand Down Expand Up @@ -478,6 +484,97 @@ export class RemoteLockStore implements ILockStore {
}
}

export class RemoteTasksStore implements ITaskStore {
#heartbeatInterval: NodeJS.Timeout | undefined = undefined;
#activeTasks: number = 0;

constructor(private store: RemoteStore) {}

async claim(taskId: string, counter: number): Promise<ResumeBody> {
const resumeBody = await this.store.call<ResumeBody>("tasks/claim", isResumeBody, {
method: "POST",
body: JSON.stringify({
id: taskId,
counter: counter,
processId: this.store.pid,
frequency: this.store.heartbeat * 4,
}),
});

this.#activeTasks++;

this.startHeartbeat();
return resumeBody;
}

private async startHeartbeat(): Promise<void> {
if (!this.#heartbeatInterval) {
this.#heartbeatInterval = setInterval(() => this.heartbeat(), this.store.heartbeat);
}
}

private async stopHeartbeat(): Promise<void> {
if (this.#heartbeatInterval) {
clearInterval(this.#heartbeatInterval);
this.#heartbeatInterval = undefined;
}
}

private async heartbeat(): Promise<void> {
const res = await this.store.call<{ activeTasks: number }>(
`tasks/heartbeat`,
(b: unknown): b is { activeTasks: number } =>
typeof b === "object" && b !== null && "activeTasks" in b && typeof b.activeTasks === "number",
{
method: "POST",
body: JSON.stringify({
processId: this.store.pid,
}),
},
);

this.#activeTasks = res.activeTasks;
if (res.activeTasks === 0) {
this.stopHeartbeat();
}
}

async complete(taskId: string, counter: number): Promise<boolean> {
await this.store.call<void>(`tasks/complete`, (b: unknown): b is void => b === null, {
method: "POST",
body: JSON.stringify({
id: taskId,
counter: counter,
}),
});

// decrement the number of active tasks
this.#activeTasks = Math.max(this.#activeTasks - 1, 0);

if (this.#activeTasks === 0) {
this.stopHeartbeat();
}

return true;
}
}

export class RemoteCallbackStore implements ICallbackStore {
constructor(private store: RemoteStore) {}

async create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<CallbackRecord> {
return this.store.call("callbacks", isCallbackRecord, {
method: "POST",
body: JSON.stringify({
promiseId,
recv,
timeout,
data: data ? encode(data, this.store.encoder) : undefined,
}),
});
}
}

// Utils

function encode(value: string, encoder: IEncoder<string, string>): string {
Expand Down
Loading
Loading