From 36567fc9eaa0f968adc2db56c9dc7ca5fb0f5376 Mon Sep 17 00:00:00 2001 From: jjofseattle Date: Mon, 19 Oct 2015 01:18:27 -0700 Subject: [PATCH 1/5] add buffer write --- lib/statsd.js | 42 +++++++++++++++++++++++++++++++++++++++--- test.js | 28 ++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 3 deletions(-) create mode 100644 test.js diff --git a/lib/statsd.js b/lib/statsd.js index f27383b0..d1826a02 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -27,7 +27,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl globalize : globalize, cacheDns : cacheDns, mock : mock === true, - global_tags : global_tags + global_tags : global_tags, + maxBufferSize : maxBufferSize, + bufferFlushInterval: bufferFlushInteral }; } @@ -38,6 +40,13 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.socket = dgram.createSocket('udp4'); this.mock = options.mock; this.global_tags = options.global_tags || []; + this.maxBufferSize = options.maxBufferSize || 0; + this.bufferFlushInterval = options.bufferFlushInterval || 1000; + this.buffer = ""; + + if(this.maxBufferSize > 0) { + setInterval(this.timeoutCallback.bind(this), this.bufferFlushInterval); + } if(options.cacheDns === true){ dns.lookup(options.host, function(err, address, family){ @@ -217,8 +226,12 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) // Only send this stat if we're not a mock Client. if(!this.mock) { - buf = new Buffer(message); - this.socket.send(buf, 0, buf.length, this.port, this.host, callback); + if(this.maxBufferSize === 0) { + this.sendMessage(message); + } + else { + this.enqueue(message); + } } else { if(typeof callback === 'function'){ callback(null, 0); @@ -226,6 +239,29 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) } }; +Client.prototype.enqueue = function(message){ + this.buffer += message + "\n"; + if(this.buffer.length > this.maxBufferSize) { + this.flushQueue(); + } +} + +Client.prototype.flushQueue = function(){ + this.sendMessage(this.buffer); + this.buffer = ""; +} + +Client.prototype.sendMessage = function(message){ + buf = new Buffer(message); + this.socket.send(buf, 0, buf.length, this.port, this.host); +} + +Client.prototype.timeoutCallback = function(){ + if(this.buffer !== "") { + this.flushQueue(); + } +} + /** * Close the underlying socket and stop listening for data on it. */ diff --git a/test.js b/test.js new file mode 100644 index 00000000..1449ce00 --- /dev/null +++ b/test.js @@ -0,0 +1,28 @@ +var statsD = require('node-statsd'); +var count = 0; +var options = { + maxBufferSize: 0 +}; +var statsd = new statsD(options); + +var start = new Date(); + +function sendPacket() { + count++; + statsd.increment('abc.cde.efg.ghk.klm', 1); + //process.nextTick(sendPacket); + if(count %1000000 === 0) { + var stop = new Date(); + console.log(stop - start); + start = stop; + } + setImmediate(sendPacket); +} + +function counting() { + console.log(count); + count = 0; + setInterval(counting, 10000); +} + +sendPacket(); From 0566656dc9f875c30025183727296b0c315e90e7 Mon Sep 17 00:00:00 2001 From: jjofseattle Date: Mon, 19 Oct 2015 16:21:32 -0700 Subject: [PATCH 2/5] add tests --- lib/statsd.js | 58 ++++++++++++++++++++++++------------ test/test_statsd.js | 72 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 18 deletions(-) diff --git a/lib/statsd.js b/lib/statsd.js index d1826a02..86b9a4a8 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -12,9 +12,11 @@ var dgram = require('dgram'), * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. * @option global_tags {Array=} Optional tags that will be added to every metric + * @maxBufferSize {Number} An optional value for aggregating metrics to send, mainly for performance improvement + * @bufferFlushInterval {Number} the time out value to flush out buffer if not * @constructor */ -var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) { +var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, maxBufferSize, bufferFlushInterval) { var options = host || {}, self = this; @@ -29,7 +31,7 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl mock : mock === true, global_tags : global_tags, maxBufferSize : maxBufferSize, - bufferFlushInterval: bufferFlushInteral + bufferFlushInterval: bufferFlushInterval }; } @@ -45,7 +47,7 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.buffer = ""; if(this.maxBufferSize > 0) { - setInterval(this.timeoutCallback.bind(this), this.bufferFlushInterval); + this.intervalHandle = setInterval(this.timeoutCallback.bind(this), this.bufferFlushInterval); } if(options.cacheDns === true){ @@ -227,47 +229,67 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) // Only send this stat if we're not a mock Client. if(!this.mock) { if(this.maxBufferSize === 0) { - this.sendMessage(message); + this.sendMessage(message, callback); } else { this.enqueue(message); } - } else { + } + else { if(typeof callback === 'function'){ callback(null, 0); } } }; +/** + * + * @param message {String} + */ Client.prototype.enqueue = function(message){ - this.buffer += message + "\n"; - if(this.buffer.length > this.maxBufferSize) { - this.flushQueue(); - } + this.buffer += message + "\n"; + if(this.buffer.length > this.maxBufferSize) { + this.flushQueue(); + } } +/** + * + */ Client.prototype.flushQueue = function(){ - this.sendMessage(this.buffer); - this.buffer = ""; + this.sendMessage(this.buffer); + this.buffer = ""; } -Client.prototype.sendMessage = function(message){ - buf = new Buffer(message); - this.socket.send(buf, 0, buf.length, this.port, this.host); +/** + * + * @param message {String} + * @param callback {Function} + */ +Client.prototype.sendMessage = function(message, callback){ + buf = new Buffer(message); + this.socket.send(buf, 0, buf.length, this.port, this.host, callback); } +/** + * + */ Client.prototype.timeoutCallback = function(){ - if(this.buffer !== "") { - this.flushQueue(); - } + if(this.buffer !== "") { + this.flushQueue(); + } } /** * Close the underlying socket and stop listening for data on it. */ Client.prototype.close = function(){ - this.socket.close(); + if(this.intervalHandle) { + clearInterval(this.intervalHandle); + } + this.socket.close(); } exports = module.exports = Client; exports.StatsD = Client; + diff --git a/test/test_statsd.js b/test/test_statsd.js index 0fbb314b..f66ce86c 100644 --- a/test/test_statsd.js +++ b/test/test_statsd.js @@ -679,5 +679,77 @@ describe('StatsD', function(){ assertMockClientMethod('set', finished); }); }); + describe('buffer', function() { + it('should aggregate packets when maxBufferSize is set to non-zero', function (finished) { + udpTest(function (message, server) { + assert.equal(message, 'a:1|c\nb:2|c\n'); + server.close(); + finished(); + }, function (server) { + var address = server.address(); + var options = { + host: address.host, + port: address.port, + maxBufferSize: 8 + }; + var statsd = new StatsD(options); + + statsd.increment('a', 1); + statsd.increment('b', 2); + }); + }); + it('should not aggregate packets when maxBufferSize is set to zero', function (finished) { + var results = [ + 'a:1|c', + 'b:2|c' + ]; + var msgCount = 0; + udpTest(function (message, server) { + var index = results.indexOf(message); + assert.equal(index >= 0, true); + results.splice(index, 1); + msgCount++; + if (msgCount >= 2) { + assert.equal(results.length, 0); + server.close(); + finished(); + } + }, function (server) { + var address = server.address(); + var options = { + host: address.host, + port: address.port, + maxBufferSize: 0 + }; + var statsd = new StatsD(options); + + statsd.increment('a', 1); + statsd.increment('b', 2); + }); + }); + + it('should flush the buffer when timeout value elapsed', function (finished) { + var timestamp; + udpTest(function (message, server) { + assert.equal(message, 'a:1|c\n'); + var elapsed = Date.now() - timestamp; + assert.equal(elapsed > 1000, true); + server.close(); + finished(); + }, function (server) { + var address = server.address(); + var options = { + host: address.host, + port: address.port, + maxBufferSize: 1220, + bufferFlushInterval: 1100 + }; + var statsd = new StatsD(options); + + timestamp = new Date(); + statsd.increment('a', 1); + }); + }); + }); }); From a0c2cec7c04ec00b29eb0b4165a0a768167313b6 Mon Sep 17 00:00:00 2001 From: jjofseattle Date: Mon, 19 Oct 2015 16:46:34 -0700 Subject: [PATCH 3/5] fix minor bugs --- lib/statsd.js | 5 ++--- test.js => perfTest/test.js | 13 +++---------- 2 files changed, 5 insertions(+), 13 deletions(-) rename test.js => perfTest/test.js (58%) diff --git a/lib/statsd.js b/lib/statsd.js index 86b9a4a8..739142c5 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -204,7 +204,6 @@ Client.prototype.sendAll = function(stat, value, type, sampleRate, tags, callbac */ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) { var message = this.prefix + stat + this.suffix + ':' + value + '|' + type, - buf, merged_tags = []; if(sampleRate && sampleRate < 1){ @@ -248,7 +247,7 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) */ Client.prototype.enqueue = function(message){ this.buffer += message + "\n"; - if(this.buffer.length > this.maxBufferSize) { + if(this.buffer.length >= this.maxBufferSize) { this.flushQueue(); } } @@ -267,7 +266,7 @@ Client.prototype.flushQueue = function(){ * @param callback {Function} */ Client.prototype.sendMessage = function(message, callback){ - buf = new Buffer(message); + var buf = new Buffer(message); this.socket.send(buf, 0, buf.length, this.port, this.host, callback); } diff --git a/test.js b/perfTest/test.js similarity index 58% rename from test.js rename to perfTest/test.js index 1449ce00..73ffd6c1 100644 --- a/test.js +++ b/perfTest/test.js @@ -1,7 +1,7 @@ -var statsD = require('node-statsd'); +var statsD = require('../lib/statsd'); var count = 0; var options = { - maxBufferSize: 0 + maxBufferSize: process.argv[2] }; var statsd = new statsD(options); @@ -10,8 +10,7 @@ var start = new Date(); function sendPacket() { count++; statsd.increment('abc.cde.efg.ghk.klm', 1); - //process.nextTick(sendPacket); - if(count %1000000 === 0) { + if(count %100000 === 0) { var stop = new Date(); console.log(stop - start); start = stop; @@ -19,10 +18,4 @@ function sendPacket() { setImmediate(sendPacket); } -function counting() { - console.log(count); - count = 0; - setInterval(counting, 10000); -} - sendPacket(); From c8899664ef2de6d8bcf7fc65bbd7ae9a874fe633 Mon Sep 17 00:00:00 2001 From: Brian Deitte Date: Thu, 22 Oct 2015 10:02:08 -0400 Subject: [PATCH 4/5] Fix jshint --- lib/statsd.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/statsd.js b/lib/statsd.js index ce38d89e..b95b7e28 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -317,7 +317,7 @@ Client.prototype.enqueue = function(message){ if(this.buffer.length >= this.maxBufferSize) { this.flushQueue(); } -} +}; /** * @@ -325,7 +325,7 @@ Client.prototype.enqueue = function(message){ Client.prototype.flushQueue = function(){ this.sendMessage(this.buffer); this.buffer = ""; -} +}; /** * @@ -335,7 +335,7 @@ Client.prototype.flushQueue = function(){ Client.prototype.sendMessage = function(message, callback){ var buf = new Buffer(message); this.socket.send(buf, 0, buf.length, this.port, this.host, callback); -} +}; /** * @@ -344,7 +344,7 @@ Client.prototype.timeoutCallback = function(){ if(this.buffer !== "") { this.flushQueue(); } -} +}; /** * Close the underlying socket and stop listening for data on it. From 09689257130b9b37bba2b33c3bc9450cf4d6ac31 Mon Sep 17 00:00:00 2001 From: Brian Deitte Date: Thu, 22 Oct 2015 12:15:56 -0400 Subject: [PATCH 5/5] Clean up of comments, indentation, and name consistency --- CHANGES.md | 5 +++-- README.md | 4 +++- lib/statsd.js | 50 +++++++++++++++++++++++---------------------- test/test_statsd.js | 14 ++++++------- 4 files changed, 39 insertions(+), 34 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0684c5f8..eb8ca5f3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,7 +2,8 @@ CHANGELOG ========= ## HEAD (Unreleased) -* +* Add options.maxBufferSize and optinons.bufferFlushInterval +* Change options.global_tags to options.globalTags for conistency -------------------- @@ -10,5 +11,5 @@ CHANGELOG * Thrown error when cacheDNS flag fails to resolve DNS name ## 1.0.1 (2015-09-24) -* Start from the base of https://github.com/sivy/node-statsd * Add the event API used by DogStatsD +* Start from the base of https://github.com/sivy/node-statsd \ No newline at end of file diff --git a/README.md b/README.md index ef19cebf..4f589655 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,9 @@ Parameters (specified as an options hash): * `globalize`: Expose this StatsD instance globally? `default: false` * `cacheDns`: Cache the initial dns lookup to *host* `default: false` * `mock`: Create a mock StatsD instance, sending no stats to the server? `default: false` -* `global_tags`: Optional tags that will be added to every metric `default: []` +* `globalTags`: Tags that will be added to every metric `default: []` +* `maxBufferSize`: If larger than 0, metrics will be buffered and only sent when the string length is greater than the size. `default: 0` +* `bufferFlushInterval`: If buffering is in use, this is the time in ms to always flush any buffered metrics. `default: 1000` All StatsD methods other than event have the same API: * `name`: Stat name `required` diff --git a/lib/statsd.js b/lib/statsd.js index b95b7e28..dfe1f8ee 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -11,12 +11,12 @@ var dgram = require('dgram'), * @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. - * @option global_tags {Array=} Optional tags that will be added to every metric + * @option globalTags {Array=} Optional tags that will be added to every metric * @maxBufferSize {Number} An optional value for aggregating metrics to send, mainly for performance improvement * @bufferFlushInterval {Number} the time out value to flush out buffer if not * @constructor */ -var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, maxBufferSize, bufferFlushInterval) { +var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, globalTags, maxBufferSize, bufferFlushInterval) { var options = host || {}, self = this; @@ -29,7 +29,7 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl globalize : globalize, cacheDns : cacheDns, mock : mock === true, - global_tags : global_tags, + globalTags : globalTags, maxBufferSize : maxBufferSize, bufferFlushInterval: bufferFlushInterval }; @@ -41,13 +41,13 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.suffix = options.suffix || ''; this.socket = dgram.createSocket('udp4'); this.mock = options.mock; - this.global_tags = options.global_tags || []; + this.globalTags = options.globalTags || []; this.maxBufferSize = options.maxBufferSize || 0; this.bufferFlushInterval = options.bufferFlushInterval || 1000; this.buffer = ""; if(this.maxBufferSize > 0) { - this.intervalHandle = setInterval(this.timeoutCallback.bind(this), this.bufferFlushInterval); + this.intervalHandle = setInterval(this.onBufferFlushInterval.bind(this), this.bufferFlushInterval); } if(options.cacheDns === true){ @@ -280,26 +280,26 @@ Client.prototype.sendStat = function (stat, value, type, sampleRate, tags, callb */ Client.prototype.send = function (message, tags, callback) { var buf, - merged_tags = []; + mergedTags = []; if(tags && Array.isArray(tags)){ - merged_tags = merged_tags.concat(tags); + mergedTags = mergedTags.concat(tags); } - if(this.global_tags && Array.isArray(this.global_tags)){ - merged_tags = merged_tags.concat(this.global_tags); + if(this.globalTags && Array.isArray(this.globalTags)){ + mergedTags = mergedTags.concat(this.globalTags); } - if(merged_tags.length > 0){ - message += '|#' + merged_tags.join(','); + if(mergedTags.length > 0){ + message += '|#' + mergedTags.join(','); } // Only send this stat if we're not a mock Client. if(!this.mock) { - if(this.maxBufferSize === 0) { - this.sendMessage(message, callback); - } - else { - this.enqueue(message); - } + if(this.maxBufferSize === 0) { + this.sendMessage(message, callback); + } + else { + this.enqueue(message); + } } else { if(typeof callback === 'function'){ @@ -309,18 +309,19 @@ Client.prototype.send = function (message, tags, callback) { }; /** + * Add the message to the buffer and flush the buffer if needed * - * @param message {String} + * @param message {String} The constructed message without tags */ Client.prototype.enqueue = function(message){ this.buffer += message + "\n"; if(this.buffer.length >= this.maxBufferSize) { - this.flushQueue(); + this.flushQueue(); } }; /** - * + * Flush the buffer, sending on the messages */ Client.prototype.flushQueue = function(){ this.sendMessage(this.buffer); @@ -328,9 +329,10 @@ Client.prototype.flushQueue = function(){ }; /** + * Send on the message through the socket * - * @param message {String} - * @param callback {Function} + * @param message {String} The constructed message without tags + * @param callback {Function=} Callback when message is done being delivered. Optional. */ Client.prototype.sendMessage = function(message, callback){ var buf = new Buffer(message); @@ -338,9 +340,9 @@ Client.prototype.sendMessage = function(message, callback){ }; /** - * + * Called every bufferFlushInterval to flush any buffer that is around */ -Client.prototype.timeoutCallback = function(){ +Client.prototype.onBufferFlushInterval = function() { if(this.buffer !== "") { this.flushQueue(); } diff --git a/test/test_statsd.js b/test/test_statsd.js index 733d84a3..33eea864 100644 --- a/test/test_statsd.js +++ b/test/test_statsd.js @@ -83,7 +83,7 @@ describe('StatsD', function(){ assert.equal(statsd.suffix, ''); assert.equal(global.statsd, undefined); assert.equal(statsd.mock, undefined); - assert.deepEqual(statsd.global_tags, []); + assert.deepEqual(statsd.globalTags, []); assert.ok(!statsd.mock); }); @@ -96,7 +96,7 @@ describe('StatsD', function(){ assert.equal(statsd.suffix, 'suffix'); assert.equal(statsd, global.statsd); assert.equal(statsd.mock, true); - assert.deepEqual(statsd.global_tags, ['gtag']); + assert.deepEqual(statsd.globalTags, ['gtag']); }); it('should set the proper values with options hash format', function(){ @@ -108,7 +108,7 @@ describe('StatsD', function(){ suffix: 'suffix', globalize: true, mock: true, - global_tags: ['gtag'] + globalTags: ['gtag'] }); assert.equal(statsd.host, 'host'); assert.equal(statsd.port, 1234); @@ -116,7 +116,7 @@ describe('StatsD', function(){ assert.equal(statsd.suffix, 'suffix'); assert.equal(statsd, global.statsd); assert.equal(statsd.mock, true); - assert.deepEqual(statsd.global_tags, ['gtag']); + assert.deepEqual(statsd.globalTags, ['gtag']); }); it('should attempt to cache a dns record if dnsCache is specified', function(done){ @@ -180,7 +180,7 @@ describe('StatsD', function(){ }); - describe('#global_tags', function(){ + describe('#globalTags', function(){ it('should not add global tags if they are not specified', function(finished){ udpTest(function(message, server){ assert.equal(message, 'test:1|c'); @@ -204,7 +204,7 @@ describe('StatsD', function(){ statsd = new StatsD({ host: address.address, port: address.port, - global_tags: ['gtag'] + globalTags: ['gtag'] }); statsd.increment('test'); @@ -221,7 +221,7 @@ describe('StatsD', function(){ statsd = new StatsD({ host: address.address, port: address.port, - global_tags: ['gtag'] + globalTags: ['gtag'] }); statsd.increment('test', 1337, ['foo']);