Skip to content

Commit

Permalink
feat: combine callbacks to simplify the API
Browse files Browse the repository at this point in the history
  • Loading branch information
vicary committed Sep 17, 2022
1 parent 3e4f387 commit dc16139
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 33 deletions.
23 changes: 9 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,17 @@ As a proof of concept, this is the most basic implementation of an in-memory que

```ts
type Payload = any;

type MemoryMutexTask = Task<Payload> & { active?: boolean };

const tasks = new Set() < MemoryMutexTask > [];
const tasks = new Set<MemoryMutexTask>();
const pool = new Workerpool<Payload>({
concurrency: 1,
runners: [runnerA, runnerB],
enqueue: (task: MemoryMutexTask) => {
enqueue(task: MemoryMutexTask) {
task.active = false;
tasks.add(task);
},
dequeue: () => {
dequeue() {
// Uncomment the following line for FIFO queues
// if ([...tasks].find(({ active }) => active)) return;

Expand All @@ -70,19 +69,15 @@ const pool = new Workerpool<Payload>({
return task;
}
},
success: (result, { task }) => {
console.log("Result:", result);
onTaskFinish(error, result, { task }) {
tasks.delete(task);

const index = tasks.indexOf(task);
if (index > -1) {
tasks.splice(index, 1);
if (error) {
console.error(error);
} else {
console.log(result);
}
},
failure: (error, { task }) => {
console.error(error);

const index = tasks.indexOf(task);
},
});
```

Expand Down
2 changes: 1 addition & 1 deletion Workerpool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ describe("Workerpool", () => {
return task;
}
},
success: (_, { task }) => {
onTaskFinished: (_error, _result, { task }) => {
const index = queue.indexOf(task);
if (index > -1) {
queue.splice(index, 1);
Expand Down
36 changes: 18 additions & 18 deletions Workerpool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,20 @@ export type WorkerpoolOptions<TPayload = JsonValue, TResult = unknown> = {
dequeue: () => Promisable<Task<TPayload> | undefined>;

/**
* Called when a dequeued task is successful, use this function to remove
* finished tasks (mutex).
* Callback style task handler.
*/
success?: (
result: TResult,
context: CallbackContext<TPayload, TResult>
) => Promisable<void>;

/**
* Called when a failing task has exceeded maximum retries.
*/
failure?: (
error: Error,
context: CallbackContext<TPayload, TResult>
) => Promisable<void>;
onTaskFinished?: {
(
error: Error,
result: null,
context: CallbackContext<TPayload, TResult>
): Promisable<void>;
(
error: null,
result: TResult,
context: CallbackContext<TPayload, TResult>
): Promisable<void>;
};

/**
* Called when the state of the pool is changed.
Expand Down Expand Up @@ -127,7 +126,7 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {

start() {
if (this.#active) {
return;
return this;
}

this.#active = true;
Expand Down Expand Up @@ -221,16 +220,17 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {
runner
.execute(task.payload)
.then(
(result) => this.options.success?.(result, { task, runner }),
(result) =>
this.options.onTaskFinished?.(null, result, { task, runner }),
(error) => {
if (
error instanceof RunnerExecutionError &&
error.retryable &&
task.executionCount < this.#maximumRetries
) {
this.enqueue(task);
} else if (this.options.failure) {
return this.options.failure(error, { task, runner });
} else if (this.options.onTaskFinished) {
return this.options.onTaskFinished(error, null, { task, runner });
} else {
throw error;
}
Expand Down

0 comments on commit dc16139

Please sign in to comment.