-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
73 lines (65 loc) · 2.07 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
const EventEmitter = require('events');
var rabbit = require('rabbot');
const EVENT_PREFIX = "RABBITMQ_";
module.exports = function ({config, events, log, listening}) {
log = log || function() {};
events = events || new EventEmitter();
listening = (listening === undefined)?true:listening;
rabbit.on( "unreachable", function() {
log("Couldn't connect. Retrying");
rabbit.retry();
});
rabbit.configure(config).then(() => {
if (listening)
startListening();
}).catch(function(err) {
log("Rabbit startup error is", err);
});
process.on('SIGINT', () => {
rabbit.shutdown();
});
function on(event, listener) {
events.on(EVENT_PREFIX + event, listener);
}
function emit(event, payload, ct) {
ct = ct || 0;
var message = {};
message.routingKey = 'email';
message.body = {
payload: payload,
type: event,
};
rabbit.publish("worker.exchange", message).then(function() {
log('Sending successful! ', payload);
}).catch(function(err) {
log("Sending error is", err, ". Trying again..");
if ((ct+1)%5 == 0) {
log("Retrying rabbit mq connection");
rabbit.retry();
}
setTimeout(emit.bind(this, event, payload, ct+1), Math.max(ct+1, 6)*500);
});
}
function handleMessage(message) {
if (!message || !message.body || !message.body.type)
log("Broken message!", message);
try {
events.emit(EVENT_PREFIX + message.body.type, message.body.payload);
message.ack();
log("Received message", message.body);
} catch(err) {
message.nack();
}
};
function startListening() {
rabbit.handle({}, handleMessage);
rabbit.onUnhandled( function( message ) {
log("Unhandled message here", message);
});
rabbit.startSubscription(config.queues[0].name);
};
return {
emit,
on
};
}