forked from DataDog/datadog-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregator.c
446 lines (374 loc) · 15 KB
/
aggregator.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2019-present Datadog, Inc.
#include "aggregator.h"
#include "rtloader_mem.h"
#include "stringutils.h"
// Callbacks into the Agent, one per kind of submission. They start out NULL and
// are installed through the matching _set_submit_*_cb() function of this file.
// While a callback is NULL, the corresponding python method returns None
// without doing anything else.
// these must be set by the Agent
static cb_submit_metric_t cb_submit_metric = NULL;
static cb_submit_service_check_t cb_submit_service_check = NULL;
static cb_submit_event_t cb_submit_event = NULL;
static cb_submit_histogram_bucket_t cb_submit_histogram_bucket = NULL;
static cb_submit_event_platform_event_t cb_submit_event_platform_event = NULL;
// forward declarations
// C implementations of the python-callable methods. They are declared here
// because the `methods` table that follows references them before they are
// defined further down in this file.
static PyObject *submit_metric(PyObject *self, PyObject *args);
static PyObject *submit_service_check(PyObject *self, PyObject *args);
static PyObject *submit_event(PyObject *self, PyObject *args);
static PyObject *submit_histogram_bucket(PyObject *self, PyObject *args);
static PyObject *submit_event_platform_event(PyObject *self, PyObject *args);
// Method table of the aggregator module: maps each python-visible method name
// to its C implementation. Every entry is registered with METH_VARARGS, and
// the last string of each entry is the method's docstring.
static PyMethodDef methods[] = {
{ "submit_metric", (PyCFunction)submit_metric, METH_VARARGS, "Submit metrics." },
{ "submit_service_check", (PyCFunction)submit_service_check, METH_VARARGS, "Submit service checks." },
{ "submit_event", (PyCFunction)submit_event, METH_VARARGS, "Submit events." },
{ "submit_histogram_bucket", (PyCFunction)submit_histogram_bucket, METH_VARARGS, "Submit histogram bucket." },
{ "submit_event_platform_event", (PyCFunction)submit_event_platform_event, METH_VARARGS, "Submit event platform event." },
{ NULL, NULL } // guards: sentinel entry marking the end of the table
};
/*! \fn add_constants(PyObject *m)
    \brief A helper function to add the set of metric type constants to a python module.
    \param m A PyObject * pointer to the module you wish to add the constants to.

    Each constant is registered as an integer attribute of the module through
    PyModule_AddIntConstant(). The function returns nothing and the result of the
    individual registrations is not inspected. The function is static and not in
    the builtin's API.
*/
static void add_constants(PyObject *m)
{
    // name exposed to python, paired with the value it is bound to
    const struct {
        const char *name;
        long value;
    } metric_types[] = {
        { "GAUGE", DATADOG_AGENT_RTLOADER_GAUGE },
        { "RATE", DATADOG_AGENT_RTLOADER_RATE },
        { "COUNT", DATADOG_AGENT_RTLOADER_COUNT },
        { "MONOTONIC_COUNT", DATADOG_AGENT_RTLOADER_MONOTONIC_COUNT },
        { "COUNTER", DATADOG_AGENT_RTLOADER_COUNTER },
        { "HISTOGRAM", DATADOG_AGENT_RTLOADER_HISTOGRAM },
        { "HISTORATE", DATADOG_AGENT_RTLOADER_HISTORATE },
    };
    size_t idx;

    // registration order matches the order of the table above
    for (idx = 0; idx < sizeof(metric_types) / sizeof(metric_types[0]); idx++) {
        PyModule_AddIntConstant(m, metric_types[idx].name, metric_types[idx].value);
    }
}
#ifdef DATADOG_AGENT_THREE
// Module definition handed to PyModule_Create(): module name, no docstring,
// and the method table of this file.
static struct PyModuleDef module_def = { PyModuleDef_HEAD_INIT, AGGREGATOR_MODULE_NAME, NULL, -1, methods };

