diff --git a/lib/sender.js b/lib/sender.js index d0a3a60..93380ed 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -26,10 +26,13 @@ function FluentSender(tag_prefix, options){ this._timeResolution = options.milliseconds ? 1 : 1000; this._socket = null; this._data = null; - this._sendQueue = []; // queue for items waiting for being sent. + this._sendQueue = []; // queue for items waiting for being sent. + this._outstandingChunks = {}; // chunks we haven't gotten a response for, indexed by chunk id this._sendQueueTail = -1; this._eventEmitter = new EventEmitter(); + this._handleAck = this._handleAck.bind(this) this._flushSendQueue = this._flushSendQueue.bind(this) + this._setupAckTimeout = this._setupAckTimeout.bind(this) } FluentSender.prototype.emit = function(/*[label] , [timestamp], [callback] */){ @@ -157,9 +160,7 @@ FluentSender.prototype._connect = function(callback){ self._socket.on('connect', function() { self._handleEvent('connect'); }); - self._socket.on('data', function(data) { - self._data = data; - }); + self._socket.on('data', self._handleAck); if (self.path) { self._socket.connect(self.path, callback); } else { @@ -178,6 +179,25 @@ FluentSender.prototype._connect = function(callback){ } }; +FluentSender.prototype._handleAck = function(data) { + var item, response = msgpack.decode(data, {codec: codec}); + if ((item = this._outstandingChunks[response.ack]) == null) { + throw new Error('unmatched chunk id ' + response.ack) + } else { + clearTimeout(item.timeoutId); + delete this._outstandingChunks[response.ack]; + item.callback && item.callback(); + } +} + +FluentSender.prototype._setupAckTimeout = function(item) { + var self = this; + item.timeoutId = setTimeout(function() { + var error = new FluentLoggerError.ResponseTimeout('ack response timeout'); + self._handleEvent('error', error, item.callback); + }, this.ackResponseTimeout); +} + FluentSender.prototype._flushSendQueue = function() { var self = this; var pos = self._sendQueue.length - self._sendQueueTail - 1; @@ -187,28 +207,12 @@ FluentSender.prototype._flushSendQueue = function() { } else { self._sendQueueTail--; self._sendQueue.shift(); - self._socket.write(new Buffer(item.packet), function(){ - if (self.requireAckResponse) { - var intervalId = setInterval(function() { - if (self._data) { - var response = msgpack.decode(self._data, { codec: codec }); - self._data = null; - clearInterval(intervalId); - clearTimeout(timeoutId); - if (response.ack !== item.options.chunk) { - var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different', - { ack: response.ack, chunk: item.options.chunk }); - self._handleEvent('error', error, item.callback); - } - } - }, 100); - var timeoutId = setTimeout(function() { - var error = new FluentLoggerError.ResponseTimeout('ack response timeout'); - self._handleEvent('error', error, item.callback); - clearInterval(intervalId); - }, self.ackResponseTimeout); - } - item.callback && item.callback(); + if (self.requireAckResponse) { + self._outstandingChunks[item.options.chunk] = item; + self._setupAckTimeout(item); + } + self._socket.write(new Buffer(item.packet), function() { + if (!self.requireAckResponse) item.callback && item.callback(); }); process.nextTick(function() { // socket is still available