From 3df59891548d143f4f519bccbe8f621e953ce6e1 Mon Sep 17 00:00:00 2001 From: Sean Lang Date: Sun, 23 Jul 2017 17:59:30 -0500 Subject: [PATCH 1/5] don't create array when joining tag & tag prefix --- lib/sender.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sender.js b/lib/sender.js index 9425b3d..99f9e0b 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -118,7 +118,7 @@ FluentSender.prototype._makePacketItem = function(label, data, time){ var self = this; var tag = null; if (self.tag_prefix && label) { - tag = [self.tag_prefix, label].join('.'); + tag = self.tag_prefix + '.' + label; } else if (self.tag_prefix) { tag = self.tag_prefix; } else if (label) { From 7a0ca8fb52af12ccc813fa688561fed38947b3b7 Mon Sep 17 00:00:00 2001 From: Sean Lang Date: Sun, 23 Jul 2017 18:03:04 -0500 Subject: [PATCH 2/5] remove wrapper function --- lib/sender.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index 99f9e0b..7d53ed1 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -177,9 +177,7 @@ FluentSender.prototype._connect = function(callback){ self._connect(callback); }); } else { - process.nextTick(function() { - callback(); - }); + process.nextTick(callback); } } }; From bb878f6d5849131ba48dff4d5086c632e61760f1 Mon Sep 17 00:00:00 2001 From: Sean Lang Date: Sun, 23 Jul 2017 18:08:43 -0500 Subject: [PATCH 3/5] standardize quote types --- lib/sender.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index 7d53ed1..845e492 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -35,16 +35,16 @@ FluentSender.prototype.emit = function(/*[label] , [timestamp], [callback] var label, data, timestamp, callback; var args = Array.prototype.slice.call(arguments); // Label must be string always - if (typeof args[0] === "string") label = args.shift(); + if (typeof args[0] === 'string') label = args.shift(); // Data can be almost anything data = args.shift(); // Date can be either timestamp number or Date object - if (typeof args[0] !== "function") timestamp = args.shift(); + if (typeof args[0] !== 'function') timestamp = args.shift(); // Last argument is an optional callback - if (typeof args[0] === "function") callback = args.shift(); + if (typeof args[0] === 'function') callback = args.shift(); var self = this; @@ -61,7 +61,7 @@ FluentSender.prototype.emit = function(/*[label] , [timestamp], [callback] self._handleEvent('error', error, callback); return; } - if (typeof item.data !== "object") { + if (typeof item.data !== 'object') { options = { tag_prefix: self.tag_prefix, label: label, @@ -125,7 +125,7 @@ FluentSender.prototype._makePacketItem = function(label, data, time){ tag = label; } - if (typeof time !== "number" && !(time instanceof EventTime)) { + if (typeof time !== 'number' && !(time instanceof EventTime)) { time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution); } @@ -241,7 +241,7 @@ FluentSender.prototype._setupErrorHandler = function _setupErrorHandler() { self.internalLogger.error('Fluentd error', error); self.internalLogger.info('Fluentd will reconnect after ' + self.reconnectInterval / 1000 + ' seconds'); setTimeout(function() { - self.internalLogger.info("Fluentd is reconnecting..."); + self.internalLogger.info('Fluentd is reconnecting...'); self._connect(function() { self.internalLogger.info('Fluentd reconnection finished!!'); }); From eaed988d1942a6c8f537131f63483866fe6b6e0b Mon Sep 17 00:00:00 2001 From: Sean Lang Date: Sun, 23 Jul 2017 18:23:37 -0500 Subject: [PATCH 4/5] `var self = this` isn't needed here You only need to do that when you're passing the value of this to another function. Also remove wrapper function from `FluentSender.emit` --- lib/sender.js | 37 ++++++++++++++++--------------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index 845e492..d0a3a60 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -29,6 +29,7 @@ function FluentSender(tag_prefix, options){ this._sendQueue = []; // queue for items waiting for being sent. this._sendQueueTail = -1; this._eventEmitter = new EventEmitter(); + this._flushSendQueue = this._flushSendQueue.bind(this) } FluentSender.prototype.emit = function(/*[label] , [timestamp], [callback] */){ @@ -46,39 +47,35 @@ FluentSender.prototype.emit = function(/*[label] , [timestamp], [callback] // Last argument is an optional callback if (typeof args[0] === 'function') callback = args.shift(); - - var self = this; - var item = self._makePacketItem(label, data, timestamp); + var item = this._makePacketItem(label, data, timestamp); var error; var options; if (item.tag === null) { options = { - tag_prefix: self.tag_prefix, + tag_prefix: this.tag_prefix, label: label }; error = new FluentLoggerError.MissingTag('tag is missing', options); - self._handleEvent('error', error, callback); + this._handleEvent('error', error, callback); return; } if (typeof item.data !== 'object') { options = { - tag_prefix: self.tag_prefix, + tag_prefix: this.tag_prefix, label: label, record: item.data }; error = new FluentLoggerError.DataTypeError('data must be an object', options); - self._handleEvent('error', error, callback); + this._handleEvent('error', error, callback); return; } item.callback = callback; - self._sendQueue.push(item); - self._sendQueueTail++; - self._connect(function(){ - self._flushSendQueue(); - }); + this._sendQueue.push(item); + this._sendQueueTail++; + this._connect(this._flushSendQueue); }; ['addListener', 'on', 'once', 'removeListener', 'removeAllListeners', 'setMaxListeners', 'getMaxListeners'].forEach(function(attr, i){ @@ -115,12 +112,11 @@ FluentSender.prototype._close = function() { FluentSender.prototype._makePacketItem = function(label, data, time){ - var self = this; var tag = null; - if (self.tag_prefix && label) { - tag = self.tag_prefix + '.' + label; - } else if (self.tag_prefix) { - tag = self.tag_prefix; + if (this.tag_prefix && label) { + tag = this.tag_prefix + '.' + label; + } else if (this.tag_prefix) { + tag = this.tag_prefix; } else if (label) { tag = label; } @@ -131,7 +127,7 @@ FluentSender.prototype._makePacketItem = function(label, data, time){ var packet = [tag, time, data]; var options = {}; - if (self.requireAckResponse) { + if (this.requireAckResponse) { options = { chunk: crypto.randomBytes(16).toString('base64') }; @@ -225,10 +221,9 @@ FluentSender.prototype._flushSendQueue = function() { }; FluentSender.prototype._handleEvent = function _handleEvent(signal, data, callback) { - var self = this; callback && callback(data); - if (self._eventEmitter.listenerCount(signal) > 0) { - self._eventEmitter.emit(signal, data); + if (this._eventEmitter.listenerCount(signal) > 0) { + this._eventEmitter.emit(signal, data); } }; From 59dd35189813769d2f9f9f03307ef1f9f3430e06 Mon Sep 17 00:00:00 2001 From: Sean Lang Date: Sun, 23 Jul 2017 19:52:08 -0500 Subject: [PATCH 5/5] cleanup ack handling Previously, receiving multiple acks in a 100ms window cause all but the last one to be dropped. This fixes the issue and can even handle unordered acks. --- lib/sender.js | 60 +++++++++++++++++++++++++++++---------------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index d0a3a60..b59b3e2 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -26,10 +26,13 @@ function FluentSender(tag_prefix, options){ this._timeResolution = options.milliseconds ? 1 : 1000; this._socket = null; this._data = null; - this._sendQueue = []; // queue for items waiting for being sent. + this._sendQueue = []; // queue for items waiting for being sent. + this._outstandingChunks = {}; // chunks we haven't gotten a response for, indexed by chunk id this._sendQueueTail = -1; this._eventEmitter = new EventEmitter(); + this._handleAck = this._handleAck.bind(this) this._flushSendQueue = this._flushSendQueue.bind(this) + this._setupAckTimeout = this._setupAckTimeout.bind(this) } FluentSender.prototype.emit = function(/*[label] , [timestamp], [callback] */){ @@ -157,9 +160,7 @@ FluentSender.prototype._connect = function(callback){ self._socket.on('connect', function() { self._handleEvent('connect'); }); - self._socket.on('data', function(data) { - self._data = data; - }); + self._socket.on('data', self._handleAck); if (self.path) { self._socket.connect(self.path, callback); } else { @@ -178,6 +179,29 @@ FluentSender.prototype._connect = function(callback){ } }; +FluentSender.prototype._handleAck = function(data) { + var item, response = msgpack.decode(data, {codec: codec}); + if ((item = this._outstandingChunks[response.ack]) == null) { + var error = new FluentLoggerError.ResponseError( + 'ack in response does not match any outstanding chunks', + {ack: response.ack} + ); + this._handleEvent('error', error, item.callback); + } else { + clearTimeout(item.timeoutId); + delete this._outstandingChunks[response.ack]; + item.callback && item.callback(); + } +} + +FluentSender.prototype._setupAckTimeout = function(item) { + var self = this; + item.timeoutId = setTimeout(function() { + var error = new FluentLoggerError.ResponseTimeout('ack response timeout', item); + self._handleEvent('error', error, item.callback); + }, this.ackResponseTimeout); +} + FluentSender.prototype._flushSendQueue = function() { var self = this; var pos = self._sendQueue.length - self._sendQueueTail - 1; @@ -187,28 +211,12 @@ FluentSender.prototype._flushSendQueue = function() { } else { self._sendQueueTail--; self._sendQueue.shift(); - self._socket.write(new Buffer(item.packet), function(){ - if (self.requireAckResponse) { - var intervalId = setInterval(function() { - if (self._data) { - var response = msgpack.decode(self._data, { codec: codec }); - self._data = null; - clearInterval(intervalId); - clearTimeout(timeoutId); - if (response.ack !== item.options.chunk) { - var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different', - { ack: response.ack, chunk: item.options.chunk }); - self._handleEvent('error', error, item.callback); - } - } - }, 100); - var timeoutId = setTimeout(function() { - var error = new FluentLoggerError.ResponseTimeout('ack response timeout'); - self._handleEvent('error', error, item.callback); - clearInterval(intervalId); - }, self.ackResponseTimeout); - } - item.callback && item.callback(); + if (self.requireAckResponse) { + self._outstandingChunks[item.options.chunk] = item; + self._setupAckTimeout(item); + } + self._socket.write(new Buffer(item.packet), function() { + if (!self.requireAckResponse) item.callback && item.callback(); }); process.nextTick(function() { // socket is still available