Skip to content

Commit 2a55663

Browse files
authored
Merge pull request #5 from nsone/sample
Add sampling concept and ability to limit rate of "deep" sampling
2 parents e6f2443 + 16255d4 commit 2a55663

File tree

13 files changed

+940
-60
lines changed

13 files changed

+940
-60
lines changed

3rd/rng/randutils.hpp

Lines changed: 810 additions & 0 deletions
Large diffs are not rendered by default.

src/main.cpp

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
static const char USAGE[] =
2626
R"(pktvisord.
2727
Usage:
28-
pktvisord [-b BPF] [-p PORT] [-H HOSTSPEC] [--periods P] [--summary] [--geo-city FILE] [--geo-asn FILE] TARGET
28+
pktvisord [-b BPF] [-p PORT] [-H HOSTSPEC] [--periods P] [--summary] [--geo-city FILE] [--geo-asn FILE]
29+
[--max-deep-sample N]
30+
TARGET
2931
pktvisord (-h | --help)
3032
pktvisord --version
3133
@@ -34,18 +36,19 @@ static const char USAGE[] =
3436
TARGET is either a network interface, an IP address (4 or 6) or a pcap file (ending in .pcap or .cap)
3537
3638
Options:
37-
-p PORT Run metrics webserver on the given localhost port [default: 10853]
38-
-b BPF Filter packets using the given BPF string
39-
--geo-city FILE GeoLite2 City database to use for IP to Geo mapping (if enabled)
40-
--geo-asn FILE GeoLite2 ASN database to use for IP to ASN mapping (if enabled)
41-
--periods P Hold this many 60 second time periods of history in memory [default: 5]
42-
--summary Instead of a time window with P periods, summarize all packets into one bucket for entire time period.
43-
Useful for executive summary of (and applicable only to) a pcap file. [default: false]
44-
-H HOSTSPEC Specify subnets (comma separated) to consider HOST, in CIDR form. In live capture this /may/ be detected automatically
45-
from capture device but /must/ be specified for pcaps. Example: "10.0.1.0/24,10.0.2.1/32,2001:db8::/64"
46-
Specifying this for live capture will append to any automatic detection.
47-
-h --help Show this screen
48-
--version Show version
39+
-p PORT Run metrics webserver on the given localhost port [default: 10853]
40+
-b BPF Filter packets using the given BPF string
41+
--geo-city FILE GeoLite2 City database to use for IP to Geo mapping (if enabled)
42+
--geo-asn FILE GeoLite2 ASN database to use for IP to ASN mapping (if enabled)
43+
--max-deep-sample N Never deep sample more than N% of packets (an int between 0 and 100) [default: 100]
44+
--periods P Hold this many 60 second time periods of history in memory [default: 5]
45+
--summary Instead of a time window with P periods, summarize all packets into one bucket for entire time period.
46+
Useful for executive summary of (and applicable only to) a pcap file. [default: false]
47+
-H HOSTSPEC Specify subnets (comma separated) to consider HOST, in CIDR form. In live capture this /may/ be detected automatically
48+
from capture device but /must/ be specified for pcaps. Example: "10.0.1.0/24,10.0.2.1/32,2001:db8::/64"
49+
Specifying this for live capture will append to any automatic detection.
50+
-h --help Show this screen
51+
--version Show version
4952
)";
5053

