-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
executable file
·95 lines (83 loc) · 2.63 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
'use strict';
const amqp = require('amqplib');
const Executor = require('@runnerty/module-core').Executor;
/**
 * Runnerty executor that publishes one message to an AMQP broker through
 * amqplib, either directly to a queue or to an exchange.
 *
 * Options read by this class (spelling of `exange`/`exangeType` is kept as-is
 * because it is the option contract existing configurations use):
 *  - connection: protocol, hostname, port, username, password, locale,
 *    frameMax, heartbeat, vhost
 *  - target: `exange` + `exangeType` (+ optional `routingKey`, `queue`), or `queue`
 *  - `message`: payload, converted with `Buffer.from`
 *  - `options`: publish options forwarded to amqplib (defaults to `{}`)
 */
class amqpExecutor extends Executor {
  /**
   * @param {object} process - Forwarded untouched to the base Executor constructor.
   */
  constructor(process) {
    super(process);
    this.options = {};
    this.endOptions = { end: 'end' };
  }

  /**
   * Entry point: publishes the message and reports the outcome via `this.end`.
   * Never rejects for publish failures; they are reported through `endOptions`
   * with `end: 'error'` and the error message in `messageLog` / `err_output`.
   *
   * @param {object} options - Executor options (see class description).
   * @returns {Promise<void>}
   */
  async exec(options) {
    this.options = options;
    try {
      // An exchange target takes precedence over a plain queue target.
      if (this.options.exange) {
        await this.sendMessageToExange();
      } else if (this.options.queue) {
        await this.sendMessageToQueue();
      } else {
        throw new Error('Not found queue or exange.');
      }
      this.end(this.endOptions);
    } catch (err) {
      this.endOptions.end = 'error';
      this.endOptions.messageLog = err.message;
      this.endOptions.err_output = err.message;
      this.end(this.endOptions);
    }
  }

  /**
   * Opens a connection and a channel using the connection-related options.
   *
   * @returns {Promise<Array>} `[connection, channel]`
   * @throws {Error} With the underlying error's message (original error kept as `cause`).
   */
  async connect() {
    const connectOptions = {
      protocol: this.options.protocol,
      hostname: this.options.hostname,
      port: this.options.port,
      username: this.options.username,
      password: this.options.password,
      locale: this.options.locale,
      frameMax: this.options.frameMax,
      heartbeat: this.options.heartbeat,
      vhost: this.options.vhost
    };
    let connection;
    try {
      connection = await amqp.connect(connectOptions);
      const channel = await connection.createChannel();
      return [connection, channel];
    } catch (err) {
      // If the connection opened but the channel could not be created,
      // do not leave the connection behind.
      if (connection) {
        await this.closeQuietly(connection);
      }
      throw new Error(err.message, { cause: err });
    }
  }

  /**
   * Best-effort close used on failure paths: close errors are deliberately
   * ignored so the error that caused the failure is the one reported.
   *
   * @param {...object} resources - Objects exposing an async `close()`.
   * @returns {Promise<void>}
   */
  async closeQuietly(...resources) {
    for (const resource of resources) {
      try {
        await resource.close();
      } catch (closeErr) {
        // Intentionally ignored (e.g. the channel may already be closed).
      }
    }
  }

  /**
   * Connects, runs `work(channel)` and always releases the channel and the
   * connection afterwards, whether `work` succeeded or threw.
   *
   * @param {function(object): *} work - Receives the open channel; may be async.
   * @returns {Promise<void>}
   * @throws Whatever `connect`, `work`, or (on the success path) `close` throws.
   */
  async withChannel(work) {
    const [connection, channel] = await this.connect();
    try {
      await work(channel);
    } catch (err) {
      await this.closeQuietly(channel, connection);
      throw err;
    }
    // Success path: close errors are still surfaced, but the connection is
    // closed even when closing the channel fails.
    try {
      await channel.close();
    } finally {
      await connection.close();
    }
  }

  /**
   * Asserts `options.queue` and sends `options.message` to it.
   *
   * @returns {Promise<void>}
   * @throws {Error} When connecting, asserting or sending fails, or when
   *   `sendToQueue` returns a falsy value.
   */
  async sendMessageToQueue() {
    // Build the payload first so an invalid message fails before connecting.
    const payload = Buffer.from(this.options.message);
    await this.withChannel(async (channel) => {
      await channel.assertQueue(this.options.queue);
      const sendResponse = channel.sendToQueue(this.options.queue, payload, this.options.options || {});
      if (!sendResponse) {
        throw new Error(`Message not sended: ${this.options.message} to ${this.options.queue} queue.`);
      }
    });
  }

  /**
   * Asserts `options.exange` (of type `options.exangeType`) and publishes
   * `options.message` to it with `options.routingKey` (default `''`).
   * When `options.queue` is also set, that queue is asserted first.
   *
   * @returns {Promise<void>}
   * @throws {Error} When `exangeType` is missing, when connecting, asserting or
   *   publishing fails, or when `publish` returns a falsy value.
   */
  async sendMessageToExange() {
    if (!this.options.exangeType) throw new Error(`For publish in exange you must set the exangeType.`);
    // Build the payload first so an invalid message fails before connecting.
    const payload = Buffer.from(this.options.message);
    await this.withChannel(async (channel) => {
      if (this.options.queue) {
        await channel.assertQueue(this.options.queue);
      }
      await channel.assertExchange(this.options.exange, this.options.exangeType);
      const sendResponse = channel.publish(
        this.options.exange,
        this.options.routingKey || '',
        payload,
        this.options.options || {}
      );
      if (!sendResponse) {
        throw new Error(`Message not sended: ${this.options.message} to ${this.options.exange} exange.`);
      }
    });
  }
}
// The executor class is this module's sole export.
module.exports = amqpExecutor;