From 11ec649799bb0b6e69e4737faac3a38a17d377a8 Mon Sep 17 00:00:00 2001 From: Petr Pchelko Date: Wed, 16 Dec 2015 14:33:22 -0800 Subject: [PATCH 1/4] Added batching mode --- lib/statsd.js | 71 +++++++++++++++++++++++++++++++++++++++++++-- package.json | 2 +- test/test_statsd.js | 58 ++++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 4 deletions(-) diff --git a/lib/statsd.js b/lib/statsd.js index f27383b..060d5c9 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -1,6 +1,16 @@ var dgram = require('dgram'), dns = require('dns'); +/** + * @const + */ +var MAX_BATCH_INTERVAL = 1000; + +/** + * @const + */ +var MAX_MESSAGES_IN_BATCH = 20; + /** * The UDP Client for StatsD * @param options @@ -12,9 +22,10 @@ var dgram = require('dgram'), * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. * @option global_tags {Array=} Optional tags that will be added to every metric + * @option batch {boolean} Whether to send metrics in batches * @constructor */ -var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) { +var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, batch) { var options = host || {}, self = this; @@ -27,7 +38,8 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl globalize : globalize, cacheDns : cacheDns, mock : mock === true, - global_tags : global_tags + global_tags : global_tags, + batch : batch }; } @@ -38,6 +50,7 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.socket = dgram.createSocket('udp4'); this.mock = options.mock; this.global_tags = options.global_tags || []; + this.batch = options.batch; if(options.cacheDns === true){ dns.lookup(options.host, function(err, address, family){ @@ -50,6 +63,46 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl if(options.globalize){ global.statsd = this; } + + if (options.batch) { + this._initBatching(); + } +}; + +Client.prototype._initBatching = function() { + this.pendingMessages = []; + this.sendInterval = setInterval(Client.prototype._sendBatch.bind(this), MAX_BATCH_INTERVAL); +}; + +Client.prototype._sendBatch = function() { + if (this.pendingMessages.length) { + var batchMessage = this.pendingMessages.map(function(msg) { return msg.message; }).join('\n'); + var callbacks = this.pendingMessages.map(function(msg) { return msg.callback; }); + this._send(batchMessage, function(err) { + callbacks.forEach(function(callback) { + if (typeof callback === 'function') { + callback(err); + } + }); + }); + this.pendingMessages = []; + } +}; + +Client.prototype._addToBatch = function(message, callback) { + if (!!message) { + this.pendingMessages.push({ + message: message, + callback: callback + }); + } + + if (this.pendingMessages.length > MAX_MESSAGES_IN_BATCH) { + this._sendBatch(); + // Restart the timer as we've just sent the batches + clearInterval(this.sendInterval); + this._initBatching(); + } }; /** @@ -215,6 +268,14 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) message += '|#' + merged_tags.join(','); } + if (this.batch) { + this._addToBatch(message, callback); + } else { + this._send(message, callback); + } +}; + +Client.prototype._send = function(message, callback) { // Only send this stat if we're not a mock Client. if(!this.mock) { buf = new Buffer(message); @@ -230,8 +291,12 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) * Close the underlying socket and stop listening for data on it. */ Client.prototype.close = function(){ + if (this.batch) { + clearInterval(this.sendInterval); + this.sendInterval = null; + } this.socket.close(); -} +}; exports = module.exports = Client; exports.StatsD = Client; diff --git a/package.json b/package.json index 5a672b3..f47efd7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name" : "node-statsd" , "description" : "node client for Etsy'd StatsD server" -, "version" : "0.1.1" +, "version" : "0.1.2" , "author" : "Steve Ivy" , "contributors": [ "Russ Bradberry " ] , "repository" : diff --git a/test/test_statsd.js b/test/test_statsd.js index 0fbb314..4827086 100644 --- a/test/test_statsd.js +++ b/test/test_statsd.js @@ -680,4 +680,62 @@ describe('StatsD', function(){ }); }); + describe('#batching', function() { + it('should send batches by 20 messages', function(finished) { + var message = 'a:1|c'; + var batch = []; + for(var i = 0; i < 21; i++) { batch.push(message); } + + udpTest(function(message, server) { + assert.equal(message, batch.join('\n')); + server.close(); + finished(); + }, function(server) { + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + batch: true + }); + for (var i = 0; i < 21; i++) { + statsd.increment('a', 1); + } + }); + }); + it('should send a batch after one second', function(finished) { + udpTest(function(message, server) { + assert.equal(message, 'a:1|c'); + server.close(); + finished(); + }, function(server) { + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + batch: true + }); + statsd.increment('a', 1); + }); + }); + it('should send batches of different messages', function(finished) { + udpTest(function(message, server) { + assert.equal(message, 'a:1|c\nb:-1|c\nc:1|g\nd:2|h\ne:3|ms\nf:5|s'); + server.close(); + finished(); + }, function(server) { + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + batch: true + }); + statsd.increment('a', 1); + statsd.decrement('b', 1); + statsd.gauge('c', 1); + statsd.histogram('d', 2); + statsd.timing('e', 3); + statsd.set('f', 5); + }); + }); + }); }); From 579b749e219050878bede069c1ef45c8161b946e Mon Sep 17 00:00:00 2001 From: Petr Pchelko Date: Wed, 16 Dec 2015 17:35:21 -0800 Subject: [PATCH 2/4] Renamed the constants and replaced an interval with a timeout to save some CPU cycles --- lib/statsd.js | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/lib/statsd.js b/lib/statsd.js index 060d5c9..d243e6b 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -4,12 +4,12 @@ var dgram = require('dgram'), /** * @const */ -var MAX_BATCH_INTERVAL = 1000; +var MAX_BATCH_DELAY = 1000; /** * @const */ -var MAX_MESSAGES_IN_BATCH = 20; +var MAX_BATCH_MESSAGES = 20; /** * The UDP Client for StatsD @@ -65,15 +65,10 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl } if (options.batch) { - this._initBatching(); + this.pendingMessages = []; } }; -Client.prototype._initBatching = function() { - this.pendingMessages = []; - this.sendInterval = setInterval(Client.prototype._sendBatch.bind(this), MAX_BATCH_INTERVAL); -}; - Client.prototype._sendBatch = function() { if (this.pendingMessages.length) { var batchMessage = this.pendingMessages.map(function(msg) { return msg.message; }).join('\n'); @@ -90,18 +85,25 @@ Client.prototype._sendBatch = function() { }; Client.prototype._addToBatch = function(message, callback) { - if (!!message) { + if (!! message) { this.pendingMessages.push({ message: message, callback: callback }); } - if (this.pendingMessages.length > MAX_MESSAGES_IN_BATCH) { + if (this.pendingMessages.length > MAX_BATCH_MESSAGES) { this._sendBatch(); - // Restart the timer as we've just sent the batches - clearInterval(this.sendInterval); - this._initBatching(); + // Reset the timer as we've just sent the batches + if (this.sendTimeout) { + clearTimeout(this.sendTimeout); + this.sendTimeout = null; + } + } else if (!this.sendTimeout) { + // If we are not sending a batch now and didn't have a send timeout yet + // we need to init it to ensure this message will be sent not more that + // after MAX_BATCH_DELAY + this.sendTimeout = setTimeout(this._sendBatch.bind(this), MAX_BATCH_DELAY); } }; @@ -291,9 +293,9 @@ Client.prototype._send = function(message, callback) { * Close the underlying socket and stop listening for data on it. */ Client.prototype.close = function(){ - if (this.batch) { - clearInterval(this.sendInterval); - this.sendInterval = null; + if (this.batch && this.sendTimeout) { + clearTimeout(this.sendTimeout); + this.sendTimeout = null; } this.socket.close(); }; From 1d77076fb265ed8d00e7d62256185bdb99946609 Mon Sep 17 00:00:00 2001 From: Petr Pchelko Date: Thu, 17 Dec 2015 09:47:07 -0800 Subject: [PATCH 3/4] Made batching parameters configurable, send pending messages on client close --- lib/statsd.js | 80 ++++++++++++++++++++++++++++----------------- test/test_statsd.js | 16 +++++++++ 2 files changed, 66 insertions(+), 30 deletions(-) diff --git a/lib/statsd.js b/lib/statsd.js index d243e6b..d96c704 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -4,42 +4,46 @@ var dgram = require('dgram'), /** * @const */ -var MAX_BATCH_DELAY = 1000; +var DEFAULT_MAX_BATCH_DELAY = 1000; /** * @const */ -var MAX_BATCH_MESSAGES = 20; +var DEFAULT_MAX_BATCH_SIZE = 20; /** * The UDP Client for StatsD * @param options - * @option host {String} The host to connect to default: localhost - * @option port {String|Integer} The port to connect to default: 8125 - * @option prefix {String} An optional prefix to assign to each stat name sent - * @option suffix {String} An optional suffix to assign to each stat name sent - * @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace - * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once - * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. - * @option global_tags {Array=} Optional tags that will be added to every metric - * @option batch {boolean} Whether to send metrics in batches + * @option host {String} The host to connect to default: localhost + * @option port {String|Integer} The port to connect to default: 8125 + * @option prefix {String} An optional prefix to assign to each stat name sent + * @option suffix {String} An optional suffix to assign to each stat name sent + * @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace + * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once + * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. + * @option global_tags {Array=} Optional tags that will be added to every metric + * @option batch {boolean} Whether to send metrics in batches + * @option maxBatchDelay {Integer} Maximum period for batch accumulation in millisecond + * @option maxBatchSize {Integer} Maximum number of messages in a batch * @constructor */ -var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, batch) { +var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, batch, maxBatchDelay, maxBatchSize) { var options = host || {}, self = this; if(arguments.length > 1 || typeof(host) === 'string'){ options = { - host : host, - port : port, - prefix : prefix, - suffix : suffix, - globalize : globalize, - cacheDns : cacheDns, - mock : mock === true, - global_tags : global_tags, - batch : batch + host : host, + port : port, + prefix : prefix, + suffix : suffix, + globalize : globalize, + cacheDns : cacheDns, + mock : mock === true, + global_tags : global_tags, + batch : batch, + maxBatchDelay : maxBatchDelay, + maxBatchSize : maxBatchSize }; } @@ -51,6 +55,8 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.mock = options.mock; this.global_tags = options.global_tags || []; this.batch = options.batch; + this.maxBatchDelay = options.maxBatchDelay || DEFAULT_MAX_BATCH_DELAY; + this.maxBatchSize = options.maxBatchSize || DEFAULT_MAX_BATCH_SIZE; if(options.cacheDns === true){ dns.lookup(options.host, function(err, address, family){ @@ -69,11 +75,14 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl } }; -Client.prototype._sendBatch = function() { +Client.prototype._sendBatch = function(batchCallback) { if (this.pendingMessages.length) { var batchMessage = this.pendingMessages.map(function(msg) { return msg.message; }).join('\n'); var callbacks = this.pendingMessages.map(function(msg) { return msg.callback; }); this._send(batchMessage, function(err) { + if (typeof batchCallback === 'function') { + batchCallback(err); + } callbacks.forEach(function(callback) { if (typeof callback === 'function') { callback(err); @@ -81,6 +90,8 @@ Client.prototype._sendBatch = function() { }); }); this.pendingMessages = []; + } else if (typeof batchCallback === 'function') { + batchCallback(); } }; @@ -92,7 +103,7 @@ Client.prototype._addToBatch = function(message, callback) { }); } - if (this.pendingMessages.length > MAX_BATCH_MESSAGES) { + if (this.pendingMessages.length > this.maxBatchSize) { this._sendBatch(); // Reset the timer as we've just sent the batches if (this.sendTimeout) { @@ -102,8 +113,8 @@ Client.prototype._addToBatch = function(message, callback) { } else if (!this.sendTimeout) { // If we are not sending a batch now and didn't have a send timeout yet // we need to init it to ensure this message will be sent not more that - // after MAX_BATCH_DELAY - this.sendTimeout = setTimeout(this._sendBatch.bind(this), MAX_BATCH_DELAY); + // after maxBatchDelay + this.sendTimeout = setTimeout(this._sendBatch.bind(this), this.maxBatchDelay); } }; @@ -292,12 +303,21 @@ Client.prototype._send = function(message, callback) { /** * Close the underlying socket and stop listening for data on it. */ -Client.prototype.close = function(){ - if (this.batch && this.sendTimeout) { - clearTimeout(this.sendTimeout); - this.sendTimeout = null; +Client.prototype.close = function() { + var self = this; + if (self.batch) { + // Clear the send timeout + if (self.sendTimeout) { + clearTimeout(this.sendTimeout); + self.sendTimeout = null; + } + // And send pending messages immediately + self._sendBatch(function() { + self.socket.close(); + }); + } else { + self.socket.close(); } - this.socket.close(); }; exports = module.exports = Client; diff --git a/test/test_statsd.js b/test/test_statsd.js index 4827086..b50a303 100644 --- a/test/test_statsd.js +++ b/test/test_statsd.js @@ -737,5 +737,21 @@ describe('StatsD', function(){ statsd.set('f', 5); }); }); + it('should send pending messages after client is closed', function(finished) { + udpTest(function(message, server) { + assert.equal(message, 'a:1|c'); + server.close(); + finished(); + }, function(server) { + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + batch: true + }); + statsd.increment('a', 1); + statsd.close(); + }); + }); }); }); From d20e17309493216447df43dcdb52879e60d8d4f6 Mon Sep 17 00:00:00 2001 From: Petr Pchelko Date: Fri, 18 Dec 2015 08:44:29 -0800 Subject: [PATCH 4/4] Control the max batch length in bytes, not the max num of messages --- lib/statsd.js | 83 +++++++++++++++++++++++---------------------- test/test_statsd.js | 15 ++++---- 2 files changed, 49 insertions(+), 49 deletions(-) diff --git a/lib/statsd.js b/lib/statsd.js index d96c704..86ff3cc 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -9,41 +9,41 @@ var DEFAULT_MAX_BATCH_DELAY = 1000; /** * @const */ -var DEFAULT_MAX_BATCH_SIZE = 20; +var DEFAULT_MAX_BATCH_LENGTH = 1500; /** * The UDP Client for StatsD * @param options - * @option host {String} The host to connect to default: localhost - * @option port {String|Integer} The port to connect to default: 8125 - * @option prefix {String} An optional prefix to assign to each stat name sent - * @option suffix {String} An optional suffix to assign to each stat name sent - * @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace - * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once - * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. - * @option global_tags {Array=} Optional tags that will be added to every metric - * @option batch {boolean} Whether to send metrics in batches - * @option maxBatchDelay {Integer} Maximum period for batch accumulation in millisecond - * @option maxBatchSize {Integer} Maximum number of messages in a batch + * @option host {String} The host to connect to default: localhost + * @option port {String|Integer} The port to connect to default: 8125 + * @option prefix {String} An optional prefix to assign to each stat name sent + * @option suffix {String} An optional suffix to assign to each stat name sent + * @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace + * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once + * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. + * @option global_tags {Array=} Optional tags that will be added to every metric + * @option batch {boolean} Whether to send metrics in batches + * @option maxBatchDelay {Integer} Maximum period for batch accumulation in millisecond + * @option maxBatchLength {Integer} Maximum length of a packet, should not be more then the network MTU * @constructor */ -var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, batch, maxBatchDelay, maxBatchSize) { +var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, batch, maxBatchDelay, maxBatchLength) { var options = host || {}, self = this; if(arguments.length > 1 || typeof(host) === 'string'){ options = { - host : host, - port : port, - prefix : prefix, - suffix : suffix, - globalize : globalize, - cacheDns : cacheDns, - mock : mock === true, - global_tags : global_tags, - batch : batch, - maxBatchDelay : maxBatchDelay, - maxBatchSize : maxBatchSize + host : host, + port : port, + prefix : prefix, + suffix : suffix, + globalize : globalize, + cacheDns : cacheDns, + mock : mock === true, + global_tags : global_tags, + batch : batch, + maxBatchDelay : maxBatchDelay, + maxBatchLength : maxBatchLength }; } @@ -56,7 +56,7 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.global_tags = options.global_tags || []; this.batch = options.batch; this.maxBatchDelay = options.maxBatchDelay || DEFAULT_MAX_BATCH_DELAY; - this.maxBatchSize = options.maxBatchSize || DEFAULT_MAX_BATCH_SIZE; + this.maxBatchLength = options.maxBatchLength || DEFAULT_MAX_BATCH_LENGTH; if(options.cacheDns === true){ dns.lookup(options.host, function(err, address, family){ @@ -72,6 +72,7 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl if (options.batch) { this.pendingMessages = []; + this.batchLength = 0; } }; @@ -90,31 +91,33 @@ Client.prototype._sendBatch = function(batchCallback) { }); }); this.pendingMessages = []; + this.batchLength = 0; } else if (typeof batchCallback === 'function') { batchCallback(); } }; Client.prototype._addToBatch = function(message, callback) { - if (!! message) { + if (!!message) { + if (this.batchLength + message.length > this.maxBatchLength) { + this._sendBatch(); + // Reset the timer as we've just sent the batches + if (this.sendTimeout) { + clearTimeout(this.sendTimeout); + this.sendTimeout = null; + } + } else if (!this.sendTimeout) { + // If we are not sending a batch now and didn't have a send timeout yet + // we need to init it to ensure this message will be sent not more that + // after maxBatchDelay + this.sendTimeout = setTimeout(this._sendBatch.bind(this), this.maxBatchDelay); + } + this.pendingMessages.push({ message: message, callback: callback }); - } - - if (this.pendingMessages.length > this.maxBatchSize) { - this._sendBatch(); - // Reset the timer as we've just sent the batches - if (this.sendTimeout) { - clearTimeout(this.sendTimeout); - this.sendTimeout = null; - } - } else if (!this.sendTimeout) { - // If we are not sending a batch now and didn't have a send timeout yet - // we need to init it to ensure this message will be sent not more that - // after maxBatchDelay - this.sendTimeout = setTimeout(this._sendBatch.bind(this), this.maxBatchDelay); + this.batchLength += message.length; } }; diff --git a/test/test_statsd.js b/test/test_statsd.js index b50a303..7063141 100644 --- a/test/test_statsd.js +++ b/test/test_statsd.js @@ -681,13 +681,9 @@ describe('StatsD', function(){ }); describe('#batching', function() { - it('should send batches by 20 messages', function(finished) { - var message = 'a:1|c'; - var batch = []; - for(var i = 0; i < 21; i++) { batch.push(message); } - + it('should send batches not longer than the max length', function(finished) { udpTest(function(message, server) { - assert.equal(message, batch.join('\n')); + assert.equal(message, 'a:1|c\na:1|c'); server.close(); finished(); }, function(server) { @@ -695,9 +691,10 @@ describe('StatsD', function(){ statsd = new StatsD({ host: address.address, port: address.port, - batch: true + batch: true, + maxBatchLength: 10 }); - for (var i = 0; i < 21; i++) { + for (var i = 0; i < 5; i++) { statsd.increment('a', 1); } }); @@ -727,7 +724,7 @@ describe('StatsD', function(){ statsd = new StatsD({ host: address.address, port: address.port, - batch: true + batch: true, }); statsd.increment('a', 1); statsd.decrement('b', 1);