diff --git a/statsd-influxdb-backend-v0.12 b/statsd-influxdb-backend-v0.12 new file mode 100644 index 0000000..149f9f4 --- /dev/null +++ b/statsd-influxdb-backend-v0.12 @@ -0,0 +1,641 @@ +/* + * Flush stats to InfluxDB (http://influxdb.org/) + * + * To enable this backend, include 'statsd-influxdb-backend' in the backends + * configuration array: + * + * backends: ['statsd-influxdb-backend'] + * + * The backend will read the configuration options from the following + * 'influxdb' hash defined in the main statsd config file: + * + * influxdb: { + * host: '127.0.0.1', // InfluxDB host. (default 127.0.0.1) + * port: 8086, // InfluxDB port. (default 8086) + * version: 0.11, // influxdb version + * ssl: false, // InfluxDB is hosted over SSL. (default false) + * database: 'dbname', // InfluxDB database instance. (required) + * username: 'user', // InfluxDB database username. + * password: 'pass', // InfluxDB database password. + * retentionPolicy: 'default', // InfluxDB database Retention policy (default default). + * flush: { + * enable: true // Enable regular flush strategy. (default true) + * }, + * proxy: { + * enable: false, // Enable the proxy strategy. (default false) + * suffix: 'raw', // Metric name suffix. (default 'raw') + * flushInterval: 1000 // Flush interval for the internal buffer. + * // (default 1000) + * }, + * includeStatsdMetrics: false, // Send internal statsd metrics to InfluxDB. (default false) + * includeInfluxdbMetrics: false // Send internal backend metrics to InfluxDB. (default false) + * // Requires includeStatsdMetrics to be enabled. + * } + * + */ +var util = require('util'), + querystring = require('querystring'), + http = require('http'), + https = require('https'); + +function InfluxdbBackend(startupTime, config, events) { + var self = this; + + self.debug = config.debug; + self.registry = {}; + self.influxdbStats = {}; + + self.defaultHost = '127.0.0.1'; + self.defaultPort = 8086; + self.defaultVersion = 0.11; + self.defaultFlushEnable = true; + self.defaultProxyEnable = false; + self.defaultProxySuffix = 'raw'; + self.defaultProxyFlushInterval = 1000; + + self.defaultRetentionPolicy='default';//add retention Policy + + self.host = self.defaultHost; + self.port = self.defaultPort; + self.version = self.defaultVersion; + self.protocol = http; + + self.flushEnable = self.defaultFlushEnable; + self.proxyEnable = self.defaultProxyEnable; + self.proxySuffix = self.defaultProxySuffix; + self.proxyFlushInterval = self.defaultProxyFlushInterval; + self.includeStatsdMetrics = false; + self.includeInfluxdbMetrics = false; + + /* XXX Hardcoding default prefix here because it is not accessible otherwise. */ + self.prefixStats = config.prefixStats !== undefined ? config.prefixStats : 'statsd'; + + if (config.influxdb) { + self.host = config.influxdb.host || self.defaultHost; + self.port = config.influxdb.port || self.defaultPort; + self.version = config.influxdb.version || self.defaultVersion; + self.retentionPolicy=config.influxdb.retentionPolicy || self.defaultRetentionPolicy; + self.user = config.influxdb.username; + self.pass = config.influxdb.password; + self.database = config.influxdb.database; + self.includeStatsdMetrics = config.influxdb.includeStatsdMetrics; + self.includeInfluxdbMetrics = config.influxdb.includeInfluxdbMetrics; + + if (config.influxdb.ssl) { + self.protocol = https; + } + + if (config.influxdb.flush) { + self.flushEnable = config.influxdb.flush.enable; + } + + if (config.influxdb.proxy) { + self.proxyEnable = config.influxdb.proxy.enable || self.defaultProxyEnable; + self.proxySuffix = config.influxdb.proxy.suffix || self.defaultProxySuffix; + self.proxyFlushInterval = config.influxdb.proxy.flushInterval || self.defaultProxyFlushInterval; + } + } +/* + if (self.version >= 0.9) { + self.assembleEvent = self.assembleEvent_v09; + self.httpPOST = self.httpPOST_v09; + } else { + self.assembleEvent = self.assembleEvent_v08; + self.httpPOST = self.httpPOST_v08; + } +*/ + //upgrade to 0.11 which will remove json protocal + if (self.version >= 0.11) { + self.assembleEvent = self.assembleEvent_v11; + self.httpPOST = self.httpPOST_v11; + } + else if(self.version >= 0.9|| self.version< 0.11){ + self.assembleEvent = self.assembleEvent_v09; + self.httpPOST = self.httpPOST_v09; + } + else { + self.assembleEvent = self.assembleEvent_v08; + self.httpPOST = self.httpPOST_v08; + } + + + if (self.proxyEnable) { + self.log('Starting the buffer flush interval. (every ' + self.proxyFlushInterval + 'ms)'); + setInterval(function () { + self.flushQueue(); + }, self.proxyFlushInterval); + + events.on('packet', function (packet, rinfo) { + try { + self.processPacket(packet, rinfo); + } catch (e) { + self.log(e); + } + }); + } + + if (self.flushEnable) { + events.on('flush', function (timestamp, metrics) { + try { + self.processFlush(timestamp, metrics); + } catch (e) { + self.log(e); + } + }); + } + + events.on('status', function (writeCb) { + for (var stat in self.influxdbStats) { + writeCb(null, 'influxdb', stat, self.influxdbStats[stat]); + } + }); + + return true; +} // end of InfluxdbBackend + +function millisecondsSince(start) { + diff = process.hrtime(start); + return diff[0] * 1000 + diff[1] / 1000000; +} + +InfluxdbBackend.prototype.log = function (msg) { + util.log('[influxdb] ' + msg); +} + +InfluxdbBackend.prototype.logDebug = function (msg) { + if (this.debug) { + var string; + + if (msg instanceof Function) { + string = msg(); + } else { + string = msg; + } + + util.log('[influxdb] (DEBUG) ' + string); + } +} + +/** + * Flush strategy handler + * + * @param {Number} timestamp + * @param {Object} stats metric + */ +InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) { + var self = this, + counters = metrics.counters, + gauges = metrics.gauges, + timerData = metrics.timer_data, + statsdMetrics = metrics.statsd_metrics, + points = [], + sets = function (vals) { + var ret = {}; + for (var val in vals) { + ret[val] = vals[val].values(); + } + return ret; + }(metrics.sets), + startTime = process.hrtime(), + key, timerKey, + statsPrefixRegexp = new RegExp('^' + self.prefixStats + '\\.'); + + /* Convert timestamp from seconds to milliseconds. */ + timestamp = (timestamp * 1000); + + for (key in counters) { + /* Do not include statsd counters. */ + if (!self.includeStatsdMetrics && key.match(statsPrefixRegexp)) { continue; } + + var value = counters[key], + k = key + '.counter'; + + if (value) { + points.push(self.assembleEvent(k, [{value: value, time: timestamp}])); + } + } + + for (set in sets) { + sets[set].map(function (v) { + points.push(self.assembleEvent(set, [{value: v, time: timestamp}])); + }) + points.push(self.assembleEvent(set + "_count", [{value: sets[set].length, time: timestamp}])); + } + + for (key in gauges) { + /* Do not include statsd gauges. */ + if (!self.includeStatsdMetrics && key.match(statsPrefixRegexp)) { continue; } + + var value = gauges[key], + k = key + '.gauge'; + + if (!isNaN(parseFloat(value)) && isFinite(value)) { + points.push(self.assembleEvent(k, [{value: value, time: timestamp}])); + } + } + + for (key in timerData) { + var timerMetrics = timerData[key]; + + // Try to add histogram data, if it is there: + if (timerMetrics.histogram) { + var histoMetrics = timerMetrics.histogram + , histoKey; + + for (histoKey in histoMetrics) { + var value = histoMetrics[histoKey], + k = key + '.timer.histogram.' + histoKey; + + points.push(self.assembleEvent(k, [{value: value, time: timestamp}])); + } + + // Delete here so it isn't iterated over later: + delete timerMetrics.histogram; + } + + // Iterate over normal metrics: + for (timerKey in timerMetrics) { + var value = timerMetrics[timerKey], + k = key + '.timer' + '.' + timerKey; + + points.push(self.assembleEvent(k, [{value: value, time: timestamp}])); + } + } + + if (self.includeStatsdMetrics) { + // Include backend metrics for the previous flush + if (self.includeInfluxdbMetrics) { + statsdMetrics['influxdbStats.flush_time'] = self.influxdbStats.flushTime; + statsdMetrics['influxdbStats.http_response_time'] = self.influxdbStats.httpResponseTime; + statsdMetrics['influxdbStats.payload_size'] = self.influxdbStats.payloadSize; + statsdMetrics['influxdbStats.num_stats'] = self.influxdbStats.numStats; + } + + for (key in statsdMetrics) { + var value = statsdMetrics[key], + k = self.prefixStats + '.' + key; + + if (!isNaN(parseFloat(value)) && isFinite(value)) { + points.push(self.assembleEvent(k, [{value: value, time: timestamp}])); + } + } + } + + self.httpPOST(points); + self.influxdbStats.flushTime = millisecondsSince(startTime); +} + +InfluxdbBackend.prototype.processPacket = function (packet, rinfo) { + var self = this, + ts = (new Date()).valueOf(); + + /* Stolen from statsd's stats.js. */ + var packet_data = packet.toString(), + metrics; + + if (packet_data.indexOf("\n") > -1) { + metrics = packet_data.split("\n"); + } else { + metrics = [packet_data]; + } + + for (var midx in metrics) { + if (metrics[midx].length === 0) { + continue; + } + var bits = metrics[midx].toString().split(':'); + var key = bits.shift() + .replace(/\s+/g, '_') + .replace(/\//g, '-') + .replace(/[^a-zA-Z_\-0-9\.]/g, ''); + + if (bits.length === 0) { + bits.push("1"); + } + + for (var i = 0; i < bits.length; i++) { + var fields = bits[i].split("|"); + + if (fields[1] === undefined) { + self.log('Bad line: ' + fields + ' in msg "' + metrics[midx] +'"'); + continue; + } + + var metric_type = fields[1].trim(); + + /* Timer */ + if (metric_type === "ms") { + self.enqueue('timer', ts, key, Number(fields[0] || 0)); + /* Gauge */ + } else if (metric_type === "g") { + if (fields[0].match(/^[-+]/)) { + self.logDebug('Sending gauges with +/- is not supported yet.'); + } else { + self.enqueue('gauge', ts, key, Number(fields[0] || 0)); + } + /* Set */ + } else if (metric_type === "s") { + self.logDebug('Sets not supported yet.'); + /* Counter */ + } else { + /* XXX Handle sampling. */ + self.enqueue('counter', ts, key, Number(fields[0] || 1)); + } + } + } +} + +InfluxdbBackend.prototype.enqueue = function (type, ts, key, value) { + var self = this; + + key = key + '.' + type + '.' + self.proxySuffix; + + if (!self.registry[key]) { + self.registry[key] = []; + } + + self.registry[key].push({value: value, time: ts}); +} + +InfluxdbBackend.prototype.flushQueue = function () { + var self = this, + registry = self.clearRegistry(), + points = []; + + for (var key in registry) { + var payload = self.assembleEvent(key, registry[key]); + + self.logDebug(function () { + return 'Flush ' + registry[key].length + ' values for ' + key; + }); + + points.push(payload); + } + + self.httpPOST(points); + + self.logDebug('Queue flushed'); +} + + +InfluxdbBackend.prototype.clearRegistry = function () { + var self = this, + registry = self.registry; + + self.registry = {}; + + return registry; +} + +InfluxdbBackend.prototype.assembleEvent_v08 = function (name, events) { + var self = this; + + var payload = { + name: name, + columns: Object.keys(events[0]), + points: [] + }; + + for (var idx in events) { + var event = events[idx], + points = []; + + for (var cidx in payload.columns) { + var column = payload.columns[cidx]; + + points.push(event[column]); + } + + payload.points.push(points); + } + + return payload; +} + +InfluxdbBackend.prototype.assembleEvent_v09 = function (name, events) { + var self = this; + + var payload = { + measurement: name, + fields: { value: events[0]['value'] } + } + + return payload; +} + +InfluxdbBackend.prototype.httpPOST_v08 = function (points) { + /* Do not send if there are no points. */ + if (!points.length) { return; } + + var self = this, + query = {u: self.user, p: self.pass, time_precision: 'ms'}, + protocolName = self.protocol == http ? 'HTTP' : 'HTTPS', + startTime; + + self.logDebug(function () { + return 'Sending ' + points.length + ' different points via ' + protocolName; + }); + + self.influxdbStats.numStats = points.length; + + var options = { + hostname: self.host, + port: self.port, + path: '/db/' + self.database + '/series?' + querystring.stringify(query), + method: 'POST', + agent: false // Is it okay to use "undefined" here? (keep-alive) + }; + + var req = self.protocol.request(options); + + req.on('socket', function (res) { + startTime = process.hrtime(); + }); + + req.on('response', function (res) { + var status = res.statusCode; + + self.influxdbStats.httpResponseTime = millisecondsSince(startTime); + + if (status !== 200) { + self.log(protocolName + ' Error: ' + status); + } + }); + + req.on('error', function (e, i) { + self.log(e); + }); + + var payload = JSON.stringify(points) + self.influxdbStats.payloadSize = Buffer.byteLength(payload); + + self.logDebug(function () { + var size = (self.influxdbStats.payloadSize / 1024).toFixed(2); + return 'Payload size ' + size + ' KB'; + }); + + req.write(payload); + req.end(); +} + +InfluxdbBackend.prototype.httpPOST_v09 = function (points) { + /* Do not send if there are no points. */ + if (!points.length) { return; } + + var self = this, + query = {u: self.user, p: self.pass}, + protocolName = self.protocol == http ? 'HTTP' : 'HTTPS', + startTime; + + self.logDebug(function () { + return 'Sending ' + points.length + ' different points via ' + protocolName; + }); + + self.influxdbStats.numStats = points.length; + + var options = { + hostname: self.host, + port: self.port, + path: '/write?' + querystring.stringify(query), + method: 'POST', + agent: false // Is it okay to use "undefined" here? (keep-alive) + }; + + var req = self.protocol.request(options); + + req.on('socket', function (res) { + startTime = process.hrtime(); + }); + + req.on('response', function (res) { + var status = res.statusCode; + + self.influxdbStats.httpResponseTime = millisecondsSince(startTime); + + if (status >= 400) { + self.log(protocolName + ' Error: ' + status); + } + }); + + req.on('error', function (e, i) { + self.log(e); + }); + + var payload = JSON.stringify({ + database: self.database, + points: points + }); + + self.influxdbStats.payloadSize = Buffer.byteLength(payload); + + self.logDebug(function () { + var size = (self.influxdbStats.payloadSize / 1024).toFixed(2); + return 'Payload size ' + size + ' KB'; + }); + + req.write(payload); + req.end(); +} //end of InfluxdbBackend.prototype.httpPOST_v09 + + +InfluxdbBackend.prototype.assembleEvent_v11 = function (name, events) { + var self = this; + +// var payload = { +// measurement: name, +// fields: { value: events[0]['value'] } +// } + var payload=name+" value="+events[0]['value']+" "+events[0]['time']*1000000; + return payload; +} +/*** + * @ Vita Zhao add new version httpPOST of version 0.11 with http call + */ +InfluxdbBackend.prototype.httpPOST_v11 = function (points) { + /* Do not send if there are no points. */ + if (!points.length) { return; } + + var self = this, + query = {u: self.user, p: self.pass}, + protocolName = self.protocol == http ? 'HTTP' : 'HTTPS', + startTime; + + self.logDebug(function () { + return 'Sending ' + points.length + ' different points via ' + protocolName; + }); + + self.influxdbStats.numStats = points.length; + + var options = { + hostname: self.host, + port: self.port, + //path: '/write?' + querystring.stringify(query),//TODO:/write?db=dbname + path: '/write?db=' + self.database+'&rp='+self.retentionPolicy,//TODO:/write?db=dbname + method: 'POST', + headers:{ +// 'content-type': 'binary' + 'content-type': 'x-www-form-urlencoded' + }, + agent: false // Is it okay to use "undefined" here? (keep-alive) + }; + + var req = self.protocol.request(options); + + req.on('socket', function (res) { + startTime = process.hrtime(); + }); + + req.on('response', function (res) { + var status = res.statusCode; + + self.influxdbStats.httpResponseTime = millisecondsSince(startTime); + + if (status >= 400) { + self.log(protocolName + ' Error: ' + status); + } + }); + + req.on('error', function (e, i) { + self.log(e); + }); + +// var payload = JSON.stringify({ +// database: self.database, +// points: points +// }); + + var payload=points[0]; + for(var t=1;t