From 4877f7721372919ee9cb842c8e42c363e1145006 Mon Sep 17 00:00:00 2001 From: Vanessa Zhang Date: Tue, 18 Feb 2025 17:17:36 -0800 Subject: [PATCH 1/4] lib: new apis to support creating processor unit for input and output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch introduces new APIs to create a single processor unit for input and output instances: 1. flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type, const char *processor_unit_name, int ffd, …) Create a single input processor unit for a specific event type and unit name 2. flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type, const char *processor_unit_name, int ffd, …) Create a single output processor unit for a specific event type and unit name Signed-off-by: Vanessa Zhang --- include/fluent-bit/flb_lib.h | 6 ++ src/flb_lib.c | 116 +++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+) diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index 88ff8cb0af3..ecb51f3dc24 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -52,8 +52,14 @@ FLB_EXPORT void flb_init_env(); FLB_EXPORT flb_ctx_t *flb_create(); FLB_EXPORT void flb_destroy(flb_ctx_t *ctx); FLB_EXPORT int flb_input(flb_ctx_t *ctx, const char *input, void *data); +FLB_EXPORT int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, + ...); FLB_EXPORT int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); FLB_EXPORT int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb); +FLB_EXPORT int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, + ...); FLB_EXPORT int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); FLB_EXPORT int flb_filter(flb_ctx_t *ctx, const char *filter, void *data); FLB_EXPORT int flb_input_set(flb_ctx_t *ctx, int ffd, ...); diff --git a/src/flb_lib.c b/src/flb_lib.c index 30d4d99dd40..8e01f7c2749 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -329,6 +330,72 @@ int flb_input_set(flb_ctx_t *ctx, int ffd, ...) return 0; } +static int flb_processor_event_type_get(const char *event_type) +{ + if (strcasecmp(event_type, "logs") == 0) { + return FLB_PROCESSOR_LOGS; + } + else if (strcasecmp(event_type, "metrics") == 0) { + return FLB_PROCESSOR_METRICS; + } + else if (strcasecmp(event_type, "traces") == 0) { + return FLB_PROCESSOR_TRACES; + } + else if (strcasecmp(event_type, "profiles") == 0) { + return FLB_PROCESSOR_PROFILES; + } + else { + return -1; + } +} + +/* Create a single processor unit for the input processor */ +int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, ...) +{ + int ret; + int type; + struct flb_input_instance *i_ins; + struct flb_processor *proc; + struct flb_processor_unit *pu; + char *key; + char *value; + va_list va; + + i_ins = in_instance_get(ctx, ffd); + if (!i_ins) { + return -1; + } + proc = i_ins->processor; + if (!proc) { + return -1; + } + type = flb_processor_event_type_get(event_type); + if (type == -1) { + return -1; + } + pu = flb_processor_unit_create(proc, type, processor_unit_name); + va_start(va, ffd); + while ((key = va_arg(va, char *))) { + value = va_arg(va, char *); + if (!value) { + /* Wrong parameter */ + va_end(va); + return -1; + } + struct cfl_variant cfl_value = { + .type = CFL_VARIANT_STRING, + .data.as_string = value, + }; + ret = flb_processor_unit_set_property(pu, key, &cfl_value); + if (ret != 0) { + va_end(va); + return -1; + } + } + return 0; +} + int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) { struct flb_input_instance *i_ins; @@ -545,6 +612,55 @@ int flb_output_set(flb_ctx_t *ctx, int ffd, ...) return 0; } +/* Create a single processor unit for the input processor */ +int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, ...) +{ + int ret; + int type; + struct flb_output_instance *o_ins; + struct flb_processor *proc; + struct flb_processor_unit *pu; + char *key; + char *value; + va_list va; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + proc = o_ins->processor; + if (!proc) { + return -1; + } + type = flb_processor_event_type_get(event_type); + if (type == -1) { + return -1; + } + pu = flb_processor_unit_create(proc, type, processor_unit_name); + va_start(va, ffd); + while ((key = va_arg(va, char *))) { + value = va_arg(va, char *); + if (!value) { + /* Wrong parameter */ + va_end(va); + return -1; + } + + struct cfl_variant cfl_value = { + .type = CFL_VARIANT_STRING, + .data.as_string = value, + }; + + ret = flb_processor_unit_set_property(pu, key, &cfl_value); + if (ret != 0) { + va_end(va); + return -1; + } + } + return 0; +} + int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) { struct flb_output_instance *o_ins; From 6f17e0b9913edaff52498e6ecdd1a4788c3fc7f6 Mon Sep 17 00:00:00 2001 From: Vanessa Zhang Date: Tue, 18 Feb 2025 15:47:32 -0800 Subject: [PATCH 2/4] processor: update function signature of flb_processor_unit_create to use const char* for unit_name Signed-off-by: Vanessa Zhang --- include/fluent-bit/flb_processor.h | 2 +- src/flb_processor.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index 93af7923bba..691e51bebdd 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -229,7 +229,7 @@ int flb_processor_run(struct flb_processor *proc, struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, int event_type, - char *unit_name); + const char *unit_name); void flb_processor_unit_destroy(struct flb_processor_unit *pu); int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k, struct cfl_variant *v); diff --git a/src/flb_processor.c b/src/flb_processor.c index 1bd2a10541b..87b4e7cfb17 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -161,7 +161,7 @@ struct flb_processor *flb_processor_create(struct flb_config *config, struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, int event_type, - char *unit_name) + const char *unit_name) { int result; struct mk_list *head; From eba4bc816aa1b7b49a35ea95a9241e153c44c532 Mon Sep 17 00:00:00 2001 From: Vanessa Zhang Date: Mon, 14 Apr 2025 17:18:22 -0700 Subject: [PATCH 3/4] processor: add unit tests for the new lib processor apis: flb_input_processor_unit and flb_output_processor_unit Signed-off-by: Vanessa Zhang --- tests/internal/processor.c | 92 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/tests/internal/processor.c b/tests/internal/processor.c index 679fd131b8d..f63d2a988d8 100644 --- a/tests/internal/processor.c +++ b/tests/internal/processor.c @@ -119,7 +119,99 @@ static void processor() flb_sds_destroy(hostname_prop_key); } +static void input_processor() +{ + flb_ctx_t *ctx; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct mk_list *head; + int ret; + int in_ffd; + int found = 0; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + in_ffd = flb_input(ctx, "dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + ret = flb_input_processor_unit(ctx, "logs", "opentelemetry_envelope", + in_ffd, NULL); + TEST_CHECK(ret == 0); + + ret = flb_input_get_processor(ctx, in_ffd, &proc); + TEST_CHECK(ret == 0); + TEST_CHECK(proc != NULL); + + /* Walk through logs processor units and verify one was added */ + mk_list_foreach(head, &proc->logs) { + pu = mk_list_entry(head, struct flb_processor_unit, _head); + if (strcmp(pu->name, "opentelemetry_envelope") == 0) { + found++; + break; + } + } + TEST_CHECK(found == 1); + + flb_destroy(ctx); +} + +static void output_processor() +{ + flb_ctx_t *ctx; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct flb_processor_instance *pi; + struct mk_list *head; + const char *val; + int ret; + int out_ffd; + int found = 0; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + out_ffd = flb_output(ctx, "stdout", NULL); + TEST_CHECK(out_ffd >= 0); + + ret = flb_output_processor_unit(ctx, "metrics", + "metrics_selector", out_ffd, + "metric_name", "/storage/", + "action", "includeNULL", NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_get_processor(ctx, out_ffd, &proc); + TEST_CHECK(ret == 0); + TEST_CHECK(proc != NULL); + + /* Walk through logs processor units and verify one was added */ + mk_list_foreach(head, &proc->metrics) { + pu = mk_list_entry(head, struct flb_processor_unit, _head); + if (strcmp(pu->name, "metrics_selector") == 0) { + found++; + + pi = (struct flb_processor_instance *) pu->ctx; + TEST_CHECK(pi != NULL); + + val = flb_processor_instance_get_property("metric_name", pi); + TEST_CHECK(val != NULL); + TEST_CHECK(strcmp(val, "/storage/") == 0); + + val = flb_processor_instance_get_property("action", pi); + TEST_CHECK(val != NULL); + TEST_CHECK(strcmp(val, "includeNULL") == 0); + + break; + } + } + TEST_CHECK(found == 1); + + flb_destroy(ctx); +} + TEST_LIST = { { "processor", processor }, + {"input_processor", input_processor}, + {"output_processor", output_processor}, { 0 } }; From 3fdd84a887fd1a01e4d99d9d84d24eba8720c463 Mon Sep 17 00:00:00 2001 From: Vanessa Zhang Date: Mon, 14 Apr 2025 17:22:54 -0700 Subject: [PATCH 4/4] lib: new apis to retrieve processor of a given input/output plugin This patch introduces new apis to retrieve processor associated with a given input or output plugin: int flb_input_get_processor(flb_ctx_t *ctx, int ffd, struct flb_processor **proc) int flb_output_get_processor(flb_ctx_t *ctx, int ffd, struct flb_processor **proc) Signed-off-by: Vanessa Zhang --- include/fluent-bit/flb_lib.h | 2 ++ src/flb_lib.c | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index ecb51f3dc24..b2409c489e0 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -56,11 +56,13 @@ FLB_EXPORT int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type, const char *processor_unit_name, int ffd, ...); FLB_EXPORT int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); +FLB_EXPORT int flb_input_get_processor(flb_ctx_t *ctx, int ffd, struct flb_processor **proc); FLB_EXPORT int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb); FLB_EXPORT int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type, const char *processor_unit_name, int ffd, ...); FLB_EXPORT int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); +FLB_EXPORT int flb_output_get_processor(flb_ctx_t *ctx, int ffd, struct flb_processor **proc); FLB_EXPORT int flb_filter(flb_ctx_t *ctx, const char *filter, void *data); FLB_EXPORT int flb_input_set(flb_ctx_t *ctx, int ffd, ...); FLB_EXPORT int flb_input_set_test(flb_ctx_t *ctx, int ffd, char *test_name, diff --git a/src/flb_lib.c b/src/flb_lib.c index 8e01f7c2749..6e8a1b20f51 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -414,6 +414,23 @@ int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) return 0; } +/* Retrieve the processor associated with a given input plugin. */ +int flb_input_get_processor(flb_ctx_t *ctx, int ffd, + struct flb_processor **proc) +{ + struct flb_input_instance *i_ins; + + i_ins = in_instance_get(ctx, ffd); + if (!i_ins) { + return -1; + } + if (!i_ins->processor) { + return -1; + } + *proc = i_ins->processor; + return 0; +} + int flb_input_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void (*in_callback) (void *, int, int, void *, size_t, void *), void *in_callback_data) @@ -679,6 +696,23 @@ int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc return 0; } +/* Retrieve the processor associated with a given output plugin. */ +int flb_output_get_processor(flb_ctx_t *ctx, int ffd, + struct flb_processor **proc) +{ + struct flb_output_instance *o_ins; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + if (!o_ins->processor) { + return -1; + } + *proc = o_ins->processor; + return 0; +} + int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, void (*cb)(char *, void *, void *)) {