Skip to content

metrics: add process CPU and memory metrics #10427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Fluent Bit runs on x86_64, x86, arm32v7, and arm64v8 architectures.
- Extensibility
- Write any input, filter or output plugin in C language
- Write [Filters in Lua](https://docs.fluentbit.io/manual/filter/lua) or [Output plugins in Golang](https://docs.fluentbit.io/manual/development/golang-output-plugins)
- [Monitoring](https://docs.fluentbit.io/manual/administration/monitoring): expose internal metrics over HTTP in JSON and [Prometheus](https://prometheus.io/) format
- [Monitoring](https://docs.fluentbit.io/manual/administration/monitoring): expose internal metrics, including Fluent Bit process CPU and memory usage, over HTTP in JSON and [Prometheus](https://prometheus.io/) format
- [Stream Processing](https://docs.fluentbit.io/manual/stream-processing/introduction): Perform data selection and transformation using simple SQL queries
- Create new streams of data using query results
- Aggregation Windows
Expand Down
100 changes: 100 additions & 0 deletions src/flb_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_metrics.h>
#include <msgpack.h>
#ifdef FLB_SYSTEM_WINDOWS
#include <windows.h>
#include <psapi.h>
#else
#include <sys/resource.h>
#include <unistd.h>
#endif

static int id_exists(int id, struct flb_metrics *metrics)
{
Expand Down Expand Up @@ -352,6 +359,97 @@ static int attach_hot_reload_info(struct flb_config *ctx, struct cmt *cmt, uint6
return 0;
}

static int attach_process_cpu_usage(struct flb_config *ctx, struct cmt *cmt,
uint64_t ts, char *hostname)
{
double val;
struct cmt_gauge *g;

#ifdef FLB_SYSTEM_WINDOWS
FILETIME creation_time;
FILETIME exit_time;
FILETIME kernel_time;
FILETIME user_time;
ULARGE_INTEGER kt;
ULARGE_INTEGER ut;

if (!GetProcessTimes(GetCurrentProcess(), &creation_time, &exit_time,
&kernel_time, &user_time)) {
return -1;
}

kt.LowPart = kernel_time.dwLowDateTime;
kt.HighPart = kernel_time.dwHighDateTime;
ut.LowPart = user_time.dwLowDateTime;
ut.HighPart = user_time.dwHighDateTime;

val = ((double) (kt.QuadPart + ut.QuadPart)) / 10000000.0;
#else
struct rusage ru;

if (getrusage(RUSAGE_SELF, &ru) == -1) {
return -1;
}
val = (double) ru.ru_utime.tv_sec + ((double) ru.ru_utime.tv_usec / 1e6) +
(double) ru.ru_stime.tv_sec + ((double) ru.ru_stime.tv_usec / 1e6);
#endif

g = cmt_gauge_create(cmt, "fluentbit", "", "process_cpu_seconds_total",
"Total user and system CPU time spent in seconds.",
1, (char *[]) {"hostname"});
if (!g) {
return -1;
}

cmt_gauge_set(g, ts, val, 1, (char *[]) {hostname});

return 0;
}

static int attach_process_memory_usage(struct flb_config *ctx, struct cmt *cmt,
uint64_t ts, char *hostname)
{
double val;
struct cmt_gauge *g;

#ifdef FLB_SYSTEM_WINDOWS
PROCESS_MEMORY_COUNTERS pmc;

if (!GetProcessMemoryInfo(GetCurrentProcess(), &pmc, sizeof(pmc))) {
return -1;
}

val = (double) pmc.WorkingSetSize;
#else
long rss = 0;
long page_size = sysconf(_SC_PAGE_SIZE);
FILE *fp = fopen("/proc/self/statm", "r");

if (!fp) {
return -1;
}

if (fscanf(fp, "%*s %ld", &rss) != 1) {
fclose(fp);
return -1;
}
fclose(fp);

val = (double) rss * (double) page_size;
#endif

g = cmt_gauge_create(cmt, "fluentbit", "", "process_resident_memory_bytes",
"Resident memory size in bytes.",
1, (char *[]) {"hostname"});
if (!g) {
return -1;
}

cmt_gauge_set(g, ts, val, 1, (char *[]) {hostname});

return 0;
}

/* Append internal Fluent Bit metrics to context */
int flb_metrics_fluentbit_add(struct flb_config *ctx, struct cmt *cmt)
{
Expand All @@ -371,6 +469,8 @@ int flb_metrics_fluentbit_add(struct flb_config *ctx, struct cmt *cmt)
/* Attach metrics to cmetrics context */
attach_uptime(ctx, cmt, ts, hostname);
attach_process_start_time_seconds(ctx, cmt, ts, hostname);
attach_process_cpu_usage(ctx, cmt, ts, hostname);
attach_process_memory_usage(ctx, cmt, ts, hostname);
attach_build_info(ctx, cmt, ts, hostname);
attach_hot_reload_info(ctx, cmt, ts, hostname);

Expand Down
87 changes: 86 additions & 1 deletion tests/runtime/in_fluentbit_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,51 @@ static void test_ctx_destroy(struct test_ctx *ctx)
}

#ifdef FLB_HAVE_METRICS
char *basic_expected_strs[] = {"\"uptime\"", "\"records_total\"", "\"bytes_total\"", "\"proc_records_total\"", "\"proc_bytes_total\"", "\"errors_total\"", "\"retries_total\"", "\"retries_failed_total\"", "\"dropped_records_total\"", "\"retried_records_total\""};
char *basic_expected_strs[] = {"\"uptime\"", "\"records_total\"", "\"bytes_total\"", "\"proc_records_total\"", "\"proc_bytes_total\"", "\"errors_total\"", "\"retries_total\"", "\"retries_failed_total\"", "\"dropped_records_total\"", "\"retried_records_total\"", "\"process_cpu_seconds_total\"", "\"process_resident_memory_bytes\""};

static double get_metric_value(const char *json, const char *name)
{
char search[128];
char *p;

/* locate metric entry by name */
snprintf(search, sizeof(search), "\"name\":\"%s\"", name);
p = strstr(json, search);
if (!p) {
return -1.0;
}

/* find the value field after the name */
p = strstr(p, "\"value\":");
if (!p) {
return -1.0;
}
p += strlen("\"value\":");
return atof(p);
}

static int cb_check_metric_values(void *record, size_t size, void *data)
{
char *result = (char *) record;
double cpu;
double mem;


cpu = get_metric_value(result, "process_cpu_seconds_total");
mem = get_metric_value(result, "process_resident_memory_bytes");

if (!TEST_CHECK(cpu >= 0)) {
TEST_MSG("invalid cpu seconds value");
}

if (!TEST_CHECK(mem > 0)) {
TEST_MSG("invalid memory value");
}

set_output_num(get_output_num() + 1);
flb_free(record);
return 0;
}

static void test_basic(void)
{
Expand Down Expand Up @@ -216,11 +260,52 @@ static void test_basic(void)

test_ctx_destroy(ctx);
}

static void test_values(void)
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
int ret;
int num;

clear_output_num();

cb_data.cb = cb_check_metric_values;
cb_data.data = NULL;

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_input_set(ctx->flb, ctx->i_ffd,
"scrape_interval", "1",
NULL);
TEST_CHECK(ret == 0);

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"format", "json",
NULL);
TEST_CHECK(ret == 0);

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

wait_with_timeout(3000, &num);

if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}

test_ctx_destroy(ctx);
}
#endif

TEST_LIST = {
#ifdef FLB_HAVE_METRICS
{"basic", test_basic},
{"values", test_values},
#endif
{NULL, NULL}
};
Loading