diff --git a/lib/actor.js b/lib/actor.js index 8ba44d5..086e698 100644 --- a/lib/actor.js +++ b/lib/actor.js @@ -9,7 +9,7 @@ const { Subject } = require('rxjs'); const { stop } = require('./functions'); class Actor { - constructor (parent, name, system, f) { + constructor (parent, name, system, f, { shutdown } = {}) { this.parent = parent; if (!name) { name = `anonymous-${parent.children.size}`; @@ -31,6 +31,13 @@ class Actor { this.mailbox = new Queue(); this.immediate = undefined; this.parent.childSpawned(this); + if (shutdown) { + if (!shutdown.duration) { + throw new Error('Shutdown should be specified as a duration. It is recommended to use the after() function to do this'); + } + this.shutdownPeriod = shutdown.duration; + this.timeout = setTimeout(() => this.stop(), this.shutdownPeriod); + } } static serializeErr (err) { @@ -47,6 +54,10 @@ class Actor { dispatch (message, sender = new Nobody(this.system)) { this.assertNotStopped(); + if (this.shutdownPeriod) { + clearTimeout(this.timeout); + setTimeout(() => this.stop(), this.shutdownPeriod); + } if (!this.busy) { this.handleMessage(message, sender); } else { @@ -61,7 +72,7 @@ class Actor { timeout = Actor.getSafeTimeout(timeout); const timeoutHandle = setTimeout(() => { deffered.reject(new Error('Query Timeout')); }, timeout); - deffered.promise.then(() => clearTimeout(timeoutHandle)).catch(() => {}); + deffered.promise.then(() => clearTimeout(timeoutHandle)).catch(() => { }); let tempReference = new TemporaryReference(deffered); this.dispatch(message, tempReference); @@ -82,7 +93,7 @@ class Actor { if (this.immediate) { clearImmediate(this.immediate); } - this.parent.childStopped(this); + this.parent && this.parent.childStopped(this); this.reference && dereference(this.reference); delete this.reference; delete this.parent; @@ -149,10 +160,10 @@ class Actor { } } -const spawn = (parent, f, name) => - applyOrThrowIfStopped(parent, p => p.assertNotStopped() && new Actor(p, name, p.system, f).reference); +const spawn = (parent, f, name, properties) => + applyOrThrowIfStopped(parent, p => p.assertNotStopped() && new Actor(p, name, p.system, f, properties).reference); -const spawnStateless = (parent, f, name) => +const spawnStateless = (parent, f, name, properties) => spawn(parent, function (state, msg, ctx) { try { f.call(ctx, msg, ctx); @@ -160,7 +171,7 @@ const spawnStateless = (parent, f, name) => console.error(e); } return true; - }, name); + }, name, properties); module.exports.spawn = spawn; module.exports.spawnStateless = spawnStateless; diff --git a/lib/index.js b/lib/index.js index eb247bc..9e3fe70 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,9 +1,11 @@ const { spawn, spawnStateless } = require('./actor'); const { stop, state$, query, dispatch } = require('./functions'); const { spawnPersistent, configurePersistence } = require('./persistence'); +const { after } = require('./utils'); module.exports = { ...require('./system'), spawn, + after, spawnStateless, query, dispatch, diff --git a/lib/persistence/persistent-actor.js b/lib/persistence/persistent-actor.js index 3869b87..8d6e1da 100644 --- a/lib/persistence/persistent-actor.js +++ b/lib/persistence/persistent-actor.js @@ -5,8 +5,8 @@ const { Promise } = require('bluebird'); const freeze = require('deep-freeze-node'); class PersistentActor extends Actor { - constructor (parent, name, system, f, key, persistenceEngine) { - super(parent, name, system, f); + constructor (parent, name, system, f, key, persistenceEngine, properties = {}) { + super(parent, name, system, f, properties); if (!key) { throw new Error('Persistence key required'); } @@ -62,7 +62,7 @@ class PersistentActor extends Actor { } const { applyOrThrowIfStopped } = require('../references'); -const spawnPersistent = (reference, f, key, name) => +const spawnPersistent = (reference, f, key, name, properties) => applyOrThrowIfStopped( reference, parent => applyOrThrowIfStopped( diff --git a/lib/utils.js b/lib/utils.js new file mode 100644 index 0000000..55849a1 --- /dev/null +++ b/lib/utils.js @@ -0,0 +1,43 @@ +class After { + constructor (amount) { + this._amount = amount; + Object.freeze(this); + } + + get hours () { + return { duration: (this._amount * 60 * 60 * 1000) | 0 }; + } + + get hour () { + return this.hours; + } + + get minutes () { + return { duration: (this._amount * 60 * 1000) | 0 }; + } + + get minute () { + return this.minutes; + } + + get seconds () { + return { duration: (this._amount * 1000) | 0 }; + } + + get second () { + return this.seconds; + } + + get milliseconds () { + return { duration: this._amount | 0 }; + } + get millisecond () { + return this.milliseconds; + } +} + +const after = (amount) => new After(amount); + +module.exports = { + after +}; diff --git a/package.json b/package.json index 80b6e87..3baf11c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nact", - "version": "3.1.5", + "version": "3.2.0", "description": "nact ⇒ node.js + actors = your services have never been so µ", "main": "lib/index.js", "scripts": { diff --git a/test/actor.js b/test/actor.js index bee43ce..05baa91 100644 --- a/test/actor.js +++ b/test/actor.js @@ -5,10 +5,10 @@ const chai = require('chai'); const chaiAsPromised = require('chai-as-promised'); chai.use(chaiAsPromised); chai.should(); -const { start, spawn, spawnStateless, dispatch, stop, query, state$ } = require('../lib'); +const { start, spawn, after, spawnStateless, dispatch, stop, query, state$ } = require('../lib'); const { Promise } = require('bluebird'); const { LocalPath } = require('../lib/paths'); -const delay = Promise.delay; +const delay = Promise.delay.bind(Promise); const { applyOrThrowIfStopped } = require('../lib/references'); const spawnChildrenEchoer = (parent, name) => @@ -176,7 +176,7 @@ describe('Actor', function () { console.error = ignore; let child = spawnStateless(system, (msg) => { throw new Error('testError'); }); dispatch(child); - delay(50); + await delay(50); isStopped(child).should.not.be.true; }); @@ -195,6 +195,35 @@ describe('Actor', function () { }); }); + describe('timeout', function () { + let system; + beforeEach(() => { system = start(); }); + afterEach(() => { + stop(system); + // reset console + delete console.error; + }); + + it('should automatically stop after timeout if timeout is specified', async function () { + console.error = ignore; + let child = spawnStateless(system, (msg) => {}, 'test', { shutdown: after(100).milliseconds }); + await delay(110); + isStopped(child).should.be.true; + }); + + it('should automatically renew timeout after message', async function () { + let child = spawnStateless(system, ignore, 'test1', { shutdown: after(60).milliseconds }); + await delay(30); + dispatch(child, {}); + await delay(40); + isStopped(child).should.not.be.true; + }); + + it('should throw if timeout does not include a duration field', async function () { + (() => spawnStateless(system, ignore, 'test1', { shutdown: {} })).should.throw(); + }); + }); + describe('#stop()', function () { let system; beforeEach(() => { system = start(); }); @@ -224,7 +253,7 @@ describe('Actor', function () { let resultPromise = query(child, 2, 100); await delay(20); stop(child); - await resultPromise.should.be.rejectedWith(Error); + return resultPromise.should.be.rejectedWith(Error); }); it('stops children when parent is stopped', async function () { @@ -260,7 +289,7 @@ describe('Actor', function () { isStopped(child).should.be.true; }); - it('should ignore subsequent dispatchs', async function () { + it('should ignore subsequent dispatches', async function () { let child = spawnStateless(system, () => { throw new Error('Should not be triggered'); }); stop(child); await retry(() => isStopped(child).should.be.true, 12, 10); diff --git a/test/utils.js b/test/utils.js new file mode 100644 index 0000000..11505ca --- /dev/null +++ b/test/utils.js @@ -0,0 +1,29 @@ +/* eslint-env mocha */ +/* eslint-disable no-unused-expressions */ +const chai = require('chai'); +chai.should(); +const { after } = require('../lib'); + +describe('#after', function () { + it('should correctly calculate milliseconds', function () { + after(100).milliseconds.duration.should.equal(100); + after(1).millisecond.duration.should.equal(1); + after(0).milliseconds.duration.should.equal(0); + }); + it('should correctly calculate seconds', function () { + after(1).second.duration.should.equal(1000); + after(0).seconds.duration.should.equal(0); + after(10).seconds.duration.should.equal(10000); + after(1.5).seconds.duration.should.equal(1500); + }); + + it('should correctly calculate minutes', function () { + after(1).minute.duration.should.equal(60000); + after(0).minutes.duration.should.equal(0); + }); + + it('should correctly calculate hours', function () { + after(1).hour.duration.should.equal(3600000); + after(0).hours.duration.should.equal(0); + }); +});