Skip to content

Commit

Permalink
cleanup ack handling
Browse files Browse the repository at this point in the history
Previously, receiving multiple acks in a 100ms would cause all but the
last one to be dropped. This fixes the issue and can even handle
unordered acks.
  • Loading branch information
notslang committed Jul 24, 2017
1 parent eaed988 commit 2fcee25
Showing 1 changed file with 30 additions and 26 deletions.
56 changes: 30 additions & 26 deletions lib/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ function FluentSender(tag_prefix, options){
this._timeResolution = options.milliseconds ? 1 : 1000;
this._socket = null;
this._data = null;
this._sendQueue = []; // queue for items waiting for being sent.
this._sendQueue = []; // queue for items waiting for being sent.
this._outstandingChunks = {}; // chunks we haven't gotten a response for, indexed by chunk id
this._sendQueueTail = -1;
this._eventEmitter = new EventEmitter();
this._handleAck = this._handleAck.bind(this)
this._flushSendQueue = this._flushSendQueue.bind(this)
this._setupAckTimeout = this._setupAckTimeout.bind(this)
}

FluentSender.prototype.emit = function(/*[label] <data>, [timestamp], [callback] */){
Expand Down Expand Up @@ -157,9 +160,7 @@ FluentSender.prototype._connect = function(callback){
self._socket.on('connect', function() {
self._handleEvent('connect');
});
self._socket.on('data', function(data) {
self._data = data;
});
self._socket.on('data', self._handleAck);
if (self.path) {
self._socket.connect(self.path, callback);
} else {
Expand All @@ -178,6 +179,25 @@ FluentSender.prototype._connect = function(callback){
}
};

FluentSender.prototype._handleAck = function(data) {
var item, response = msgpack.decode(data, {codec: codec});
if ((item = this._outstandingChunks[response.ack]) == null) {
throw new Error('unmatched chunk id ' + response.ack)
} else {
clearTimeout(item.timeoutId);
delete this._outstandingChunks[response.ack];
item.callback && item.callback();
}
}

FluentSender.prototype._setupAckTimeout = function(item) {
var self = this;
item.timeoutId = setTimeout(function() {
var error = new FluentLoggerError.ResponseTimeout('ack response timeout');
self._handleEvent('error', error, item.callback);
}, this.ackResponseTimeout);
}

FluentSender.prototype._flushSendQueue = function() {
var self = this;
var pos = self._sendQueue.length - self._sendQueueTail - 1;
Expand All @@ -187,28 +207,12 @@ FluentSender.prototype._flushSendQueue = function() {
} else {
self._sendQueueTail--;
self._sendQueue.shift();
self._socket.write(new Buffer(item.packet), function(){
if (self.requireAckResponse) {
var intervalId = setInterval(function() {
if (self._data) {
var response = msgpack.decode(self._data, { codec: codec });
self._data = null;
clearInterval(intervalId);
clearTimeout(timeoutId);
if (response.ack !== item.options.chunk) {
var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different',
{ ack: response.ack, chunk: item.options.chunk });
self._handleEvent('error', error, item.callback);
}
}
}, 100);
var timeoutId = setTimeout(function() {
var error = new FluentLoggerError.ResponseTimeout('ack response timeout');
self._handleEvent('error', error, item.callback);
clearInterval(intervalId);
}, self.ackResponseTimeout);
}
item.callback && item.callback();
if (self.requireAckResponse) {
self._outstandingChunks[item.options.chunk] = item;
self._setupAckTimeout(item);
}
self._socket.write(new Buffer(item.packet), function() {
if (!self.requireAckResponse) item.callback && item.callback();
});
process.nextTick(function() {
// socket is still available
Expand Down

0 comments on commit 2fcee25

Please sign in to comment.