Skip to content

Commit

Permalink
Merge pull request #25 from ncthbrt/feature/timeout
Browse files Browse the repository at this point in the history
Feature/timeout
  • Loading branch information
ncthbrt authored Dec 2, 2017
2 parents 9203df2 + 2adc1e2 commit 3d56554
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 16 deletions.
25 changes: 18 additions & 7 deletions lib/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const { Subject } = require('rxjs');
const { stop } = require('./functions');

class Actor {
constructor (parent, name, system, f) {
constructor (parent, name, system, f, { shutdown } = {}) {
this.parent = parent;
if (!name) {
name = `anonymous-${parent.children.size}`;
Expand All @@ -31,6 +31,13 @@ class Actor {
this.mailbox = new Queue();
this.immediate = undefined;
this.parent.childSpawned(this);
if (shutdown) {
if (!shutdown.duration) {
throw new Error('Shutdown should be specified as a duration. It is recommended to use the after() function to do this');
}
this.shutdownPeriod = shutdown.duration;
this.timeout = setTimeout(() => this.stop(), this.shutdownPeriod);
}
}

static serializeErr (err) {
Expand All @@ -47,6 +54,10 @@ class Actor {

dispatch (message, sender = new Nobody(this.system)) {
this.assertNotStopped();
if (this.shutdownPeriod) {
clearTimeout(this.timeout);
setTimeout(() => this.stop(), this.shutdownPeriod);
}
if (!this.busy) {
this.handleMessage(message, sender);
} else {
Expand All @@ -61,7 +72,7 @@ class Actor {

timeout = Actor.getSafeTimeout(timeout);
const timeoutHandle = setTimeout(() => { deffered.reject(new Error('Query Timeout')); }, timeout);
deffered.promise.then(() => clearTimeout(timeoutHandle)).catch(() => {});
deffered.promise.then(() => clearTimeout(timeoutHandle)).catch(() => { });

let tempReference = new TemporaryReference(deffered);
this.dispatch(message, tempReference);
Expand All @@ -82,7 +93,7 @@ class Actor {
if (this.immediate) {
clearImmediate(this.immediate);
}
this.parent.childStopped(this);
this.parent && this.parent.childStopped(this);
this.reference && dereference(this.reference);
delete this.reference;
delete this.parent;
Expand Down Expand Up @@ -149,18 +160,18 @@ class Actor {
}
}

const spawn = (parent, f, name) =>
applyOrThrowIfStopped(parent, p => p.assertNotStopped() && new Actor(p, name, p.system, f).reference);
const spawn = (parent, f, name, properties) =>
applyOrThrowIfStopped(parent, p => p.assertNotStopped() && new Actor(p, name, p.system, f, properties).reference);

const spawnStateless = (parent, f, name) =>
const spawnStateless = (parent, f, name, properties) =>
spawn(parent, function (state, msg, ctx) {
try {
f.call(ctx, msg, ctx);
} catch (e) {
console.error(e);
}
return true;
}, name);
}, name, properties);

module.exports.spawn = spawn;
module.exports.spawnStateless = spawnStateless;
Expand Down
2 changes: 2 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
const { spawn, spawnStateless } = require('./actor');
const { stop, state$, query, dispatch } = require('./functions');
const { spawnPersistent, configurePersistence } = require('./persistence');
const { after } = require('./utils');
module.exports = {
...require('./system'),
spawn,
after,
spawnStateless,
query,
dispatch,
Expand Down
6 changes: 3 additions & 3 deletions lib/persistence/persistent-actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ const { Promise } = require('bluebird');
const freeze = require('deep-freeze-node');

class PersistentActor extends Actor {
constructor (parent, name, system, f, key, persistenceEngine) {
super(parent, name, system, f);
constructor (parent, name, system, f, key, persistenceEngine, properties = {}) {
super(parent, name, system, f, properties);
if (!key) {
throw new Error('Persistence key required');
}
Expand Down Expand Up @@ -62,7 +62,7 @@ class PersistentActor extends Actor {
}

const { applyOrThrowIfStopped } = require('../references');
const spawnPersistent = (reference, f, key, name) =>
const spawnPersistent = (reference, f, key, name, properties) =>
applyOrThrowIfStopped(
reference,
parent => applyOrThrowIfStopped(
Expand Down
43 changes: 43 additions & 0 deletions lib/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
class After {
constructor (amount) {
this._amount = amount;
Object.freeze(this);
}

get hours () {
return { duration: (this._amount * 60 * 60 * 1000) | 0 };
}

get hour () {
return this.hours;
}

get minutes () {
return { duration: (this._amount * 60 * 1000) | 0 };
}

get minute () {
return this.minutes;
}

get seconds () {
return { duration: (this._amount * 1000) | 0 };
}

get second () {
return this.seconds;
}

get milliseconds () {
return { duration: this._amount | 0 };
}
get millisecond () {
return this.milliseconds;
}
}

const after = (amount) => new After(amount);

module.exports = {
after
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nact",
"version": "3.1.5",
"version": "3.2.0",
"description": "nact ⇒ node.js + actors = your services have never been so µ",
"main": "lib/index.js",
"scripts": {
Expand Down
39 changes: 34 additions & 5 deletions test/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ const chai = require('chai');
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
chai.should();
const { start, spawn, spawnStateless, dispatch, stop, query, state$ } = require('../lib');
const { start, spawn, after, spawnStateless, dispatch, stop, query, state$ } = require('../lib');
const { Promise } = require('bluebird');
const { LocalPath } = require('../lib/paths');
const delay = Promise.delay;
const delay = Promise.delay.bind(Promise);
const { applyOrThrowIfStopped } = require('../lib/references');

const spawnChildrenEchoer = (parent, name) =>
Expand Down Expand Up @@ -176,7 +176,7 @@ describe('Actor', function () {
console.error = ignore;
let child = spawnStateless(system, (msg) => { throw new Error('testError'); });
dispatch(child);
delay(50);
await delay(50);
isStopped(child).should.not.be.true;
});

Expand All @@ -195,6 +195,35 @@ describe('Actor', function () {
});
});

describe('timeout', function () {
let system;
beforeEach(() => { system = start(); });
afterEach(() => {
stop(system);
// reset console
delete console.error;
});

it('should automatically stop after timeout if timeout is specified', async function () {
console.error = ignore;
let child = spawnStateless(system, (msg) => {}, 'test', { shutdown: after(100).milliseconds });
await delay(110);
isStopped(child).should.be.true;
});

it('should automatically renew timeout after message', async function () {
let child = spawnStateless(system, ignore, 'test1', { shutdown: after(60).milliseconds });
await delay(30);
dispatch(child, {});
await delay(40);
isStopped(child).should.not.be.true;
});

it('should throw if timeout does not include a duration field', async function () {
(() => spawnStateless(system, ignore, 'test1', { shutdown: {} })).should.throw();
});
});

describe('#stop()', function () {
let system;
beforeEach(() => { system = start(); });
Expand Down Expand Up @@ -224,7 +253,7 @@ describe('Actor', function () {
let resultPromise = query(child, 2, 100);
await delay(20);
stop(child);
await resultPromise.should.be.rejectedWith(Error);
return resultPromise.should.be.rejectedWith(Error);
});

it('stops children when parent is stopped', async function () {
Expand Down Expand Up @@ -260,7 +289,7 @@ describe('Actor', function () {
isStopped(child).should.be.true;
});

it('should ignore subsequent dispatchs', async function () {
it('should ignore subsequent dispatches', async function () {
let child = spawnStateless(system, () => { throw new Error('Should not be triggered'); });
stop(child);
await retry(() => isStopped(child).should.be.true, 12, 10);
Expand Down
29 changes: 29 additions & 0 deletions test/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* eslint-env mocha */
/* eslint-disable no-unused-expressions */
const chai = require('chai');
chai.should();
const { after } = require('../lib');

describe('#after', function () {
it('should correctly calculate milliseconds', function () {
after(100).milliseconds.duration.should.equal(100);
after(1).millisecond.duration.should.equal(1);
after(0).milliseconds.duration.should.equal(0);
});
it('should correctly calculate seconds', function () {
after(1).second.duration.should.equal(1000);
after(0).seconds.duration.should.equal(0);
after(10).seconds.duration.should.equal(10000);
after(1.5).seconds.duration.should.equal(1500);
});

it('should correctly calculate minutes', function () {
after(1).minute.duration.should.equal(60000);
after(0).minutes.duration.should.equal(0);
});

it('should correctly calculate hours', function () {
after(1).hour.duration.should.equal(3600000);
after(0).hours.duration.should.equal(0);
});
});

0 comments on commit 3d56554

Please sign in to comment.