/*! \fn PyInit_aggregator(void)
    \brief Initialization function of the aggregator module (DATADOG_AGENT_THREE build).
    \return A PyObject * pointer to the newly created module with the metric type
    constants added, or NULL if the module could not be created.
*/
PyMODINIT_FUNC PyInit_aggregator(void)
{
    PyObject *m = PyModule_Create(&module_def);
    if (m == NULL) {
        // module creation failed: there is nothing to add the constants to,
        // report the failure to the interpreter instead of using a NULL module
        return NULL;
    }

    add_constants(m);
    return m;
}
#elif defined(DATADOG_AGENT_TWO)
// module object storage
static PyObject *module;
void Py2_init_aggregator()
{
module = Py_InitModule(AGGREGATOR_MODULE_NAME, methods);
add_constants(module);
}
#endif
/*! \fn _set_submit_metric_cb(cb_submit_metric_t cb)
    \brief Stores the callback invoked by the `submit_metric` python method.
    \param cb The callback to store; it replaces whatever was stored before.
*/
void _set_submit_metric_cb(cb_submit_metric_t cb)
{
cb_submit_metric = cb;
}
/*! \fn _set_submit_service_check_cb(cb_submit_service_check_t cb)
    \brief Stores the callback invoked by the `submit_service_check` python method.
    \param cb The callback to store; it replaces whatever was stored before.
*/
void _set_submit_service_check_cb(cb_submit_service_check_t cb)
{
cb_submit_service_check = cb;
}
/*! \fn _set_submit_event_cb(cb_submit_event_t cb)
    \brief Stores the callback invoked by the `submit_event` python method.
    \param cb The callback to store; it replaces whatever was stored before.
*/
void _set_submit_event_cb(cb_submit_event_t cb)
{
cb_submit_event = cb;
}
/*! \fn _set_submit_histogram_bucket_cb(cb_submit_histogram_bucket_t cb)
    \brief Stores the callback invoked by the `submit_histogram_bucket` python method.
    \param cb The callback to store; it replaces whatever was stored before.
*/
void _set_submit_histogram_bucket_cb(cb_submit_histogram_bucket_t cb)
{
cb_submit_histogram_bucket = cb;
}
/*! \fn _set_submit_event_platform_event_cb(cb_submit_event_platform_event_t cb)
    \brief Stores the callback invoked by the `submit_event_platform_event` python method.
    \param cb The callback to store; it replaces whatever was stored before.
*/
void _set_submit_event_platform_event_cb(cb_submit_event_platform_event_t cb)
{
cb_submit_event_platform_event = cb;
}
/*! \fn py_tag_to_c(PyObject *py_tags)
    \brief A function to convert a python sequence of strings (tags) into a
    NULL-terminated array of C-strings.
    \param py_tags A PyObject * pointer to the python sequence holding the tags.
    \return a char ** pointer to the C-representation of the provided python
    tag sequence. In the event of failure NULL is returned.

    Items for which as_string() returns NULL are skipped, so the resulting array
    may hold fewer entries than the python sequence. An empty sequence yields an
    array that only contains the NULL terminator.

    The returned char ** string array pointer is heap allocated here and should
    be subsequently freed by the caller (see free_tags()). This function may set
    and raise python interpreter errors. The function is static and not in the
    builtin's API.
*/
static char **py_tag_to_c(PyObject *py_tags)
{
    char **tags = NULL;
    PyObject *py_tags_list = NULL; // new reference
    // Py_ssize_t and not int: this is the type the sequence API works with,
    // narrowing it could truncate the length of a very large sequence
    Py_ssize_t len;
    Py_ssize_t nb_valid_tag = 0;
    Py_ssize_t i;

    if (!PySequence_Check(py_tags)) {
        PyErr_SetString(PyExc_TypeError, "tags must be a sequence");
        return NULL;
    }

    len = PySequence_Length(py_tags);
    if (len == -1) {
        PyErr_SetString(PyExc_RuntimeError, "could not compute tags length");
        return NULL;
    } else if (len == 0) {
        // no tags: return an array holding nothing but the terminator
        if (!(tags = _malloc(sizeof(*tags)))) {
            PyErr_SetString(PyExc_RuntimeError, "could not allocate memory for tags");
            return NULL;
        }
        tags[0] = NULL;
        return tags;
    }

    py_tags_list = PySequence_Fast(py_tags, "py_tags is not a sequence"); // new reference
    if (py_tags_list == NULL) {
        goto done;
    }

    // Size the array and bound the loop with the number of items actually held
    // by the object we iterate on: for a custom sequence it may differ from the
    // length reported above, and reading past it would be out of bounds.
    len = PySequence_Fast_GET_SIZE(py_tags_list);

    if (!(tags = _malloc(sizeof(*tags) * ((size_t)len + 1)))) {
        PyErr_SetString(PyExc_RuntimeError, "could not allocate memory for tags");
        goto done;
    }

    for (i = 0; i < len; i++) {
        // `item` is borrowed, no need to decref
        PyObject *item = PySequence_Fast_GET_ITEM(py_tags_list, i);

        char *ctag = as_string(item);
        if (ctag == NULL) {
            // skip items that could not be converted
            continue;
        }
        tags[nb_valid_tag] = ctag;
        nb_valid_tag++;
    }
    // NULL canary marking the end of the array, free_tags() relies on it
    tags[nb_valid_tag] = NULL;

done:
    Py_XDECREF(py_tags_list);
    return tags;
}
/*! \fn free_tags(char **tags)
    \brief A helper function to free the memory allocated by the py_tag_to_c() function.
    \param tags The NULL-terminated array of C-strings to release.

    This function is for internal use and expects the tag array to be properly initialized,
    and have a NULL canary at the end of the array, just like py_tag_to_c() initializes and
    populates the array. Every string is released first, then the array itself. Be mindful
    if using this function in any other context.
*/
static void free_tags(char **tags)
{
    char **cursor = tags;

    // walk the array up to the NULL canary, releasing each string on the way
    while (*cursor != NULL) {
        _free(*cursor);
        cursor++;
    }

    _free(tags);
}
/*! \fn submit_metric(PyObject *self, PyObject *args)
    \brief Aggregator builtin class method for metric submission.
    \param self A PyObject * pointer to self - the aggregator module.
    \param args A PyObject * pointer to the python args or kwargs.
    \return This function returns a new reference to None (already INCREF'd), or NULL in case of error.

    This function implements the `submit_metric` python callable in C and is used from the python code.
    More specifically, in the context of rtloader and datadog-agent, this is called from our python base check
    class to submit metrics to the aggregator.

    If no callback was set, None is returned without looking at the arguments. NULL is returned
    when the arguments cannot be parsed or when the tags cannot be converted; the python error
    is then the one set by PyArg_ParseTuple() or py_tag_to_c().
*/
static PyObject *submit_metric(PyObject *self, PyObject *args)
{
    if (cb_submit_metric == NULL) {
        Py_RETURN_NONE;
    }

    PyGILState_STATE gstate = PyGILState_Ensure();

    PyObject *check = NULL; // borrowed
    PyObject *py_tags = NULL; // borrowed
    char *name = NULL;
    char *hostname = NULL;
    char *check_id = NULL;
    char **tags = NULL;
    int mt;
    double value;
    // The "b" format unit stores an unsigned char: parse into one instead of
    // writing through a pointer to bool, and normalize the value afterwards.
    unsigned char flush_first_value = 0;

    // Python call: aggregator.submit_metric(self, check_id, aggregator.metric_type.GAUGE, name, value, tags, hostname, flush_first_value)
    if (!PyArg_ParseTuple(args, "OsisdOs|b", &check, &check_id, &mt, &name, &value, &py_tags, &hostname, &flush_first_value)) {
        goto error;
    }

    if ((tags = py_tag_to_c(py_tags)) == NULL) {
        goto error;
    }

    cb_submit_metric(check_id, mt, name, value, tags, hostname, flush_first_value != 0);

    // the tags are only needed for the duration of the callback
    free_tags(tags);

    PyGILState_Release(gstate);
    Py_RETURN_NONE;

error:
    PyGILState_Release(gstate);
    return NULL;
}
/*! \fn submit_service_check(PyObject *self, PyObject *args)
    \brief Aggregator builtin class method for service_check submission.
    \param self A PyObject * pointer to self - the aggregator module.
    \param args A PyObject * pointer to the python args or kwargs.
    \return This function returns a new reference to None (already INCREF'd), or NULL in case of error.

    This function implements the `submit_service_check` python callable in C and is used from the python code.
    More specifically, in the context of rtloader and datadog-agent, this is called from our python base check
    class to submit service_checks to the aggregator.

    If no callback was set, None is returned without looking at the arguments. NULL is returned
    when the arguments cannot be parsed or when the tags cannot be converted.
*/
static PyObject *submit_service_check(PyObject *self, PyObject *args)
{
    if (cb_submit_service_check == NULL) {
        Py_RETURN_NONE;
    }

    // acquiring GIL to be able to raise exception
    PyGILState_STATE gil = PyGILState_Ensure();

    PyObject *check = NULL; // borrowed
    PyObject *py_tags = NULL; // borrowed
    char *check_id = NULL;
    char *name = NULL;
    char *hostname = NULL;
    char *message = NULL;
    char **c_tags = NULL;
    int status;
    int submitted = 0;

    // aggregator.submit_service_check(self, check_id, name, status, tags, hostname, message)
    if (PyArg_ParseTuple(args, "OssiOss", &check, &check_id, &name, &status, &py_tags, &hostname, &message)) {
        c_tags = py_tag_to_c(py_tags);
        if (c_tags != NULL) {
            cb_submit_service_check(check_id, name, status, c_tags, hostname, message);
            // the tags are only needed for the duration of the callback
            free_tags(c_tags);
            submitted = 1;
        }
    }

    PyGILState_Release(gil);

    if (!submitted) {
        // the python error was set by PyArg_ParseTuple or py_tag_to_c
        return NULL;
    }
    Py_RETURN_NONE;
}
/*! \fn submit_event(PyObject *self, PyObject *args)
    \brief Aggregator builtin class method for event submission.
    \param self A PyObject * pointer to self - the aggregator module.
    \param args A PyObject * pointer to the python args or kwargs.
    \return This function returns a new reference to None (already INCREF'd), or NULL in case of error.

    This function implements the `submit_event` python callable in C and is used from the python code.
    More specifically, in the context of rtloader and datadog-agent, this is called from our python base check
    class to submit events to the aggregator.

    The event is read from a python dict and copied into an event_t that only lives for the
    duration of the callback: the structure, its strings and its tags are all released before
    returning. NULL is returned when the arguments cannot be parsed, when the event is not a
    dict, when memory for the event cannot be allocated, or when the "tags" entry cannot be
    converted.
*/
static PyObject *submit_event(PyObject *self, PyObject *args)
{
    if (cb_submit_event == NULL) {
        Py_RETURN_NONE;
    }

    PyGILState_STATE gstate = PyGILState_Ensure();

    PyObject *check = NULL; // borrowed
    PyObject *event_dict = NULL; // borrowed
    PyObject *py_timestamp = NULL; // borrowed
    PyObject *py_tags = NULL; // borrowed
    char *check_id = NULL;
    event_t *ev = NULL;
    PyObject *retval = NULL; // stays NULL (raise) unless the event gets submitted

    // aggregator.submit_event(self, check_id, event)
    if (!PyArg_ParseTuple(args, "OsO", &check, &check_id, &event_dict)) {
        // error is set by PyArg_ParseTuple
        goto gstate_cleanup;
    }

    if (!PyDict_Check(event_dict)) {
        PyErr_SetString(PyExc_TypeError, "event must be a dict");
        goto gstate_cleanup;
    }

    ev = (event_t *)_malloc(sizeof(event_t));
    if (ev == NULL) {
        PyErr_SetString(PyExc_RuntimeError, "could not allocate memory for event");
        goto gstate_cleanup;
    }

    // notice: PyDict_GetItemString returns a borrowed ref or NULL if key was not found
    ev->title = as_string(PyDict_GetItemString(event_dict, "msg_title"));
    ev->text = as_string(PyDict_GetItemString(event_dict, "msg_text"));

    // 0 is the magic value that will result in the current time
    ev->ts = 0;
    // PyLong_AsLong will fail if called passing a NULL argument, be safe
    py_timestamp = PyDict_GetItemString(event_dict, "timestamp");
    if (py_timestamp != NULL) {
        ev->ts = PyLong_AsLong(py_timestamp);
        if (ev->ts == -1) {
            // we ignore the error and fall back to 0 to ensure backward
            // compatibility with the pre-rtloader API
            PyErr_Clear();
            ev->ts = 0;
        }
    }

    ev->priority = as_string(PyDict_GetItemString(event_dict, "priority"));
    ev->host = as_string(PyDict_GetItemString(event_dict, "host"));
    ev->alert_type = as_string(PyDict_GetItemString(event_dict, "alert_type"));
    ev->aggregation_key = as_string(PyDict_GetItemString(event_dict, "aggregation_key"));
    ev->source_type_name = as_string(PyDict_GetItemString(event_dict, "source_type_name"));
    ev->event_type = as_string(PyDict_GetItemString(event_dict, "event_type"));

    // process the list of tags, ev->tags stays NULL if tags are missing
    py_tags = PyDict_GetItemString(event_dict, "tags");
    ev->tags = (py_tags != NULL) ? py_tag_to_c(py_tags) : NULL;

    if (py_tags == NULL || ev->tags != NULL) {
        // send the event
        cb_submit_event(check_id, ev);

        // success: take the reference on None by hand since Py_RETURN_NONE is not used here
        Py_INCREF(Py_None);
        retval = Py_None;
    }
    // otherwise the tags were present but could not be converted: retval is
    // left to NULL to raise the exception set by py_tag_to_c

    // release everything that was copied out of the python dict
    if (ev->tags != NULL) {
        free_tags(ev->tags);
    }
    _free(ev->title);
    _free(ev->text);
    _free(ev->priority);
    _free(ev->host);
    _free(ev->alert_type);
    _free(ev->aggregation_key);
    _free(ev->source_type_name);
    _free(ev->event_type);
    _free(ev);

gstate_cleanup:
    PyGILState_Release(gstate);
    return retval;
}
/*! \fn submit_histogram_bucket(PyObject *self, PyObject *args)
    \brief Aggregator builtin class method for histogram bucket submission.
    \param self A PyObject * pointer to self - the aggregator module.
    \param args A PyObject * pointer to the python args or kwargs.
    \return This function returns a new reference to None (already INCREF'd), or NULL in case of error.

    This function implements the `submit_histogram_bucket` python callable in C.

    If no callback was set, None is returned without looking at the arguments. NULL is returned
    when the arguments cannot be parsed or when the tags cannot be converted; the python error
    is then the one set by PyArg_ParseTuple() or py_tag_to_c().
*/
static PyObject *submit_histogram_bucket(PyObject *self, PyObject *args)
{
    if (cb_submit_histogram_bucket == NULL) {
        Py_RETURN_NONE;
    }

    PyGILState_STATE gstate = PyGILState_Ensure();

    PyObject *check = NULL; // borrowed
    PyObject *py_tags = NULL; // borrowed
    char *check_id = NULL;
    char *name = NULL;
    long long value;
    float lower_bound;
    float upper_bound;
    int monotonic;
    char *hostname = NULL;
    char **tags = NULL;
    // The "b" format unit stores an unsigned char: parse into one instead of
    // writing through a pointer to bool, and normalize the value afterwards.
    unsigned char flush_first_value = 0;

    // Python call: aggregator.submit_histogram_bucket(self, check_id, name, value, lower_bound, upper_bound, monotonic, hostname, tags, flush_first_value)
    if (!PyArg_ParseTuple(args, "OssLffisO|b", &check, &check_id, &name, &value, &lower_bound, &upper_bound, &monotonic, &hostname, &py_tags, &flush_first_value)) {
        goto error;
    }

    if ((tags = py_tag_to_c(py_tags)) == NULL) {
        goto error;
    }

    cb_submit_histogram_bucket(check_id, name, value, lower_bound, upper_bound, monotonic, hostname, tags, flush_first_value != 0);

    // the tags are only needed for the duration of the callback
    free_tags(tags);

    PyGILState_Release(gstate);
    Py_RETURN_NONE;

error:
    PyGILState_Release(gstate);
    return NULL;
}
/*! \fn submit_event_platform_event(PyObject *self, PyObject *args)
    \brief Aggregator builtin class method for event platform event submission.
    \param self A PyObject * pointer to self - the aggregator module.
    \param args A PyObject * pointer to the python args or kwargs.
    \return This function returns a new reference to None (already INCREF'd), or NULL in case of error.

    This function implements the `submit_event_platform_event` python callable in C.

    If no callback was set, None is returned without looking at the arguments. NULL is returned
    when the arguments cannot be parsed, or with a ValueError when the raw event is larger than
    INT_MAX bytes.
*/
static PyObject *submit_event_platform_event(PyObject *self, PyObject *args)
{
    if (cb_submit_event_platform_event == NULL) {
        Py_RETURN_NONE;
    }

    PyGILState_STATE gil = PyGILState_Ensure();

    PyObject *check = NULL; // borrowed
    char *check_id = NULL;
    char *event_type = NULL;
    char *raw_event_ptr = NULL;
    Py_ssize_t raw_event_sz = 0;
    int submitted = 0;

    // aggregator.submit_event_platform_event(self, check_id, raw_event, event_type)
    if (PyArg_ParseTuple(args, "Oss#s", &check, &check_id, &raw_event_ptr, &raw_event_sz, &event_type)) {
        if (raw_event_sz > INT_MAX) {
            PyErr_SetString(PyExc_ValueError, "event is too large");
        } else {
            // the raw event is handed over together with its size
            cb_submit_event_platform_event(check_id, raw_event_ptr, raw_event_sz, event_type);
            submitted = 1;
        }
    }

    PyGILState_Release(gil);

    if (!submitted) {
        // the python error was set by PyArg_ParseTuple or by the size check
        return NULL;
    }
    Py_RETURN_NONE;
}