Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ CHANGELOG
=========

## HEAD (Unreleased)
*
* Add options.maxBufferSize and optinons.bufferFlushInterval
* Change options.global_tags to options.globalTags for conistency

--------------------

## 1.0.2 (2015-09-25)
* Thrown error when cacheDNS flag fails to resolve DNS name

## 1.0.1 (2015-09-24)
* Start from the base of https://github.com/sivy/node-statsd
* Add the event API used by DogStatsD
* Start from the base of https://github.com/sivy/node-statsd
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ Parameters (specified as an options hash):
* `globalize`: Expose this StatsD instance globally? `default: false`
* `cacheDns`: Cache the initial dns lookup to *host* `default: false`
* `mock`: Create a mock StatsD instance, sending no stats to the server? `default: false`
* `global_tags`: Optional tags that will be added to every metric `default: []`
* `globalTags`: Tags that will be added to every metric `default: []`
* `maxBufferSize`: If larger than 0, metrics will be buffered and only sent when the string length is greater than the size. `default: 0`
* `bufferFlushInterval`: If buffering is in use, this is the time in ms to always flush any buffered metrics. `default: 1000`

All StatsD methods other than event have the same API:
* `name`: Stat name `required`
Expand Down
91 changes: 76 additions & 15 deletions lib/statsd.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ var dgram = require('dgram'),
* @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace
* @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once
* @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent.
* @option global_tags {Array=} Optional tags that will be added to every metric
* @option globalTags {Array=} Optional tags that will be added to every metric
* @maxBufferSize {Number} An optional value for aggregating metrics to send, mainly for performance improvement
* @bufferFlushInterval {Number} the time out value to flush out buffer if not
* @constructor
*/
var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) {
var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, globalTags, maxBufferSize, bufferFlushInterval) {
var options = host || {},
self = this;

Expand All @@ -27,7 +29,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl
globalize : globalize,
cacheDns : cacheDns,
mock : mock === true,
global_tags : global_tags
globalTags : globalTags,
maxBufferSize : maxBufferSize,
bufferFlushInterval: bufferFlushInterval
};
}

Expand All @@ -37,7 +41,14 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl
this.suffix = options.suffix || '';
this.socket = dgram.createSocket('udp4');
this.mock = options.mock;
this.global_tags = options.global_tags || [];
this.globalTags = options.globalTags || [];
this.maxBufferSize = options.maxBufferSize || 0;
this.bufferFlushInterval = options.bufferFlushInterval || 1000;
this.buffer = "";

if(this.maxBufferSize > 0) {
this.intervalHandle = setInterval(this.onBufferFlushInterval.bind(this), this.bufferFlushInterval);
}

