Skip to content

lib: add new apis to support creating processor unit for input and output #9957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/fluent-bit/flb_lib.h
Original file line number Diff line number Diff line change
@@ -52,9 +52,17 @@ FLB_EXPORT void flb_init_env();
FLB_EXPORT flb_ctx_t *flb_create();
FLB_EXPORT void flb_destroy(flb_ctx_t *ctx);
FLB_EXPORT int flb_input(flb_ctx_t *ctx, const char *input, void *data);
FLB_EXPORT int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type,
const char *processor_unit_name, int ffd,
...);
FLB_EXPORT int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc);
FLB_EXPORT int flb_input_get_processor(flb_ctx_t *ctx, int ffd, struct flb_processor **proc);
FLB_EXPORT int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb);
FLB_EXPORT int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type,
const char *processor_unit_name, int ffd,
...);
FLB_EXPORT int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc);
FLB_EXPORT int flb_output_get_processor(flb_ctx_t *ctx, int ffd, struct flb_processor **proc);
FLB_EXPORT int flb_filter(flb_ctx_t *ctx, const char *filter, void *data);
FLB_EXPORT int flb_input_set(flb_ctx_t *ctx, int ffd, ...);
FLB_EXPORT int flb_input_set_test(flb_ctx_t *ctx, int ffd, char *test_name,
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_processor.h
Original file line number Diff line number Diff line change
@@ -229,7 +229,7 @@ int flb_processor_run(struct flb_processor *proc,

struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc,
int event_type,
char *unit_name);
const char *unit_name);
void flb_processor_unit_destroy(struct flb_processor_unit *pu);
int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k, struct cfl_variant *v);

150 changes: 150 additions & 0 deletions src/flb_lib.c
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_downstream.h>
#include <fluent-bit/tls/flb_tls.h>
#include <fluent-bit/flb_processor.h>

#include <signal.h>
#include <stdarg.h>
@@ -329,6 +330,72 @@ int flb_input_set(flb_ctx_t *ctx, int ffd, ...)
return 0;
}

static int flb_processor_event_type_get(const char *event_type)
{
if (strcasecmp(event_type, "logs") == 0) {
return FLB_PROCESSOR_LOGS;
}
else if (strcasecmp(event_type, "metrics") == 0) {
return FLB_PROCESSOR_METRICS;
}
else if (strcasecmp(event_type, "traces") == 0) {
return FLB_PROCESSOR_TRACES;
}
else if (strcasecmp(event_type, "profiles") == 0) {
return FLB_PROCESSOR_PROFILES;
}
else {
return -1;
}
}

/* Create a single processor unit for the input processor */
int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type,
const char *processor_unit_name, int ffd, ...)
{
int ret;
int type;
struct flb_input_instance *i_ins;
struct flb_processor *proc;
struct flb_processor_unit *pu;
char *key;
char *value;
va_list va;

i_ins = in_instance_get(ctx, ffd);
if (!i_ins) {
return -1;
}
proc = i_ins->processor;
if (!proc) {
return -1;
}
type = flb_processor_event_type_get(event_type);
if (type == -1) {
return -1;
}
pu = flb_processor_unit_create(proc, type, processor_unit_name);
va_start(va, ffd);
while ((key = va_arg(va, char *))) {
value = va_arg(va, char *);
if (!value) {
/* Wrong parameter */
va_end(va);
return -1;
}
struct cfl_variant cfl_value = {
.type = CFL_VARIANT_STRING,
.data.as_string = value,
};
ret = flb_processor_unit_set_property(pu, key, &cfl_value);
if (ret != 0) {
va_end(va);
return -1;
}
}
return 0;
}

int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc)
{
struct flb_input_instance *i_ins;
@@ -347,6 +414,23 @@ int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc)
return 0;
}

/* Retrieve the processor associated with a given input plugin. */
int flb_input_get_processor(flb_ctx_t *ctx, int ffd,
struct flb_processor **proc)
{
struct flb_input_instance *i_ins;

i_ins = in_instance_get(ctx, ffd);
if (!i_ins) {
return -1;
}
if (!i_ins->processor) {
return -1;
}
*proc = i_ins->processor;
return 0;
}

int flb_input_set_test(flb_ctx_t *ctx, int ffd, char *test_name,
void (*in_callback) (void *, int, int, void *, size_t, void *),
void *in_callback_data)
@@ -545,6 +629,55 @@ int flb_output_set(flb_ctx_t *ctx, int ffd, ...)
return 0;
}

