Skip to content

Commit

Permalink
Refactored timeout and snapshotting to be simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
ncthbrt committed Dec 9, 2017
1 parent 1406529 commit 42b04d1
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 211 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -521,41 +521,42 @@ const spawnUserContactService = (parent, userId) => spawnPersistent(

Sometimes actors accumulate a lot of persisted events. This is problematic because it means that it can take a potentially long time for an actor to recover. For time-sensitive applictions, this would make nact an unsuitable proposition. Snapshotting is a way to skip replaying every single event. When a persistent actor starts up again, nact checks to see if there are any snapshots available in the *snapshot store*. Nact selects the latest snapshot. The snapshot contains the sequence number at which it was taken. The snapshot is passed as the initial state to the actor, and only the events which were persisted after the snapshot are replayed.

To modify the user contacts service to support snapshotting, we import the `every` function and refactor the code to the following:
To modify the user contacts service to support snapshotting, we refactor the code to the following:

```js
const { messages } = require('nact');
const spawnUserContactService = (parent, userId) => spawnPersistent(
parent,
// Same function as before
async (state = { contacts:{} }, msg, ctx) => {},
`contacts:${userId}`,
userId,
{ snapshot: every(20).messages.or(12).hours }
{ snapshotEvery: 20 * messages }
);
```

The final argument to `spawnPersistent` is the actor properties object. Here we are using `every` to instruct nact to make a snapshot every 20 messages or 12 hours (the timer till the next snapshot is reset if a snapshot is made sooner and visa-versa).
The final argument to `spawnPersistent` is the actor properties object. Here we are using `snapshotEvery` to instruct nact to make a snapshot every 20 messages.

### Timeouts

While not strictly a part of the persistent actor, timeouts are frequently used with snapshotting. Actors take up memory, which is still a limited resource. If an actor has not processed messages in a while, it makes sense to shut it down until it is again needed; this frees up memory. Adding a timeout to the user contacts service is similar to snapshotting:

```js
const { messages, minutes } = require('nact');
const spawnUserContactService = (parent, userId) => spawnPersistent(
parent,
// Same function as before
async (state = { contacts:{} }, msg, ctx) => {},
`contacts:${userId}`,
userId,
{ snapshot: every(20).messages.or(12).hours,
timeout: after(10).minutes
{ snapshotEvery: 20 * messages,
shutdownAfter: 10 * minutes
}
);
```

In the code above, the user contacts service stops if it hasn't received a new message in 10 minutes.


# API

## Functions
Expand All @@ -569,8 +570,7 @@ In the code above, the user contacts service stops if it hasn't received a new m
| `spawnPersistent(parent, func, persistenceKey, name = auto, options = {})` | `ActorReference` | Creates a persistent actor. Persistent actors extend stateful actors but also add a persist method to the actor context. When an actor restarts after persisting messages, the persisted messages are played back in order until no futher messages remain. The actor may then start processing new messages. The `persistenceKey` is used to retrieve the persisted messages from the actor. |
| `start(...plugins)` | `SystemReference` | Starts the actor system. Plugins is a variadic list of middleware. Currently this is only being used with `configurePersistence` |
| `state$(actor)` | `Observable<'state>` | Creates an observable which streams the current state of the actor to subscribers. |
| every(amount).[unit] | `Duration | MessageInterval` | |
| after(amount).[unit] | `Duration | MessageInterval` | |


### communication

Expand Down
10 changes: 5 additions & 5 deletions lib/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const { Subject } = require('rxjs');
const { stop } = require('./functions');

class Actor {
constructor (parent, name, system, f, { shutdown } = {}) {
constructor (parent, name, system, f, { shutdownAfter } = {}) {
this.parent = parent;
if (!name) {
name = `anonymous-${parent.children.size}`;
Expand All @@ -31,11 +31,11 @@ class Actor {
this.mailbox = new Queue();
this.immediate = undefined;
this.parent.childSpawned(this);
if (shutdown) {
if (!shutdown.duration) {
throw new Error('Shutdown should be specified as a duration. It is recommended to use the after() function to do this');
if (shutdownAfter) {
if (typeof (shutdownAfter) !== 'number') {
throw new Error('Shutdown should be specified as a number in milliseconds');
}
this.shutdownPeriod = Actor.getSafeTimeout(shutdown.duration);
this.shutdownPeriod = Actor.getSafeTimeout(shutdownAfter);
this.setTimeout();
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
const { spawn, spawnStateless } = require('./actor');
const { stop, state$, query, dispatch } = require('./functions');
const { spawnPersistent, configurePersistence } = require('./persistence');
const utils = require('./utils');
const time = require('./time');
module.exports = {
...require('./system'),
...utils,
...time,
spawn,
spawnStateless,
query,
Expand Down
58 changes: 17 additions & 41 deletions lib/persistence/persistent-actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { Promise } = require('bluebird');
const freeze = require('deep-freeze-node');

class PersistentActor extends Actor {
constructor (parent, name, system, f, key, persistenceEngine, { snapshot, ...properties } = {}) {
constructor (parent, name, system, f, key, persistenceEngine, { snapshotEvery, ...properties } = {}) {
super(parent, name, system, f, properties);
if (!key) {
throw new Error('Persistence key required');
Expand All @@ -19,12 +19,11 @@ class PersistentActor extends Actor {
this.busy = true;
this.key = key;

if (snapshot) {
this.snapshotDuration = snapshot.duration ? Actor.getSafeTimeout(snapshot.duration) : false;
this.snapshotMessageInterval = snapshot.messageInterval || false;
if (!this.snapshotMessageInterval && !this.snapshotDuration) {
throw new Error('Snapshot requires a duration and/or messages field. Correctly specifying the snapshot rule is most easily done using every()');
if (snapshotEvery) {
if (typeof (snapshotEvery) !== 'number') {
throw new Error('Shutdown should be specified as a number. The value indicates how many persisted messages ');
}
this.snapshotMessageInterval = snapshotEvery;
}

setImmediate(() => this.recover());
Expand Down Expand Up @@ -63,7 +62,6 @@ class PersistentActor extends Actor {
this.messagesToNextSnapshot = this.snapshotMessageInterval - messageCount;
}

this.resetSnapshotInterval();
this.processNext(state, sequenceNumber === 0);
});
});
Expand All @@ -72,52 +70,30 @@ class PersistentActor extends Actor {
}
}

resetSnapshotInterval () {
if (this.snapshotDuration) {
clearInterval(this.snapshotInterval);
this.snapshotInterval = setInterval(async () => {
const snapshot = new PersistedSnapshot(this.state, this.sequenceNumber, this.key);
this.messagesToNextSnapshot = this.snapshotMessageInterval;
try {
await this.persistenceEngine.takeSnapshot(snapshot);
} catch (e) {
console.error(`Failed to save snapshot ${e}`);
}
}, this.snapshotDuration);
}
}

async processNext (next, initial = false) {
if (!this.stopped && this.snapshotMessageInterval && !initial) {
--this.messagesToNextSnapshot;
if (this.messagesToNextSnapshot <= 0) {
this.resetSnapshotInterval();
this.messagesToNextSnapshot = this.snapshotMessageInterval;
await this.takeSnapshot(next);
}
}
super.processNext(next, initial);
}

async takeSnapshot (state) {
async takeSnapshot (state, sequenceNumber, key) {
try {
const snapshot = new PersistedSnapshot(state, this.sequenceNumber, this.key);
const snapshot = new PersistedSnapshot(state, sequenceNumber, key);
await this.persistenceEngine.takeSnapshot(snapshot);
} catch (e) {
console.error(`Failed to take snapshot ${e}`);
}
}

async persist (msg, tags = []) {
if (this.snapshotMessageInterval) {
--this.messagesToNextSnapshot;
if (this.messagesToNextSnapshot <= 0) {
this.messagesToNextSnapshot = this.snapshotMessageInterval;
const sequenceNumber = this.sequenceNumber;
const state = this.state;
const key = this.key;
this.takeSnapshot(state, sequenceNumber, key);
}
}
const persistedEvent = new PersistedEvent(msg, ++this.sequenceNumber, this.key, tags);
return (await (this.persistenceEngine.persist(persistedEvent))).data;
}

stop () {
super.stop();
clearInterval(this.snapshotInterval);
}

createContext () {
return { ...super.createContext.apply(this, arguments), persist: this.persist.bind(this) };
}
Expand Down
17 changes: 17 additions & 0 deletions lib/time.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
const milliseconds = 1;
const seconds = 1000 * milliseconds;
const minutes = 60 * seconds;
const hours = 60 * minutes;

module.exports = {
milliseconds,
millisecond: milliseconds,
seconds,
second: seconds,
minutes,
minute: minutes,
hours,
hour: hours,
messages: 1,
message: 1
};
72 changes: 0 additions & 72 deletions lib/utils.js

This file was deleted.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nact",
"version": "4.1.0",
"version": "4.2.0",
"description": "nact ⇒ node.js + actors = your services have never been so µ",
"main": "lib/index.js",
"scripts": {
Expand Down
12 changes: 6 additions & 6 deletions test/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const chai = require('chai');
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
chai.should();
const { start, spawn, after, spawnStateless, dispatch, stop, query, state$ } = require('../lib');
const { start, spawn, spawnStateless, dispatch, stop, query, state$, milliseconds } = require('../lib');
const { Promise } = require('bluebird');
const { LocalPath } = require('../lib/paths');
const delay = Promise.delay.bind(Promise);
Expand Down Expand Up @@ -195,7 +195,7 @@ describe('Actor', function () {
});
});

describe('timeout', function () {
describe('shutdownAfter', function () {
let system;
beforeEach(() => { system = start(); });
afterEach(() => {
Expand All @@ -206,21 +206,21 @@ describe('Actor', function () {

it('should automatically stop after timeout if timeout is specified', async function () {
console.error = ignore;
let child = spawnStateless(system, (msg) => {}, 'test', { shutdown: after(100).milliseconds });
let child = spawnStateless(system, (msg) => {}, 'test', { shutdownAfter: 100 * milliseconds });
await delay(110);
isStopped(child).should.be.true;
});

it('should automatically renew timeout after message', async function () {
let child = spawnStateless(system, ignore, 'test1', { shutdown: after(60).milliseconds });
let child = spawnStateless(system, ignore, 'test1', { shutdownAfter: 60 * milliseconds });
await delay(30);
dispatch(child, {});
await delay(40);
isStopped(child).should.not.be.true;
});

it('should throw if timeout does not include a duration field', async function () {
(() => spawnStateless(system, ignore, 'test1', { shutdown: {} })).should.throw(Error);
it('should throw if timeout is not a number', async function () {
(() => spawnStateless(system, ignore, 'test1', { shutdownAfter: {} })).should.throw(Error);
});
});

Expand Down
2 changes: 1 addition & 1 deletion test/mock-persistence-engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class MockPersistenceEngine extends AbstractPersistenceEngine {
this._snapshots[persistedSnapshot.key] = [...prev, persistedSnapshot];
return Promise.resolve(persistedSnapshot);
} else {
throw new Error('Elvis has left the building');
return Promise.reject(new Error('Elvis has left the building'));
}
}

Expand Down
Loading

2 comments on commit 42b04d1

@qm3ster
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, persistence can only rely on number of messages now, and not time?

@ncthbrt
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct. Rehydration duration is proportional to the number of messages, so it makes more sense to rely on that as the snapshot criterion.

Please sign in to comment.