5154
static std::unique_ptr<pktvisor::MetricsMgr> metricsManager;
@@ -81,7 +84,7 @@ static void onGotTcpDnsMessage(pcpp::DnsLayer *dnsLayer, pktvisor::Direction dir
8184
*/
8285
static void onApplicationInterrupted(void *cookie)
8386
{
84-
std::cout << "stopping..." << std::endl;
87+
std::cerr << "stopping..." << std::endl;
8588
devCookie *dC = (devCookie *)cookie;
8689
dC->second = true;
8790
}
@@ -209,7 +212,7 @@ void openIface(pcpp::PcapLiveDevice *dev, pktvisor::TcpDnsReassembly &tcpReassem
209212
if (bpfFilter != "") {
210213
if (!dev->setFilter(bpfFilter))
211214
throw std::runtime_error("Cannot set BPF filter to interface");
212-
std::cout << "BPF: " << bpfFilter << std::endl;
215+
std::cerr << "BPF: " << bpfFilter << std::endl;
213216
}
214217

215218
printf("Starting packet capture on '%s'...\n", dev->getName());
@@ -388,11 +391,18 @@ int main(int argc, char *argv[])
388391

389392
pktvisor::TcpDnsReassembly tcpDnsReassembly(onGotTcpDnsMessage);
390393
int result = 0;
394+
int sampleRate = 100;
395+
if (args["--max-sample"]) {
396+
sampleRate = (int)args["--max-sample"].asLong();
397+
if (sampleRate != 100) {
398+
std::cerr << "Using maximum deep sample rate: " << sampleRate << "%" << std::endl;
399+
}
400+
}
391401

392402
if ((args["TARGET"].asString().rfind(".pcap") != std::string::npos) || (args["TARGET"].asString().rfind(".cap") != std::string::npos)) {
393403
showHosts();
394404
try {
395-
metricsManager = std::make_unique<pktvisor::MetricsMgr>(args["--summary"].asBool());
405+
metricsManager = std::make_unique<pktvisor::MetricsMgr>(args["--summary"].asBool(), 5, sampleRate);
396406
handleGeo(args["--geo-city"], args["--geo-asn"]);
397407
openPcap(args["TARGET"].asString(), tcpDnsReassembly, bpf);
398408
if (args["--summary"].asBool()) {
@@ -408,7 +418,7 @@ int main(int argc, char *argv[])
408418
return -1;
409419
}
410420
} else {
411-
metricsManager = std::make_unique<pktvisor::MetricsMgr>(false, periods);
421+
metricsManager = std::make_unique<pktvisor::MetricsMgr>(false, periods, sampleRate);
412422
handleGeo(args["--geo-city"], args["--geo-asn"]);
413423
pcpp::PcapLiveDevice *dev(nullptr);
414424
// extract pcap live device by interface name or IP address
@@ -438,7 +448,7 @@ int main(int argc, char *argv[])
438448
svr.listen("localhost", port);
439449
});
440450
try {
441-
std::cout << "Interface " << dev->getName() << std::endl;
451+
std::cerr << "Interface " << dev->getName() << std::endl;
442452
getHostsFromIface(dev);
443453
showHosts();
444454
openIface(dev, tcpDnsReassembly, bpf);

src/metrics.cpp

Lines changed: 70 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
namespace pktvisor {
1717

18-
Metrics::Metrics()
18+
Metrics::Metrics(MetricsMgr& mmgr) : _mmgr(mmgr)
1919
{
2020
gettimeofday(&_bucketTS, nullptr);
2121

@@ -28,6 +28,8 @@ Metrics::Metrics()
2828
void Metrics::merge(Metrics &other)
2929
{
3030

31+
_numSamples += other._numSamples;
32+
3133
_numPackets += other._numPackets;
3234
_numPackets_UDP += other._numPackets_UDP;
3335
_numPackets_TCP += other._numPackets_TCP;
@@ -104,13 +106,9 @@ void Metrics::newDNSPacket(pcpp::DnsLayer *dns, Direction dir, pcpp::ProtocolTyp
104106
_DNS_TCP++;
105107
}
106108

107-
// lock for write
108-
std::unique_lock lock(_sketchMutex);
109-
110109
// only count response codes on responses (not queries)
111110
if (dns->getDnsHeader()->queryOrResponse == response) {
112111
_DNS_replies++;
113-
_sketches->_dns_topRCode.update(dns->getDnsHeader()->responseCode);
114112
switch (dns->getDnsHeader()->responseCode) {
115113
case 0:
116114
_DNS_NOERROR++;
@@ -129,6 +127,18 @@ void Metrics::newDNSPacket(pcpp::DnsLayer *dns, Direction dir, pcpp::ProtocolTyp
129127
_DNS_queries++;
130128
}
131129

130+
// sampler
131+
if (!_mmgr.shouldDeepSample()) {
132+
return;
133+
}
134+
135+
// lock for write
136+
std::unique_lock lock(_sketchMutex);
137+
138+
if (dns->getDnsHeader()->queryOrResponse == response) {
139+
_sketches->_dns_topRCode.update(dns->getDnsHeader()->responseCode);
140+
}
141+
132142
auto query = dns->getFirstQuery();
133143
if (query) {
134144

@@ -161,42 +171,57 @@ void Metrics::newDNSPacket(pcpp::DnsLayer *dns, Direction dir, pcpp::ProtocolTyp
161171

162172
void Metrics::newDNSXact(pcpp::DnsLayer *dns, Direction dir, DnsTransaction xact)
163173
{
164-
// lock for write
165-
std::unique_lock lock(_sketchMutex);
174+
175+
// sampler
176+
bool chosen = _mmgr.shouldDeepSample();
166177

167178
_DNS_xacts_total++;
179+
168180
uint64_t xactTime = (xact.totalTS.tv_sec * 1000000) + xact.totalTS.tv_usec; // microseconds
169181
// dir is the direction of the last packet, meaning the reply so from a transaction perspective
170182
// we look at it from the direction of the query, so the opposite side than we have here
171183
float to90th = 0.0;
172184
float from90th = 0.0;
173185
uint64_t sample_threshold = 10;
186+
187+
if (chosen) {
188+
// lock for write
189+
std::unique_lock lock(_sketchMutex);
190+
}
191+
174192
if (dir == toHost) {
175193
_DNS_xacts_out++;
176-
_sketches->_dnsXactFromTimeUs.update(xactTime);
177-
// wait for N samples
178-
if (_sketches->_dnsXactFromTimeUs.get_n() > sample_threshold) {
179-
from90th = _sketches->_dnsXactFromTimeUs.get_quantile(0.90);
194+
if (chosen) {
195+
_sketches->_dnsXactFromTimeUs.update(xactTime);
196+
// wait for N samples
197+
if (_sketches->_dnsXactFromTimeUs.get_n() > sample_threshold) {
198+
from90th = _sketches->_dnsXactFromTimeUs.get_quantile(0.90);
199+
}
180200
}
181201
} else if (dir == fromHost) {
182202
_DNS_xacts_in++;
183-
_sketches->_dnsXactToTimeUs.update(xactTime);
184-
// wait for N samples
185-
if (_sketches->_dnsXactToTimeUs.get_n() > sample_threshold) {
186-
to90th = _sketches->_dnsXactToTimeUs.get_quantile(0.90);
203+
if (chosen) {
204+
_sketches->_dnsXactToTimeUs.update(xactTime);
205+
// wait for N samples
206+
if (_sketches->_dnsXactToTimeUs.get_n() > sample_threshold) {
207+
to90th = _sketches->_dnsXactToTimeUs.get_quantile(0.90);
208+
}
187209
}
188210
}
189211

190-
auto query = dns->getFirstQuery();
191-
if (query) {
192-
auto name = query->getName();
193-
// see comment in MetricsMgr::newDNSxact on direction and why "toHost" is used with "from"
194-
if (dir == toHost && from90th > 0 && xactTime >= from90th) {
195-
_sketches->_dns_slowXactOut.update(name);
196-
} else if (dir == fromHost && to90th > 0 && xactTime >= to90th) {
197-
_sketches->_dns_slowXactIn.update(name);
212+
if (chosen) {
213+
auto query = dns->getFirstQuery();
214+
if (query) {
215+
auto name = query->getName();
216+
// see comment in MetricsMgr::newDNSxact on direction and why "toHost" is used with "from"
217+
if (dir == toHost && from90th > 0 && xactTime >= from90th) {
218+
_sketches->_dns_slowXactOut.update(name);
219+
} else if (dir == fromHost && to90th > 0 && xactTime >= to90th) {
220+
_sketches->_dns_slowXactIn.update(name);
221+
}
198222
}
199223
}
224+
200225
}
201226

202227
void MetricsMgr::newDNSXact(pcpp::DnsLayer *dns, Direction dir, DnsTransaction xact)
@@ -209,10 +234,13 @@ void MetricsMgr::newDNSPacket(pcpp::DnsLayer *dns, Direction dir, pcpp::Protocol
209234
_metrics.back()->newDNSPacket(dns, dir, l3, l4);
210235
}
211236

212-
void Metrics::newPacket(MetricsMgr &mmgr, const pcpp::Packet &packet, pcpp::ProtocolType l3, pcpp::ProtocolType l4, Direction dir)
237+
void Metrics::newPacket(const pcpp::Packet &packet, pcpp::ProtocolType l3, pcpp::ProtocolType l4, Direction dir)
213238
{
214239

215240
_numPackets++;
241+
if (_mmgr.shouldDeepSample()) {
242+
_numSamples++;
243+
}
216244

217245
switch (dir) {
218246
case fromHost:
@@ -244,12 +272,17 @@ void Metrics::newPacket(MetricsMgr &mmgr, const pcpp::Packet &packet, pcpp::Prot
244272
break;
245273
}
246274

275+
// sampler
276+
if (!_mmgr.shouldDeepSample()) {
277+
return;
278+
}
279+
247280
// lock for write
248281
std::unique_lock lock(_sketchMutex);
249282

250283
#ifdef MMDB_ENABLE
251-
const GeoDB* geoCityDB = mmgr.getGeoCityDB();
252-
const GeoDB* geoASNDB = mmgr.getGeoASNDB();
284+
const GeoDB* geoCityDB = _mmgr.getGeoCityDB();
285+
const GeoDB* geoASNDB = _mmgr.getGeoASNDB();
253286
#endif
254287

255288
auto IP4layer = packet.getLayerOfType<pcpp::IPv4Layer>();
@@ -334,7 +367,7 @@ void MetricsMgr::_periodShift()
334367
_instantRates->resetQuantiles();
335368

336369
// add new bucket
337-
_metrics.emplace_back(std::make_unique<Metrics>());
370+
_metrics.emplace_back(std::make_unique<Metrics>(*this));
338371
if (_metrics.size() > _numPeriods) {
339372
// if we're at our period history length, pop the oldest
340373
_metrics.pop_front();
@@ -352,6 +385,11 @@ void MetricsMgr::setInitialShiftTS(const pcpp::Packet &packet) {
352385

353386
void MetricsMgr::newPacket(const pcpp::Packet &packet, QueryResponsePairMgr &pairMgr, pcpp::ProtocolType l4, Direction dir, pcpp::ProtocolType l3)
354387
{
388+
// at each new packet, we determine if we are sampling, to limit collection of more detailed (expensive) statistics
389+
_shouldDeepSample = true;
390+
if (_deepSampleRate != 100) {
391+
_shouldDeepSample = (_rng.uniform(0, 100) <= _deepSampleRate);
392+
}
355393
if (!_singleSummaryMode) {
356394
// use packet timestamps to track when PERIOD_SEC passes so we don't have to hit system clock
357395
auto pkt_ts = packet.getRawPacketReadOnly()->getPacketTimeStamp();
@@ -372,7 +410,7 @@ void MetricsMgr::newPacket(const pcpp::Packet &packet, QueryResponsePairMgr &pai
372410
break;
373411
}
374412
}
375-
_metrics.back()->newPacket(*this, packet, l3, l4, dir);
413+
_metrics.back()->newPacket(packet, l3, l4, dir);
376414
}
377415

378416
void Metrics::toJSON(nlohmann::json &j, const std::string &key)
@@ -382,6 +420,8 @@ void Metrics::toJSON(nlohmann::json &j, const std::string &key)
382420
std::shared_lock lock_sketch(_sketchMutex);
383421
std::shared_lock lock_rate(_rateSketchMutex);
384422

423+
j[key]["packets"]["deep_samples"] = _numSamples.load();
424+
385425
j[key]["packets"]["total"] = _numPackets.load();
386426
j[key]["packets"]["udp"] = _numPackets_UDP.load();
387427
j[key]["packets"]["tcp"] = _numPackets_TCP.load();
@@ -588,6 +628,7 @@ std::string MetricsMgr::getAppMetrics()
588628
{
589629
nlohmann::json j;
590630
j["app"]["version"] = PKTVISOR_VERSION_NUM;
631+
j["app"]["deep_sample_rate_pct"] = _deepSampleRate;
591632
j["app"]["periods"] = _numPeriods;
592633
j["app"]["single_summary"] = _singleSummaryMode;
593634
j["app"]["up_time_min"] = float(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - _startTime).count()) / 60;
@@ -660,7 +701,7 @@ std::string MetricsMgr::getMetricsMerged(uint64_t period)
660701
}
661702

662703
auto period_length = 0;
663-
Metrics merged;
704+
Metrics merged(*this);
664705

665706
auto p = period;
666707
for (auto &m : _metrics) {

0 commit comments

Comments
 (0)