Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ack handling #73

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 57 additions & 56 deletions lib/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,59 +26,59 @@ function FluentSender(tag_prefix, options){
this._timeResolution = options.milliseconds ? 1 : 1000;
this._socket = null;
this._data = null;
this._sendQueue = []; // queue for items waiting for being sent.
this._sendQueue = []; // queue for items waiting for being sent.
this._outstandingChunks = {}; // chunks we haven't gotten a response for, indexed by chunk id
this._sendQueueTail = -1;
this._eventEmitter = new EventEmitter();
this._handleAck = this._handleAck.bind(this)
this._flushSendQueue = this._flushSendQueue.bind(this)
this._setupAckTimeout = this._setupAckTimeout.bind(this)
}

FluentSender.prototype.emit = function(/*[label] <data>, [timestamp], [callback] */){
var label, data, timestamp, callback;
var args = Array.prototype.slice.call(arguments);
// Label must be string always
if (typeof args[0] === "string") label = args.shift();
if (typeof args[0] === 'string') label = args.shift();

// Data can be almost anything
data = args.shift();

// Date can be either timestamp number or Date object
if (typeof args[0] !== "function") timestamp = args.shift();
if (typeof args[0] !== 'function') timestamp = args.shift();

// Last argument is an optional callback
if (typeof args[0] === "function") callback = args.shift();
if (typeof args[0] === 'function') callback = args.shift();


var self = this;
var item = self._makePacketItem(label, data, timestamp);
var item = this._makePacketItem(label, data, timestamp);

var error;
var options;
if (item.tag === null) {
options = {
tag_prefix: self.tag_prefix,
tag_prefix: this.tag_prefix,
label: label
};
error = new FluentLoggerError.MissingTag('tag is missing', options);
self._handleEvent('error', error, callback);
this._handleEvent('error', error, callback);
return;
}
if (typeof item.data !== "object") {
if (typeof item.data !== 'object') {
options = {
tag_prefix: self.tag_prefix,
tag_prefix: this.tag_prefix,
label: label,
record: item.data
};
error = new FluentLoggerError.DataTypeError('data must be an object', options);
self._handleEvent('error', error, callback);
this._handleEvent('error', error, callback);
return;
}

item.callback = callback;

self._sendQueue.push(item);
self._sendQueueTail++;
self._connect(function(){
self._flushSendQueue();
});
this._sendQueue.push(item);
this._sendQueueTail++;
this._connect(this._flushSendQueue);
};

['addListener', 'on', 'once', 'removeListener', 'removeAllListeners', 'setMaxListeners', 'getMaxListeners'].forEach(function(attr, i){
Expand Down Expand Up @@ -115,23 +115,22 @@ FluentSender.prototype._close = function() {


FluentSender.prototype._makePacketItem = function(label, data, time){
var self = this;
var tag = null;
if (self.tag_prefix && label) {
tag = [self.tag_prefix, label].join('.');
} else if (self.tag_prefix) {
tag = self.tag_prefix;
if (this.tag_prefix && label) {
tag = this.tag_prefix + '.' + label;
} else if (this.tag_prefix) {
tag = this.tag_prefix;
} else if (label) {
tag = label;
}

if (typeof time !== "number" && !(time instanceof EventTime)) {
if (typeof time !== 'number' && !(time instanceof EventTime)) {
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution);
}

var packet = [tag, time, data];
var options = {};
if (self.requireAckResponse) {
if (this.requireAckResponse) {
options = {
chunk: crypto.randomBytes(16).toString('base64')
};
Expand Down Expand Up @@ -161,9 +160,7 @@ FluentSender.prototype._connect = function(callback){
self._socket.on('connect', function() {
self._handleEvent('connect');
});
self._socket.on('data', function(data) {
self._data = data;
});
self._socket.on('data', self._handleAck);
if (self.path) {
self._socket.connect(self.path, callback);
} else {
Expand All @@ -177,13 +174,34 @@ FluentSender.prototype._connect = function(callback){
self._connect(callback);
});
} else {
process.nextTick(function() {
callback();
});
process.nextTick(callback);
}
}
};

FluentSender.prototype._handleAck = function(data) {
var item, response = msgpack.decode(data, {codec: codec});
if ((item = this._outstandingChunks[response.ack]) == null) {
var error = new FluentLoggerError.ResponseError(
'ack in response does not match any outstanding chunks',
{ack: response.ack}
);
this._handleEvent('error', error, item.callback);
} else {
clearTimeout(item.timeoutId);
delete this._outstandingChunks[response.ack];
item.callback && item.callback();
}
}

FluentSender.prototype._setupAckTimeout = function(item) {
var self = this;
item.timeoutId = setTimeout(function() {
var error = new FluentLoggerError.ResponseTimeout('ack response timeout', item);
self._handleEvent('error', error, item.callback);
}, this.ackResponseTimeout);
}

FluentSender.prototype._flushSendQueue = function() {
var self = this;
var pos = self._sendQueue.length - self._sendQueueTail - 1;
Expand All @@ -193,28 +211,12 @@ FluentSender.prototype._flushSendQueue = function() {
} else {
self._sendQueueTail--;
self._sendQueue.shift();
self._socket.write(new Buffer(item.packet), function(){
if (self.requireAckResponse) {
var intervalId = setInterval(function() {
if (self._data) {
var response = msgpack.decode(self._data, { codec: codec });
self._data = null;
clearInterval(intervalId);
clearTimeout(timeoutId);
if (response.ack !== item.options.chunk) {
var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different',
{ ack: response.ack, chunk: item.options.chunk });
self._handleEvent('error', error, item.callback);
}
}
}, 100);
var timeoutId = setTimeout(function() {
var error = new FluentLoggerError.ResponseTimeout('ack response timeout');
self._handleEvent('error', error, item.callback);
clearInterval(intervalId);
}, self.ackResponseTimeout);
}
item.callback && item.callback();
if (self.requireAckResponse) {
self._outstandingChunks[item.options.chunk] = item;
self._setupAckTimeout(item);
}
self._socket.write(new Buffer(item.packet), function() {
if (!self.requireAckResponse) item.callback && item.callback();
});
process.nextTick(function() {
// socket is still available
Expand All @@ -227,10 +229,9 @@ FluentSender.prototype._flushSendQueue = function() {
};

FluentSender.prototype._handleEvent = function _handleEvent(signal, data, callback) {
var self = this;
callback && callback(data);
if (self._eventEmitter.listenerCount(signal) > 0) {
self._eventEmitter.emit(signal, data);
if (this._eventEmitter.listenerCount(signal) > 0) {
this._eventEmitter.emit(signal, data);
}
};

Expand All @@ -243,7 +244,7 @@ FluentSender.prototype._setupErrorHandler = function _setupErrorHandler() {
self.internalLogger.error('Fluentd error', error);
self.internalLogger.info('Fluentd will reconnect after ' + self.reconnectInterval / 1000 + ' seconds');
setTimeout(function() {
self.internalLogger.info("Fluentd is reconnecting...");
self.internalLogger.info('Fluentd is reconnecting...');
self._connect(function() {
self.internalLogger.info('Fluentd reconnection finished!!');
});
Expand Down