#!/usr/bin/env python3
import clickhouse_connect
import math
import numpy as np
import argparse
import os
import datetime
import configparser
import requests
import yaml
import logging
from typing import Tuple
def setup_logging(args):
logger = logging.getLogger("akvo-top-asn")
if args.debug:
logger.setLevel(logging.DEBUG)
elif args.quiet:
logger.setLevel(logging.WARNING)
else:
logger.setLevel(logging.INFO)
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter('%(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
return logger
def check_conf_filename(fn):
"""
    Basic config filename sanity checks (inherently racy) to produce
    polite error messages.
"""
if not os.path.exists(fn):
raise argparse.ArgumentTypeError(f"{fn} does not exist")
if not os.path.isfile(fn):
raise argparse.ArgumentTypeError(f"{fn} is not a file")
return fn
def check_time_stamp(ts):
return datetime.datetime.fromisoformat(ts)
# Instantiate the parser
parser = argparse.ArgumentParser(description="Akvorado top ASN stats")
parser.add_argument(
"-c", "--config", type=check_conf_filename, default="akvo-top-asn.conf", help="configuration filename"
)
parser.add_argument("--starttime", type=check_time_stamp)
parser.add_argument("--endtime", type=check_time_stamp)
parser.add_argument("--days", type=int, default=0)
parser.add_argument("--weeks", type=int, default=1)
parser.add_argument("--months", type=int, default=0)
parser.add_argument("--filename", default=argparse.SUPPRESS, help="filename strftime pattern for saving and uploading results" )
group = parser.add_argument_group("export actions")
group.add_argument("--save", action="store_true", help="save result to local file")
group.add_argument("--upload", action="store_true", help="upload result to remote http url")
group.add_argument("--print", action="store_true", help="print result to stdout")
parser.add_argument("--query-limit", type=int, default=50)
parser.add_argument("--value-limit", type=int, default=1000)
# logging settings
logging_group = parser.add_mutually_exclusive_group()
logging_group.add_argument("--debug", action="store_true", help="enable debug logging")
logging_group.add_argument("--quiet", action="store_true", help="suppress informational logging")
args = parser.parse_args()
logger = setup_logging(args)
if not (args.print or args.upload or args.save):
parser.error("You need to specify at least one action!")
config = configparser.ConfigParser(interpolation=None)
config.read(args.config)
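# Illustrative config layout; the section and option names below match the lookups
# made later in this script, the values themselves are made up:
#
#   [clickhouse]
#   host = clickhouse.example.net
#   port = 9440
#   secure = true
#   username = akvorado
#   password = secret
#
#   [query]
#   query_limit = 50
#   value_limit = 1000
#
#   [upload]
#   url = https://reports.example.net/akvorado
#   username = uploader
#   password = secret
#   filename = export-%Y-W%W.yml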
starttime = None
endtime = None
# if no starttime is given, take a timestamp that lies in the past by the default offset
if args.starttime is None:
delta = datetime.timedelta(
days=args.days + args.weeks * 7 + args.months * 30,
)
starttime = datetime.datetime.utcnow() - delta
else:
    starttime = args.starttime
# if no endtime is given we take the current time as endtime
if args.endtime is None:
endtime = datetime.datetime.utcnow()
else:
endtime = args.endtime
time_range = [starttime.strftime("%Y-%m-%d %H:%M:%S"), endtime.strftime("%Y-%m-%d %H:%M:%S")]
si_prefixes = ["", "K", "M", "G"]
directions = ["Out", "In"]
def format_bps(n):
"""
Format value with bps unit and SI prefix.
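    Examples (derived from the si_prefixes table above):
        format_bps(950)         -> "950bps"
        format_bps(123_000_000) -> "123Mbps"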
"""
idx = max(0, min(len(si_prefixes) - 1, int(math.floor(0 if n == 0 else math.log10(abs(n)) / 3))))
return "{:.0f}{}bps".format(n / 10 ** (3 * idx), si_prefixes[idx])
def get_connection(config):
"""
Create a clickhouse client instance based on the config
parameters of the [clickhouse] section.
"""
params = {"host": config["clickhouse"].get("host", "localhost")}
# add optional parameters
for param_name, param_type in [
("secure", bool),
("port", int),
("username", str),
("password", str),
]:
        if param_name in config["clickhouse"]:
            try:
                if param_type is bool:
                    # bool("false") would be truthy; let configparser parse yes/no/true/false
                    params[param_name] = config["clickhouse"].getboolean(param_name)
                else:
                    params[param_name] = param_type(config["clickhouse"][param_name])
            except ValueError as ex:
                logger.error("Bad config value for clickhouse.%s: %s", param_name, ex)
                exit(1)
logger.info("connecting to clickhouse at %s", params["host"])
return clickhouse_connect.get_client(**params)
def query_clickhouse(time_range: Tuple[str, str], direction: str, offset: int = 0, limit: int = 50):
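    """
    Query Akvorado's flows_5m0s table for one page of top source ASNs
    (LIMIT/OFFSET applies to the top-talker subquery) and return the raw
    result rows. Each row is (axis, time, xps, dimensions): xps is the
    bitrate per 900-second bucket and dimensions is a one-element list
    such as ['<asn>: <org name>'] or ['0: Other'].
    """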
query = f"""
WITH
source AS (SELECT * FROM flows_5m0s SETTINGS asterisk_include_alias_columns = 1),
    rows AS (SELECT SrcAS FROM source WHERE TimeReceived BETWEEN toDateTime('{time_range[0]}', 'UTC') AND toDateTime('{time_range[1]}', 'UTC') AND (InIfBoundary = 'external') GROUP BY SrcAS ORDER BY SUM(Bytes) DESC LIMIT {limit} OFFSET {offset})
SELECT 1 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived + INTERVAL 900 second, INTERVAL 900 second) - INTERVAL 900 second AS time,
SUM(Bytes*SamplingRate*8)/900 AS xps,
if((SrcAS) IN rows, [concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???'))], ['0: Other']) AS dimensions
FROM source
WHERE TimeReceived BETWEEN toDateTime('{time_range[0]}', 'UTC') AND toDateTime('{time_range[1]}', 'UTC') AND ({direction}IfBoundary = 'external')
GROUP BY time, dimensions
ORDER BY time WITH FILL
FROM toDateTime('{time_range[0]}', 'UTC')
TO toDateTime('{time_range[1]}', 'UTC') + INTERVAL 1 second
STEP 900
INTERPOLATE (dimensions AS ['0: Other']))
"""
logger.debug("clickhouse query: %s", query)
return client.query(query).result_rows
# connect to clickhouse
try:
client = get_connection(config)
except clickhouse_connect.driver.exceptions.DatabaseError as ex:
logger.error("Failed to connect to clickhouse: %s", ex)
exit(2)
logger.info("query time range: %s - %s", *time_range)
# configparser returns strings; use getint so the pagination arithmetic below
# works, and fall back to the command line defaults when the option is absent
value_limit = config.getint("query", "value_limit", fallback=args.value_limit)
query_limit = config.getint("query", "query_limit", fallback=args.query_limit)
# build list of bandwidths per ASN
asn_xps = {}
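# asn_xps maps the dimension label to per-direction rate samples, e.g. (values hypothetical):
#   {"64496: Example Org": {"In": [1.2e6, ...], "Out": [3.4e6, ...]}, "0: Other": {...}}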
for direction in directions: # in bound and outbound
i = 0
while True:
logger.info(f"query results from {i * query_limit} until {(i + 1) * query_limit} ... ")
query_result = query_clickhouse(time_range, direction, limit=query_limit, offset= i * query_limit)
for axis, ts, xps, _asn in query_result:
asn = _asn[0]
if asn in asn_xps:
if direction in asn_xps[asn]:
asn_xps[asn][direction].append(xps)
else:
asn_xps[asn][direction] = [xps]
else:
asn_xps[asn] = {direction: [xps]}
        # keep paging while the query still returns rows and the next offset
        # stays below the configured value limit
        if len(query_result) > 0 and (i + 1) * query_limit < value_limit:
            i += 1
        else:
            break
logger.info("building statistics...")
# build stats for each ASN
asn_stats = {}
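# each entry ends up as:
#   {"org": <name>, "out_avg": ..., "out_p95": ..., "out_max": ...,
#    "in_avg": ..., "in_p95": ..., "in_max": ...}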
for _asn, xps in asn_xps.items():
asn, org = map(str.strip, _asn.split(':', 1))
asn = int(asn)
# skip 'Other'
if asn == 0:
continue
stats = {
'org': org,
}
for direction in directions:
d = direction.lower()
if direction in xps:
arr = np.array(xps[direction])
stats[f"{d}_avg"] = np.mean(arr)
stats[f"{d}_p95"] = np.percentile(arr, 95)
stats[f"{d}_max"] = np.max(arr)
else:
stats[f"{d}_avg"] = 0
stats[f"{d}_p95"] = 0
stats[f"{d}_max"] = 0
asn_stats[asn] = stats
if args.save or args.upload:
    default_pattern = config.get('upload', 'filename', fallback='export-%Y-W%W.yml')
    fn = starttime.strftime(getattr(args, 'filename', default_pattern))
# dump result to string
yaml.add_representer(np.float64, lambda dumper, data: dumper.represent_scalar('tag:yaml.org,2002:float', str(data)))
result = yaml.dump({
'from': time_range[0],
'to': time_range[1],
# 'local_asn': local_asn,
'top_peers': asn_stats,
})
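    # the dumped document looks roughly like this (values illustrative):
    #   from: '2024-01-01 00:00:00'
    #   to: '2024-01-08 00:00:00'
    #   top_peers:
    #     64496:
    #       org: Example Org
    #       in_avg: 1200000.0
    #       ...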
# save result to local file
if args.save:
logger.info("saving to file: %s", fn)
with open(fn, 'w') as fh:
fh.write(result)
# upload result to http remote location
if args.upload:
params = {
"headers": {
"X-Requested-With": "XMLHttpRequest",
},
"data": result,
}
if "username" in config["upload"] or "password" in config["upload"]:
params["auth"] = (config["upload"].get("username"), config["upload"].get("password"))
url = config["upload"]["url"] + "/" + fn
logger.info("uploading to: %s", url)
response = requests.put(url, **params)
    if 200 <= response.status_code < 300:
        logger.info("upload succeeded")
    else:
        logger.error("upload failed: %d - %s", response.status_code, response.text)
# print to stdout
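# expected layout (made-up numbers):
#   ASN     ORG                                    out_avg   out_p95   out_max   in_avg    in_p95    in_max
#   64496   Example Org                              1Mbps     2Mbps     3Mbps    4Mbps     5Mbps     6Mbps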
if args.print:
    # poor man's table headers
    print("{asn: <7} {org: <36}".format(asn="ASN", org="ORG"), end="")
    columns = []
    for direction in directions:
        for metric in ["avg", "p95", "max"]:
            columns.append(f"{{{direction.lower()}_{metric}: >9}}")
            print(" {metric: <9}".format(metric=f"{direction.lower()}_{metric}"), end="")
    # terminate the header line before the data rows
    print()
# sort & print ASN stats
for asn, stats in sorted(asn_stats.items(), key=lambda i: i[1]["out_p95"] + i[1]["in_p95"], reverse=True):
print(f'{{asn: <7}} {{org: <36}} {" ".join(columns)}'.format(asn=asn, org=stats['org'], **{k: format_bps(v) for k, v in stats.items() if k != "org"}))