/* Create a single processor unit for the input processor */
int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type,
const char *processor_unit_name, int ffd, ...)
{
int ret;
int type;
struct flb_output_instance *o_ins;
struct flb_processor *proc;
struct flb_processor_unit *pu;
char *key;
char *value;
va_list va;

o_ins = out_instance_get(ctx, ffd);
if (!o_ins) {
return -1;
}
proc = o_ins->processor;
if (!proc) {
return -1;
}
type = flb_processor_event_type_get(event_type);
if (type == -1) {
return -1;
}
pu = flb_processor_unit_create(proc, type, processor_unit_name);
va_start(va, ffd);
while ((key = va_arg(va, char *))) {
value = va_arg(va, char *);
if (!value) {
/* Wrong parameter */
va_end(va);
return -1;
}

struct cfl_variant cfl_value = {
.type = CFL_VARIANT_STRING,
.data.as_string = value,
};

ret = flb_processor_unit_set_property(pu, key, &cfl_value);
if (ret != 0) {
va_end(va);
return -1;
}
}
return 0;
}

int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc)
{
struct flb_output_instance *o_ins;
@@ -563,6 +696,23 @@ int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc
return 0;
}

/* Retrieve the processor associated with a given output plugin. */
int flb_output_get_processor(flb_ctx_t *ctx, int ffd,
struct flb_processor **proc)
{
struct flb_output_instance *o_ins;

o_ins = out_instance_get(ctx, ffd);
if (!o_ins) {
return -1;
}
if (!o_ins->processor) {
return -1;
}
*proc = o_ins->processor;
return 0;
}

int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name,
void (*cb)(char *, void *, void *))
{
2 changes: 1 addition & 1 deletion src/flb_processor.c
Original file line number Diff line number Diff line change
@@ -161,7 +161,7 @@ struct flb_processor *flb_processor_create(struct flb_config *config,

struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc,
int event_type,
char *unit_name)
const char *unit_name)
{
int result;
struct mk_list *head;
92 changes: 92 additions & 0 deletions tests/internal/processor.c
Original file line number Diff line number Diff line change
@@ -119,7 +119,99 @@ static void processor()
flb_sds_destroy(hostname_prop_key);
}

static void input_processor()
{
flb_ctx_t *ctx;
struct flb_processor *proc;
struct flb_processor_unit *pu;
struct mk_list *head;
int ret;
int in_ffd;
int found = 0;

ctx = flb_create();
TEST_CHECK(ctx != NULL);

in_ffd = flb_input(ctx, "dummy", NULL);
TEST_CHECK(in_ffd >= 0);

ret = flb_input_processor_unit(ctx, "logs", "opentelemetry_envelope",
in_ffd, NULL);
TEST_CHECK(ret == 0);

ret = flb_input_get_processor(ctx, in_ffd, &proc);
TEST_CHECK(ret == 0);
TEST_CHECK(proc != NULL);

/* Walk through logs processor units and verify one was added */
mk_list_foreach(head, &proc->logs) {
pu = mk_list_entry(head, struct flb_processor_unit, _head);
if (strcmp(pu->name, "opentelemetry_envelope") == 0) {
found++;
break;
}
}
TEST_CHECK(found == 1);

flb_destroy(ctx);
}

static void output_processor()
{
flb_ctx_t *ctx;
struct flb_processor *proc;
struct flb_processor_unit *pu;
struct flb_processor_instance *pi;
struct mk_list *head;
const char *val;
int ret;
int out_ffd;
int found = 0;

ctx = flb_create();
TEST_CHECK(ctx != NULL);

out_ffd = flb_output(ctx, "stdout", NULL);
TEST_CHECK(out_ffd >= 0);

ret = flb_output_processor_unit(ctx, "metrics",
"metrics_selector", out_ffd,
"metric_name", "/storage/",
"action", "includeNULL", NULL);
TEST_CHECK(ret == 0);

ret = flb_output_get_processor(ctx, out_ffd, &proc);
TEST_CHECK(ret == 0);
TEST_CHECK(proc != NULL);

/* Walk through logs processor units and verify one was added */
mk_list_foreach(head, &proc->metrics) {
pu = mk_list_entry(head, struct flb_processor_unit, _head);
if (strcmp(pu->name, "metrics_selector") == 0) {
found++;

pi = (struct flb_processor_instance *) pu->ctx;
TEST_CHECK(pi != NULL);

val = flb_processor_instance_get_property("metric_name", pi);
TEST_CHECK(val != NULL);
TEST_CHECK(strcmp(val, "/storage/") == 0);

val = flb_processor_instance_get_property("action", pi);
TEST_CHECK(val != NULL);
TEST_CHECK(strcmp(val, "includeNULL") == 0);

break;
}
}
TEST_CHECK(found == 1);

flb_destroy(ctx);
}

TEST_LIST = {
{ "processor", processor },
{"input_processor", input_processor},
{"output_processor", output_processor},
{ 0 }
};