diff --git a/CHANGES.md b/CHANGES.md index 0684c5f8..eb8ca5f3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,7 +2,8 @@ CHANGELOG ========= ## HEAD (Unreleased) -* +* Add options.maxBufferSize and optinons.bufferFlushInterval +* Change options.global_tags to options.globalTags for conistency -------------------- @@ -10,5 +11,5 @@ CHANGELOG * Thrown error when cacheDNS flag fails to resolve DNS name ## 1.0.1 (2015-09-24) -* Start from the base of https://github.com/sivy/node-statsd * Add the event API used by DogStatsD +* Start from the base of https://github.com/sivy/node-statsd \ No newline at end of file diff --git a/README.md b/README.md index ef19cebf..4f589655 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,9 @@ Parameters (specified as an options hash): * `globalize`: Expose this StatsD instance globally? `default: false` * `cacheDns`: Cache the initial dns lookup to *host* `default: false` * `mock`: Create a mock StatsD instance, sending no stats to the server? `default: false` -* `global_tags`: Optional tags that will be added to every metric `default: []` +* `globalTags`: Tags that will be added to every metric `default: []` +* `maxBufferSize`: If larger than 0, metrics will be buffered and only sent when the string length is greater than the size. `default: 0` +* `bufferFlushInterval`: If buffering is in use, this is the time in ms to always flush any buffered metrics. `default: 1000` All StatsD methods other than event have the same API: * `name`: Stat name `required` diff --git a/lib/statsd.js b/lib/statsd.js index 71523e8a..dfe1f8ee 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -11,10 +11,12 @@ var dgram = require('dgram'), * @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. - * @option global_tags {Array=} Optional tags that will be added to every metric + * @option globalTags {Array=} Optional tags that will be added to every metric + * @maxBufferSize {Number} An optional value for aggregating metrics to send, mainly for performance improvement + * @bufferFlushInterval {Number} the time out value to flush out buffer if not * @constructor */ -var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) { +var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, globalTags, maxBufferSize, bufferFlushInterval) { var options = host || {}, self = this; @@ -27,7 +29,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl globalize : globalize, cacheDns : cacheDns, mock : mock === true, - global_tags : global_tags + globalTags : globalTags, + maxBufferSize : maxBufferSize, + bufferFlushInterval: bufferFlushInterval }; } @@ -37,7 +41,14 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.suffix = options.suffix || ''; this.socket = dgram.createSocket('udp4'); this.mock = options.mock; - this.global_tags = options.global_tags || []; + this.globalTags = options.globalTags || []; + this.maxBufferSize = options.maxBufferSize || 0; + this.bufferFlushInterval = options.bufferFlushInterval || 1000; + this.buffer = ""; + + if(this.maxBufferSize > 0) { + this.intervalHandle = setInterval(this.onBufferFlushInterval.bind(this), this.bufferFlushInterval); + } if(options.cacheDns === true){ dns.lookup(options.host, function(err, address, family){ @@ -247,7 +258,8 @@ Client.prototype.sendAll = function(stat, value, type, sampleRate, tags, callbac * @param callback {Function=} Callback when message is done being delivered. Optional. */ Client.prototype.sendStat = function (stat, value, type, sampleRate, tags, callback) { - var message = this.prefix + stat + this.suffix + ':' + value + '|' + type; + var message = this.prefix + stat + this.suffix + ':' + value + '|' + type, + buf; if(sampleRate && sampleRate < 1){ if(Math.random() < sampleRate){ @@ -268,35 +280,84 @@ Client.prototype.sendStat = function (stat, value, type, sampleRate, tags, callb */ Client.prototype.send = function (message, tags, callback) { var buf, - merged_tags = []; + mergedTags = []; if(tags && Array.isArray(tags)){ - merged_tags = merged_tags.concat(tags); + mergedTags = mergedTags.concat(tags); } - if(this.global_tags && Array.isArray(this.global_tags)){ - merged_tags = merged_tags.concat(this.global_tags); + if(this.globalTags && Array.isArray(this.globalTags)){ + mergedTags = mergedTags.concat(this.globalTags); } - if(merged_tags.length > 0){ - message += '|#' + merged_tags.join(','); + if(mergedTags.length > 0){ + message += '|#' + mergedTags.join(','); } // Only send this stat if we're not a mock Client. if(!this.mock) { - buf = new Buffer(message); - this.socket.send(buf, 0, buf.length, this.port, this.host, callback); - } else { + if(this.maxBufferSize === 0) { + this.sendMessage(message, callback); + } + else { + this.enqueue(message); + } + } + else { if(typeof callback === 'function'){ callback(null, 0); } } }; +/** + * Add the message to the buffer and flush the buffer if needed + * + * @param message {String} The constructed message without tags + */ +Client.prototype.enqueue = function(message){ + this.buffer += message + "\n"; + if(this.buffer.length >= this.maxBufferSize) { + this.flushQueue(); + } +}; + +/** + * Flush the buffer, sending on the messages + */ +Client.prototype.flushQueue = function(){ + this.sendMessage(this.buffer); + this.buffer = ""; +}; + +/** + * Send on the message through the socket + * + * @param message {String} The constructed message without tags + * @param callback {Function=} Callback when message is done being delivered. Optional. + */ +Client.prototype.sendMessage = function(message, callback){ + var buf = new Buffer(message); + this.socket.send(buf, 0, buf.length, this.port, this.host, callback); +}; + +/** + * Called every bufferFlushInterval to flush any buffer that is around + */ +Client.prototype.onBufferFlushInterval = function() { + if(this.buffer !== "") { + this.flushQueue(); + } +}; + /** * Close the underlying socket and stop listening for data on it. */ Client.prototype.close = function(){ - this.socket.close(); + if(this.intervalHandle) { + clearInterval(this.intervalHandle); + } + this.socket.close(); }; exports = module.exports = Client; exports.StatsD = Client; + diff --git a/perfTest/test.js b/perfTest/test.js new file mode 100644 index 00000000..73ffd6c1 --- /dev/null +++ b/perfTest/test.js @@ -0,0 +1,21 @@ +var statsD = require('../lib/statsd'); +var count = 0; +var options = { + maxBufferSize: process.argv[2] +}; +var statsd = new statsD(options); + +var start = new Date(); + +function sendPacket() { + count++; + statsd.increment('abc.cde.efg.ghk.klm', 1); + if(count %100000 === 0) { + var stop = new Date(); + console.log(stop - start); + start = stop; + } + setImmediate(sendPacket); +} + +sendPacket(); diff --git a/test/test_statsd.js b/test/test_statsd.js index bc454c71..33eea864 100644 --- a/test/test_statsd.js +++ b/test/test_statsd.js @@ -83,7 +83,7 @@ describe('StatsD', function(){ assert.equal(statsd.suffix, ''); assert.equal(global.statsd, undefined); assert.equal(statsd.mock, undefined); - assert.deepEqual(statsd.global_tags, []); + assert.deepEqual(statsd.globalTags, []); assert.ok(!statsd.mock); }); @@ -96,7 +96,7 @@ describe('StatsD', function(){ assert.equal(statsd.suffix, 'suffix'); assert.equal(statsd, global.statsd); assert.equal(statsd.mock, true); - assert.deepEqual(statsd.global_tags, ['gtag']); + assert.deepEqual(statsd.globalTags, ['gtag']); }); it('should set the proper values with options hash format', function(){ @@ -108,7 +108,7 @@ describe('StatsD', function(){ suffix: 'suffix', globalize: true, mock: true, - global_tags: ['gtag'] + globalTags: ['gtag'] }); assert.equal(statsd.host, 'host'); assert.equal(statsd.port, 1234); @@ -116,7 +116,7 @@ describe('StatsD', function(){ assert.equal(statsd.suffix, 'suffix'); assert.equal(statsd, global.statsd); assert.equal(statsd.mock, true); - assert.deepEqual(statsd.global_tags, ['gtag']); + assert.deepEqual(statsd.globalTags, ['gtag']); }); it('should attempt to cache a dns record if dnsCache is specified', function(done){ @@ -180,7 +180,7 @@ describe('StatsD', function(){ }); - describe('#global_tags', function(){ + describe('#globalTags', function(){ it('should not add global tags if they are not specified', function(finished){ udpTest(function(message, server){ assert.equal(message, 'test:1|c'); @@ -204,7 +204,7 @@ describe('StatsD', function(){ statsd = new StatsD({ host: address.address, port: address.port, - global_tags: ['gtag'] + globalTags: ['gtag'] }); statsd.increment('test'); @@ -221,7 +221,7 @@ describe('StatsD', function(){ statsd = new StatsD({ host: address.address, port: address.port, - global_tags: ['gtag'] + globalTags: ['gtag'] }); statsd.increment('test', 1337, ['foo']); @@ -679,7 +679,6 @@ describe('StatsD', function(){ assertMockClientMethod('set', finished); }); }); - describe('#event', function(finished) { it('should send proper event format for title and text', function (finished) { udpTest(function (message, server) { @@ -766,5 +765,78 @@ describe('StatsD', function(){ it('should send no event stat when a mock Client is used', function(finished){ assertMockClientMethod('event', finished); }); + }); + describe('buffer', function() { + it('should aggregate packets when maxBufferSize is set to non-zero', function (finished) { + udpTest(function (message, server) { + assert.equal(message, 'a:1|c\nb:2|c\n'); + server.close(); + finished(); + }, function (server) { + var address = server.address(); + var options = { + host: address.host, + port: address.port, + maxBufferSize: 8 + }; + var statsd = new StatsD(options); + + statsd.increment('a', 1); + statsd.increment('b', 2); + }); + }); + + it('should not aggregate packets when maxBufferSize is set to zero', function (finished) { + var results = [ + 'a:1|c', + 'b:2|c' + ]; + var msgCount = 0; + udpTest(function (message, server) { + var index = results.indexOf(message); + assert.equal(index >= 0, true); + results.splice(index, 1); + msgCount++; + if (msgCount >= 2) { + assert.equal(results.length, 0); + server.close(); + finished(); + } + }, function (server) { + var address = server.address(); + var options = { + host: address.host, + port: address.port, + maxBufferSize: 0 + }; + var statsd = new StatsD(options); + + statsd.increment('a', 1); + statsd.increment('b', 2); + }); + }); + + it('should flush the buffer when timeout value elapsed', function (finished) { + var timestamp; + udpTest(function (message, server) { + assert.equal(message, 'a:1|c\n'); + var elapsed = Date.now() - timestamp; + assert.equal(elapsed > 1000, true); + server.close(); + finished(); + }, function (server) { + var address = server.address(); + var options = { + host: address.host, + port: address.port, + maxBufferSize: 1220, + bufferFlushInterval: 1100 + }; + var statsd = new StatsD(options); + + timestamp = new Date(); + statsd.increment('a', 1); + }); + }); }); });