if(options.cacheDns === true){
dns.lookup(options.host, function(err, address, family){
Expand Down Expand Up @@ -247,7 +258,8 @@ Client.prototype.sendAll = function(stat, value, type, sampleRate, tags, callbac
* @param callback {Function=} Callback when message is done being delivered. Optional.
*/
Client.prototype.sendStat = function (stat, value, type, sampleRate, tags, callback) {
var message = this.prefix + stat + this.suffix + ':' + value + '|' + type;
var message = this.prefix + stat + this.suffix + ':' + value + '|' + type,
buf;

if(sampleRate && sampleRate < 1){
if(Math.random() < sampleRate){
Expand All @@ -268,35 +280,84 @@ Client.prototype.sendStat = function (stat, value, type, sampleRate, tags, callb
*/
Client.prototype.send = function (message, tags, callback) {
var buf,
merged_tags = [];
mergedTags = [];

if(tags && Array.isArray(tags)){
merged_tags = merged_tags.concat(tags);
mergedTags = mergedTags.concat(tags);
}
if(this.global_tags && Array.isArray(this.global_tags)){
merged_tags = merged_tags.concat(this.global_tags);
if(this.globalTags && Array.isArray(this.globalTags)){
mergedTags = mergedTags.concat(this.globalTags);
}
if(merged_tags.length > 0){
message += '|#' + merged_tags.join(',');
if(mergedTags.length > 0){
message += '|#' + mergedTags.join(',');
}

// Only send this stat if we're not a mock Client.
if(!this.mock) {
buf = new Buffer(message);
this.socket.send(buf, 0, buf.length, this.port, this.host, callback);
} else {
if(this.maxBufferSize === 0) {
this.sendMessage(message, callback);
}
else {
this.enqueue(message);
}
}
else {
if(typeof callback === 'function'){
callback(null, 0);
}
}
};

/**
* Add the message to the buffer and flush the buffer if needed
*
* @param message {String} The constructed message without tags
*/
Client.prototype.enqueue = function(message){
this.buffer += message + "\n";
if(this.buffer.length >= this.maxBufferSize) {
this.flushQueue();
}
};

/**
* Flush the buffer, sending on the messages
*/
Client.prototype.flushQueue = function(){
this.sendMessage(this.buffer);
this.buffer = "";
};

/**
* Send on the message through the socket
*
* @param message {String} The constructed message without tags
* @param callback {Function=} Callback when message is done being delivered. Optional.
*/
Client.prototype.sendMessage = function(message, callback){
var buf = new Buffer(message);
this.socket.send(buf, 0, buf.length, this.port, this.host, callback);
};

/**
* Called every bufferFlushInterval to flush any buffer that is around
*/
Client.prototype.onBufferFlushInterval = function() {
if(this.buffer !== "") {
this.flushQueue();
}
};

/**
* Close the underlying socket and stop listening for data on it.
*/
Client.prototype.close = function(){
this.socket.close();
if(this.intervalHandle) {
clearInterval(this.intervalHandle);
}
this.socket.close();
};

exports = module.exports = Client;
exports.StatsD = Client;

21 changes: 21 additions & 0 deletions perfTest/test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
var statsD = require('../lib/statsd');
var count = 0;
var options = {
maxBufferSize: process.argv[2]
};
var statsd = new statsD(options);

var start = new Date();

function sendPacket() {
count++;
statsd.increment('abc.cde.efg.ghk.klm', 1);
if(count %100000 === 0) {
var stop = new Date();
console.log(stop - start);
start = stop;
}
setImmediate(sendPacket);
}

sendPacket();
88 changes: 80 additions & 8 deletions test/test_statsd.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe('StatsD', function(){
assert.equal(statsd.suffix, '');
assert.equal(global.statsd, undefined);
assert.equal(statsd.mock, undefined);
assert.deepEqual(statsd.global_tags, []);
assert.deepEqual(statsd.globalTags, []);
assert.ok(!statsd.mock);
});

Expand All @@ -96,7 +96,7 @@ describe('StatsD', function(){
assert.equal(statsd.suffix, 'suffix');
assert.equal(statsd, global.statsd);
assert.equal(statsd.mock, true);
assert.deepEqual(statsd.global_tags, ['gtag']);
assert.deepEqual(statsd.globalTags, ['gtag']);
});

it('should set the proper values with options hash format', function(){
Expand All @@ -108,15 +108,15 @@ describe('StatsD', function(){
suffix: 'suffix',
globalize: true,
mock: true,
global_tags: ['gtag']
globalTags: ['gtag']
});
assert.equal(statsd.host, 'host');
assert.equal(statsd.port, 1234);
assert.equal(statsd.prefix, 'prefix');
assert.equal(statsd.suffix, 'suffix');
assert.equal(statsd, global.statsd);
assert.equal(statsd.mock, true);
assert.deepEqual(statsd.global_tags, ['gtag']);
assert.deepEqual(statsd.globalTags, ['gtag']);
});

it('should attempt to cache a dns record if dnsCache is specified', function(done){
Expand Down Expand Up @@ -180,7 +180,7 @@ describe('StatsD', function(){

});

describe('#global_tags', function(){
describe('#globalTags', function(){
it('should not add global tags if they are not specified', function(finished){
udpTest(function(message, server){
assert.equal(message, 'test:1|c');
Expand All @@ -204,7 +204,7 @@ describe('StatsD', function(){
statsd = new StatsD({
host: address.address,
port: address.port,
global_tags: ['gtag']
globalTags: ['gtag']
});

statsd.increment('test');
Expand All @@ -221,7 +221,7 @@ describe('StatsD', function(){
statsd = new StatsD({
host: address.address,
port: address.port,
global_tags: ['gtag']
globalTags: ['gtag']
});

statsd.increment('test', 1337, ['foo']);
Expand Down Expand Up @@ -679,7 +679,6 @@ describe('StatsD', function(){
assertMockClientMethod('set', finished);
});
});

describe('#event', function(finished) {
it('should send proper event format for title and text', function (finished) {
udpTest(function (message, server) {
Expand Down Expand Up @@ -766,5 +765,78 @@ describe('StatsD', function(){
it('should send no event stat when a mock Client is used', function(finished){
assertMockClientMethod('event', finished);
});
});
describe('buffer', function() {
it('should aggregate packets when maxBufferSize is set to non-zero', function (finished) {
udpTest(function (message, server) {
assert.equal(message, 'a:1|c\nb:2|c\n');
server.close();
finished();
}, function (server) {
var address = server.address();
var options = {
host: address.host,
port: address.port,
maxBufferSize: 8
};
var statsd = new StatsD(options);

statsd.increment('a', 1);
statsd.increment('b', 2);
});
});

it('should not aggregate packets when maxBufferSize is set to zero', function (finished) {
var results = [
'a:1|c',
'b:2|c'
];
var msgCount = 0;
udpTest(function (message, server) {
var index = results.indexOf(message);
assert.equal(index >= 0, true);
results.splice(index, 1);
msgCount++;
if (msgCount >= 2) {
assert.equal(results.length, 0);
server.close();
finished();
}
}, function (server) {
var address = server.address();
var options = {
host: address.host,
port: address.port,
maxBufferSize: 0
};
var statsd = new StatsD(options);

statsd.increment('a', 1);
statsd.increment('b', 2);
});
});

it('should flush the buffer when timeout value elapsed', function (finished) {
var timestamp;
udpTest(function (message, server) {
assert.equal(message, 'a:1|c\n');
var elapsed = Date.now() - timestamp;
assert.equal(elapsed > 1000, true);
server.close();
finished();
}, function (server) {
var address = server.address();
var options = {
host: address.host,
port: address.port,
maxBufferSize: 1220,
bufferFlushInterval: 1100
};
var statsd = new StatsD(options);

timestamp = new Date();
statsd.increment('a', 1);
});
});
});
});