From c3e3edd485be77ccf92a9f935d3dbd8197ddaf9d Mon Sep 17 00:00:00 2001 From: Jeff Luo Date: Mon, 8 Jun 2020 11:21:03 -0400 Subject: [PATCH 01/12] out_stackdriver: Add new resource type: k8s_container Signed-off-by: Jeff Luo --- plugins/out_stackdriver/stackdriver.c | 161 ++++++++++++++++++++- plugins/out_stackdriver/stackdriver.h | 7 + plugins/out_stackdriver/stackdriver_conf.c | 32 +++- 3 files changed, 198 insertions(+), 2 deletions(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 9d546e7fb75..cf129068737 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -277,6 +277,81 @@ static char *get_google_token(struct flb_stackdriver *ctx) return ctx->o->access_token; } +static int process_local_resource_id(const void *data, size_t bytes, + struct flb_stackdriver *ctx, char *type) +{ + int i; + int len; + int ret_code = -1; + int flag = 0; + size_t off = 0; + msgpack_object k; + msgpack_object v; + msgpack_object root; + msgpack_object_map map; + msgpack_unpacked result; + char *local_resource_id; + char *ptr; + const char delim[] = "."; + + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS && !flag) { + root = result.data; + flag = 0; + + if (root.type != MSGPACK_OBJECT_ARRAY || + root.via.array.size != 2 || + root.via.array.ptr[1].type != MSGPACK_OBJECT_MAP) { + flb_plg_warn(ctx->ins, "unexpected record format"); + continue; + } + + map = root.via.array.ptr[1].via.map; + len = strlen(LOCAL_RESOURCE_ID_KEY); + for (i = 0; i < map.size; i++) { + k = map.ptr[i].key; + v = map.ptr[i].val; + + if (k.type == MSGPACK_OBJECT_STR && + strncmp(k.via.str.ptr, LOCAL_RESOURCE_ID_KEY, len) == 0) { + local_resource_id = malloc(v.via.str.size+1); + strncpy(local_resource_id, v.via.str.ptr, v.via.str.size); + local_resource_id[v.via.str.size] = '\0'; + + ptr = strtok(local_resource_id, delim); + /* Skip the prefix of tag */ + ptr = strtok(NULL, delim); + + if (strncmp(type, "k8s_container", strlen("k8s_container")) == 0) { + while (ptr != NULL) + { + /* Follow the order of fields in local_resource_id */ + if (!ctx->namespace_name) { + ctx->namespace_name = flb_sds_create(ptr); + } + else if (!ctx->pod_name) { + ctx->pod_name = flb_sds_create(ptr); + } + else if (!ctx->container_name) { + ctx->container_name = flb_sds_create(ptr); + } + + ptr = strtok(NULL, delim); + } + } + + ret_code = 0; + flag = 1; + free(local_resource_id); + break; + } + } + } + + msgpack_unpacked_destroy(&result); + return ret_code; +} + static int cb_stackdriver_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { @@ -453,7 +528,11 @@ static int stackdriver_format(struct flb_config *config, const void *data, size_t bytes, void **out_data, size_t *out_size) { + int i; int len; + int ret; + int map_size; + int new_map_size; int array_size = 0; size_t s; size_t off = 0; @@ -530,6 +609,57 @@ static int stackdriver_format(struct flb_config *config, msgpack_pack_str_body(&mp_pck, ctx->instance_id, flb_sds_len(ctx->instance_id)); } + else if (strcmp(ctx->resource, "k8s_container") == 0) { + /* k8s_container resource has fields project_id, location, cluster_name, + namespace_name, pod_name, container_name */ + + ret = process_local_resource_id(data, bytes, ctx, "k8s_container"); + if (ret != 0) { + flb_plg_error(ctx->ins, "can't fetch and process local_resource_id from log entry"); + flb_sds_destroy(ctx->namespace_name); + flb_sds_destroy(ctx->pod_name); + flb_sds_destroy(ctx->container_name); + return -1; + } + + msgpack_pack_map(&mp_pck, 6); + + msgpack_pack_str(&mp_pck, 10); + msgpack_pack_str_body(&mp_pck, "project_id", 10); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); + msgpack_pack_str_body(&mp_pck, + ctx->project_id, flb_sds_len(ctx->project_id)); + + msgpack_pack_str(&mp_pck, 8); + msgpack_pack_str_body(&mp_pck, "location", 8); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location)); + msgpack_pack_str_body(&mp_pck, + ctx->cluster_location, flb_sds_len(ctx->cluster_location)); + + msgpack_pack_str(&mp_pck, 12); + msgpack_pack_str_body(&mp_pck, "cluster_name", 12); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name)); + msgpack_pack_str_body(&mp_pck, + ctx->cluster_name, flb_sds_len(ctx->cluster_name)); + + msgpack_pack_str(&mp_pck, 14); + msgpack_pack_str_body(&mp_pck, "namespace_name", 14); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name)); + msgpack_pack_str_body(&mp_pck, + ctx->namespace_name, flb_sds_len(ctx->namespace_name)); + + msgpack_pack_str(&mp_pck, 8); + msgpack_pack_str_body(&mp_pck, "pod_name", 8); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name)); + msgpack_pack_str_body(&mp_pck, + ctx->pod_name, flb_sds_len(ctx->pod_name)); + + msgpack_pack_str(&mp_pck, 14); + msgpack_pack_str_body(&mp_pck, "container_name", 14); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->container_name)); + msgpack_pack_str_body(&mp_pck, + ctx->container_name, flb_sds_len(ctx->container_name)); + } msgpack_pack_str(&mp_pck, 7); msgpack_pack_str_body(&mp_pck, "entries", 7); @@ -538,6 +668,7 @@ static int stackdriver_format(struct flb_config *config, msgpack_pack_array(&mp_pck, array_size); off = 0; + len = strlen(LOCAL_RESOURCE_ID_KEY); msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) { /* Get timestamp */ @@ -567,7 +698,35 @@ static int stackdriver_format(struct flb_config *config, /* jsonPayload */ msgpack_pack_str(&mp_pck, 11); msgpack_pack_str_body(&mp_pck, "jsonPayload", 11); - msgpack_pack_object(&mp_pck, *obj); + + if (ctx->namespace_name && ctx->pod_name && ctx->container_name) { + map_size = obj->via.map.size; + new_map_size = map_size; + msgpack_object_kv *kv = NULL; + + for (i = 0; i < map_size; ++i) { + kv = &obj->via.map.ptr[i]; + if (strncmp(kv->key.via.str.ptr, LOCAL_RESOURCE_ID_KEY, len) == 0) { + new_map_size -= 1; + } + } + + msgpack_pack_map(&mp_pck, new_map_size); + + for (i = 0; i < map_size; ++i) { + kv = &obj->via.map.ptr[i]; + if (strncmp(kv->key.via.str.ptr, LOCAL_RESOURCE_ID_KEY, len) != 0) { + msgpack_pack_str(&mp_pck, kv->key.via.str.size); + msgpack_pack_str_body(&mp_pck, kv->key.via.str.ptr, kv->key.via.str.size); + msgpack_pack_object(&mp_pck, kv->val); + } + } + } + else { + msgpack_pack_object(&mp_pck, *obj); + } + + /* logName */ len = snprintf(path, sizeof(path) - 1, diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index 8e76a7dfebf..5c39f0bde67 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -46,6 +46,8 @@ /* Default Resource type */ #define FLB_SDS_RESOURCE_TYPE "global" +#define LOCAL_RESOURCE_ID_KEY "logging.googleapis.com/local_resource_id" + struct flb_stackdriver { /* credentials */ flb_sds_t credentials_file; @@ -65,6 +67,11 @@ struct flb_stackdriver { flb_sds_t zone; flb_sds_t instance_id; flb_sds_t instance_name; + flb_sds_t cluster_name; + flb_sds_t cluster_location; + flb_sds_t namespace_name; + flb_sds_t pod_name; + flb_sds_t container_name; /* other */ flb_sds_t resource; diff --git a/plugins/out_stackdriver/stackdriver_conf.c b/plugins/out_stackdriver/stackdriver_conf.c index c9b54a36f2e..d04aa77f4e7 100644 --- a/plugins/out_stackdriver/stackdriver_conf.c +++ b/plugins/out_stackdriver/stackdriver_conf.c @@ -43,7 +43,8 @@ static inline int key_cmp(const char *str, int len, const char *cmp) { static int validate_resource(const char *res) { if (strcasecmp(res, "global") != 0 && - strcasecmp(res, "gce_instance") != 0) { + strcasecmp(res, "gce_instance") != 0 && + strcasecmp(res, "k8s_container") != 0) { return -1; } @@ -286,6 +287,26 @@ struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance * ctx->severity_key = flb_sds_create(tmp); } + tmp = flb_output_get_property("k8s_cluster_name", ins); + if (tmp) { + ctx->cluster_name = flb_sds_create(tmp); + } + + tmp = flb_output_get_property("k8s_cluster_location", ins); + if (tmp) { + ctx->cluster_location = flb_sds_create(tmp); + } + + if (flb_sds_cmp(ctx->resource, "k8s_container", + flb_sds_len(ctx->resource)) == 0) { + if (!ctx->cluster_name || !ctx->cluster_location) { + flb_plg_error(ctx->ins, "Missing k8s_cluster_name " + "or k8s_cluster_location on configuration"); + flb_stackdriver_conf_destroy(ctx); + return NULL; + } + } + return ctx; } @@ -295,6 +316,15 @@ int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx) return -1; } + if (flb_sds_cmp(ctx->resource, "k8s_container", + flb_sds_len(ctx->resource)) == 0) { + flb_sds_destroy(ctx->namespace_name); + flb_sds_destroy(ctx->pod_name); + flb_sds_destroy(ctx->container_name); + flb_sds_destroy(ctx->cluster_name); + flb_sds_destroy(ctx->cluster_location); + } + flb_sds_destroy(ctx->credentials_file); flb_sds_destroy(ctx->type); flb_sds_destroy(ctx->project_id); From 738d73210a12d840a9ff4f8e51b6be1d45c43781 Mon Sep 17 00:00:00 2001 From: "jeffluoo@google.com" Date: Mon, 8 Jun 2020 17:07:16 -0400 Subject: [PATCH 02/12] out_stackdriver: Add tests Signed-off-by: Jeff Luo --- tests/runtime/CMakeLists.txt | 2 + .../k8s_container_test_cases.h | 39 +++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 tests/runtime/data/stackdriver_test/k8s_container_test_cases.h diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 429867b00dc..d2218d49477 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -57,6 +57,8 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_STDOUT "out_stdout.c") FLB_RT_TEST(FLB_OUT_STACKDRIVER "out_stackdriver.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") + FLB_RT_TEST(FLB_OUT_STACKDRIVER "out_stackdriver.c") + endif() diff --git a/tests/runtime/data/stackdriver_test/k8s_container_test_cases.h b/tests/runtime/data/stackdriver_test/k8s_container_test_cases.h new file mode 100644 index 00000000000..b4eee7d8506 --- /dev/null +++ b/tests/runtime/data/stackdriver_test/k8s_container_test_cases.h @@ -0,0 +1,39 @@ +#define K8S_CONTAINER_VALID_LOCAL_RESOURCE_ID "[" \ + "1591649196," \ + "{" \ + "\"message\": \"K8S_CONTAINER_VALID_LOCAL_RESOURCE_ID\"," \ + "\"logging.googleapis.com/local_resource_id\": \"k8s_container.testnamespace.testpodname.testcontainername\"," \ + "\"key_0\": false," \ + "\"key_1\": true," \ + "\"key_2\": \"some string\"," \ + "\"key_3\": 0.12345678," \ + "\"key_4\": 5000," \ + "\"END_KEY\": \"JSON_END\"" \ + "}]" + +#define K8S_CONTAINER_INVALID_JSON "[" \ + "1591649196," \ + "{{{{" \ + "\"message\": \"K8S_CONTAINER_VALID_LOCAL_RESOURCE_ID\"," \ + "\"logging.googleapis.com/local_resource_id\": \"k8s_container.testnamespace.testpodname.testcontainername\"," \ + "\"key_0\": false," \ + "\"key_1\": true," \ + "\"key_2\": \"some string\"," \ + "\"key_3\": 0.12345678," \ + "\"key_4\": 5000," \ + "\"END_KEY\": \"JSON_END\"" \ + "}]" + +#define K8S_CONTAINER_MISSING_PARTIAL_LOCAL_RESROUCE_ID "[" \ + "1591649196," \ + "{" \ + "\"message\": \"K8S_CONTAINER_MISSING_PARTIAL_LOCAL_RESROUCE_ID\"," \ + "\"logging.googleapis.com/local_resource_id\": \"k8s_container.testnamespace.testpodname\"," \ + "\"key_0\": false," \ + "\"key_1\": true," \ + "\"key_2\": \"some string\"," \ + "\"key_3\": 0.12345678," \ + "\"key_4\": 5000," \ + "\"END_KEY\": \"JSON_END\"" \ + "}]" + From c6fe98ec1750031055c0070196b78071188bae96 Mon Sep 17 00:00:00 2001 From: "jeffluoo@google.com" Date: Mon, 8 Jun 2020 17:07:58 -0400 Subject: [PATCH 03/12] out_stackdriver: Modify if confition to avoid memory leak Signed-off-by: Jeff Luo --- plugins/out_stackdriver/stackdriver.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index cf129068737..de4704b4d47 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -318,6 +318,7 @@ static int process_local_resource_id(const void *data, size_t bytes, strncpy(local_resource_id, v.via.str.ptr, v.via.str.size); local_resource_id[v.via.str.size] = '\0'; + ptr = strtok(local_resource_id, delim); /* Skip the prefix of tag */ ptr = strtok(NULL, delim); @@ -614,8 +615,13 @@ static int stackdriver_format(struct flb_config *config, namespace_name, pod_name, container_name */ ret = process_local_resource_id(data, bytes, ctx, "k8s_container"); - if (ret != 0) { - flb_plg_error(ctx->ins, "can't fetch and process local_resource_id from log entry"); + if (ret != 0 || + !ctx->namespace_name || + !ctx->pod_name || + !ctx->container_name) { + + flb_plg_error(ctx->ins, "Fail to process local_resource_id from log entry"); + msgpack_sbuffer_destroy(&mp_sbuf); flb_sds_destroy(ctx->namespace_name); flb_sds_destroy(ctx->pod_name); flb_sds_destroy(ctx->container_name); From 3b213c55c65fd59b98c173932151c26270491474 Mon Sep 17 00:00:00 2001 From: "jeffluoo@google.com" Date: Wed, 10 Jun 2020 10:07:44 -0400 Subject: [PATCH 04/12] out_stackdriver: Add new resource type: k8s_node Signed-off-by: Jeff Luo --- plugins/out_stackdriver/stackdriver.c | 47 +++++++++++++++++++++- plugins/out_stackdriver/stackdriver.h | 2 + plugins/out_stackdriver/stackdriver_conf.c | 14 +++++-- 3 files changed, 59 insertions(+), 4 deletions(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index de4704b4d47..ccbeca172fd 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -340,6 +340,10 @@ static int process_local_resource_id(const void *data, size_t bytes, ptr = strtok(NULL, delim); } } + else if (strncmp(type, "k8s_node", strlen("k8s_node")) == 0 && + !ctx->node_name) { + ctx->node_name = flb_sds_create(ptr); + } ret_code = 0; flag = 1; @@ -666,6 +670,45 @@ static int stackdriver_format(struct flb_config *config, msgpack_pack_str_body(&mp_pck, ctx->container_name, flb_sds_len(ctx->container_name)); } + else if (strcmp(ctx->resource, "k8s_node") == 0) { + /* k8s_container resource has fields project_id, location, cluster_name, node_name */ + + ret = process_local_resource_id(data, bytes, ctx, "k8s_node"); + if (ret != 0 || + !ctx->node_name) { + + flb_plg_error(ctx->ins, "Fail to process local_resource_id from log entry"); + msgpack_sbuffer_destroy(&mp_sbuf); + flb_sds_destroy(ctx->node_name); + return -1; + } + + msgpack_pack_map(&mp_pck, 4); + + msgpack_pack_str(&mp_pck, 10); + msgpack_pack_str_body(&mp_pck, "project_id", 10); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); + msgpack_pack_str_body(&mp_pck, + ctx->project_id, flb_sds_len(ctx->project_id)); + + msgpack_pack_str(&mp_pck, 8); + msgpack_pack_str_body(&mp_pck, "location", 8); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location)); + msgpack_pack_str_body(&mp_pck, + ctx->cluster_location, flb_sds_len(ctx->cluster_location)); + + msgpack_pack_str(&mp_pck, 12); + msgpack_pack_str_body(&mp_pck, "cluster_name", 12); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name)); + msgpack_pack_str_body(&mp_pck, + ctx->cluster_name, flb_sds_len(ctx->cluster_name)); + + msgpack_pack_str(&mp_pck, 9); + msgpack_pack_str_body(&mp_pck, "node_name", 9); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_name)); + msgpack_pack_str_body(&mp_pck, + ctx->node_name, flb_sds_len(ctx->node_name)); + } msgpack_pack_str(&mp_pck, 7); msgpack_pack_str_body(&mp_pck, "entries", 7); @@ -705,7 +748,9 @@ static int stackdriver_format(struct flb_config *config, msgpack_pack_str(&mp_pck, 11); msgpack_pack_str_body(&mp_pck, "jsonPayload", 11); - if (ctx->namespace_name && ctx->pod_name && ctx->container_name) { + /* if resource type is k8s_container / k8s_node */ + if ((ctx->namespace_name && ctx->pod_name && ctx->container_name) || + ctx->node_name) { map_size = obj->via.map.size; new_map_size = map_size; msgpack_object_kv *kv = NULL; diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index 5c39f0bde67..d960d861372 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -72,6 +72,8 @@ struct flb_stackdriver { flb_sds_t namespace_name; flb_sds_t pod_name; flb_sds_t container_name; + flb_sds_t node_name; + /* other */ flb_sds_t resource; diff --git a/plugins/out_stackdriver/stackdriver_conf.c b/plugins/out_stackdriver/stackdriver_conf.c index d04aa77f4e7..a89417cedef 100644 --- a/plugins/out_stackdriver/stackdriver_conf.c +++ b/plugins/out_stackdriver/stackdriver_conf.c @@ -44,7 +44,8 @@ static int validate_resource(const char *res) { if (strcasecmp(res, "global") != 0 && strcasecmp(res, "gce_instance") != 0 && - strcasecmp(res, "k8s_container") != 0) { + strcasecmp(res, "k8s_container") != 0 && + strcasecmp(res, "k8s_node")) { return -1; } @@ -298,6 +299,8 @@ struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance * } if (flb_sds_cmp(ctx->resource, "k8s_container", + flb_sds_len(ctx->resource)) == 0 || + flb_sds_cmp(ctx->resource, "k8s_node", flb_sds_len(ctx->resource)) == 0) { if (!ctx->cluster_name || !ctx->cluster_location) { flb_plg_error(ctx->ins, "Missing k8s_cluster_name " @@ -321,8 +324,11 @@ int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx) flb_sds_destroy(ctx->namespace_name); flb_sds_destroy(ctx->pod_name); flb_sds_destroy(ctx->container_name); - flb_sds_destroy(ctx->cluster_name); - flb_sds_destroy(ctx->cluster_location); + } + + if (flb_sds_cmp(ctx->resource, "k8s_node", + flb_sds_len(ctx->resource)) == 0) { + flb_sds_destroy(ctx->node_name); } flb_sds_destroy(ctx->credentials_file); @@ -336,6 +342,8 @@ int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx) flb_sds_destroy(ctx->token_uri); flb_sds_destroy(ctx->resource); flb_sds_destroy(ctx->severity_key); + flb_sds_destroy(ctx->cluster_name); + flb_sds_destroy(ctx->cluster_location); if (ctx->o) { flb_oauth2_destroy(ctx->o); From a471008fc246c13239e9d2538eace51af011f6dd Mon Sep 17 00:00:00 2001 From: "jeffluoo@google.com" Date: Fri, 12 Jun 2020 10:14:30 -0400 Subject: [PATCH 05/12] out_stackdriver: Add k8s_pod to supported resource type Signed-off-by: Jeff Luo --- plugins/out_stackdriver/stackdriver.c | 101 ++++++++++++++++++--- plugins/out_stackdriver/stackdriver.h | 1 + plugins/out_stackdriver/stackdriver_conf.c | 19 ++-- 3 files changed, 98 insertions(+), 23 deletions(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index ccbeca172fd..ce287ef0fdc 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -314,6 +314,8 @@ static int process_local_resource_id(const void *data, size_t bytes, if (k.type == MSGPACK_OBJECT_STR && strncmp(k.via.str.ptr, LOCAL_RESOURCE_ID_KEY, len) == 0) { + + ret_code = 0; local_resource_id = malloc(v.via.str.size+1); strncpy(local_resource_id, v.via.str.ptr, v.via.str.size); local_resource_id[v.via.str.size] = '\0'; @@ -339,13 +341,39 @@ static int process_local_resource_id(const void *data, size_t bytes, ptr = strtok(NULL, delim); } + + if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name) { + ret_code = -1; + } } - else if (strncmp(type, "k8s_node", strlen("k8s_node")) == 0 && - !ctx->node_name) { - ctx->node_name = flb_sds_create(ptr); + else if (strncmp(type, "k8s_node", strlen("k8s_node")) == 0) { + if (ptr != NULL) { + ctx->node_name = flb_sds_create(ptr); + } + + if (!ctx->node_name) { + ret_code = -1; + } + } + else if (strncmp(type, "k8s_pod", strlen("k8s_pod")) == 0) { + while (ptr != NULL) + { + /* Follow the order of fields in local_resource_id */ + if (!ctx->namespace_name) { + ctx->namespace_name = flb_sds_create(ptr); + } + else if (!ctx->pod_name) { + ctx->pod_name = flb_sds_create(ptr); + } + + ptr = strtok(NULL, delim); + } + + if (!ctx->namespace_name || !ctx->pod_name) { + ret_code = -1; + } } - ret_code = 0; flag = 1; free(local_resource_id); break; @@ -539,6 +567,7 @@ static int stackdriver_format(struct flb_config *config, int map_size; int new_map_size; int array_size = 0; + int k8s_resource_type = 0; size_t s; size_t off = 0; char path[PATH_MAX]; @@ -619,10 +648,7 @@ static int stackdriver_format(struct flb_config *config, namespace_name, pod_name, container_name */ ret = process_local_resource_id(data, bytes, ctx, "k8s_container"); - if (ret != 0 || - !ctx->namespace_name || - !ctx->pod_name || - !ctx->container_name) { + if (ret != 0) { flb_plg_error(ctx->ins, "Fail to process local_resource_id from log entry"); msgpack_sbuffer_destroy(&mp_sbuf); @@ -669,14 +695,14 @@ static int stackdriver_format(struct flb_config *config, msgpack_pack_str(&mp_pck, flb_sds_len(ctx->container_name)); msgpack_pack_str_body(&mp_pck, ctx->container_name, flb_sds_len(ctx->container_name)); + + k8s_resource_type = 1; } else if (strcmp(ctx->resource, "k8s_node") == 0) { /* k8s_container resource has fields project_id, location, cluster_name, node_name */ ret = process_local_resource_id(data, bytes, ctx, "k8s_node"); - if (ret != 0 || - !ctx->node_name) { - + if (ret != 0) { flb_plg_error(ctx->ins, "Fail to process local_resource_id from log entry"); msgpack_sbuffer_destroy(&mp_sbuf); flb_sds_destroy(ctx->node_name); @@ -708,6 +734,55 @@ static int stackdriver_format(struct flb_config *config, msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_name)); msgpack_pack_str_body(&mp_pck, ctx->node_name, flb_sds_len(ctx->node_name)); + + k8s_resource_type = 1; + } + else if (strcmp(ctx->resource, "k8s_pod") == 0) { + /* k8s_pod resource has fields project_id, location, cluster_name, + namespace_name, pod_name */ + + ret = process_local_resource_id(data, bytes, ctx, "k8s_pod"); + if (ret != 0) { + flb_plg_error(ctx->ins, "Fail to process local_resource_id from log entry"); + msgpack_sbuffer_destroy(&mp_sbuf); + flb_sds_destroy(ctx->namespace_name); + flb_sds_destroy(ctx->pod_name); + return -1; + } + + msgpack_pack_map(&mp_pck, 5); + + msgpack_pack_str(&mp_pck, 10); + msgpack_pack_str_body(&mp_pck, "project_id", 10); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); + msgpack_pack_str_body(&mp_pck, + ctx->project_id, flb_sds_len(ctx->project_id)); + + msgpack_pack_str(&mp_pck, 8); + msgpack_pack_str_body(&mp_pck, "location", 8); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location)); + msgpack_pack_str_body(&mp_pck, + ctx->cluster_location, flb_sds_len(ctx->cluster_location)); + + msgpack_pack_str(&mp_pck, 12); + msgpack_pack_str_body(&mp_pck, "cluster_name", 12); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name)); + msgpack_pack_str_body(&mp_pck, + ctx->cluster_name, flb_sds_len(ctx->cluster_name)); + + msgpack_pack_str(&mp_pck, 14); + msgpack_pack_str_body(&mp_pck, "namespace_name", 14); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name)); + msgpack_pack_str_body(&mp_pck, + ctx->namespace_name, flb_sds_len(ctx->namespace_name)); + + msgpack_pack_str(&mp_pck, 8); + msgpack_pack_str_body(&mp_pck, "pod_name", 8); + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name)); + msgpack_pack_str_body(&mp_pck, + ctx->pod_name, flb_sds_len(ctx->pod_name)); + + k8s_resource_type = 1; } msgpack_pack_str(&mp_pck, 7); @@ -748,9 +823,7 @@ static int stackdriver_format(struct flb_config *config, msgpack_pack_str(&mp_pck, 11); msgpack_pack_str_body(&mp_pck, "jsonPayload", 11); - /* if resource type is k8s_container / k8s_node */ - if ((ctx->namespace_name && ctx->pod_name && ctx->container_name) || - ctx->node_name) { + if (k8s_resource_type) { map_size = obj->via.map.size; new_map_size = map_size; msgpack_object_kv *kv = NULL; diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index d960d861372..fc9936e1bf0 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -73,6 +73,7 @@ struct flb_stackdriver { flb_sds_t pod_name; flb_sds_t container_name; flb_sds_t node_name; + bool k8s_resource_type; /* other */ diff --git a/plugins/out_stackdriver/stackdriver_conf.c b/plugins/out_stackdriver/stackdriver_conf.c index a89417cedef..7c783fdf7c4 100644 --- a/plugins/out_stackdriver/stackdriver_conf.c +++ b/plugins/out_stackdriver/stackdriver_conf.c @@ -45,7 +45,8 @@ static int validate_resource(const char *res) if (strcasecmp(res, "global") != 0 && strcasecmp(res, "gce_instance") != 0 && strcasecmp(res, "k8s_container") != 0 && - strcasecmp(res, "k8s_node")) { + strcasecmp(res, "k8s_node") && + strcasecmp(res, "k8s_pod")) { return -1; } @@ -301,7 +302,11 @@ struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance * if (flb_sds_cmp(ctx->resource, "k8s_container", flb_sds_len(ctx->resource)) == 0 || flb_sds_cmp(ctx->resource, "k8s_node", + flb_sds_len(ctx->resource)) == 0 || + flb_sds_cmp(ctx->resource, "k8s_pod", flb_sds_len(ctx->resource)) == 0) { + + ctx->k8s_resource_type = true; if (!ctx->cluster_name || !ctx->cluster_location) { flb_plg_error(ctx->ins, "Missing k8s_cluster_name " "or k8s_cluster_location on configuration"); @@ -319,16 +324,13 @@ int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx) return -1; } - if (flb_sds_cmp(ctx->resource, "k8s_container", - flb_sds_len(ctx->resource)) == 0) { + if (ctx->k8s_resource_type){ flb_sds_destroy(ctx->namespace_name); flb_sds_destroy(ctx->pod_name); flb_sds_destroy(ctx->container_name); - } - - if (flb_sds_cmp(ctx->resource, "k8s_node", - flb_sds_len(ctx->resource)) == 0) { flb_sds_destroy(ctx->node_name); + flb_sds_destroy(ctx->cluster_name); + flb_sds_destroy(ctx->cluster_location); } flb_sds_destroy(ctx->credentials_file); @@ -342,8 +344,7 @@ int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx) flb_sds_destroy(ctx->token_uri); flb_sds_destroy(ctx->resource); flb_sds_destroy(ctx->severity_key); - flb_sds_destroy(ctx->cluster_name); - flb_sds_destroy(ctx->cluster_location); + if (ctx->o) { flb_oauth2_destroy(ctx->o); From 1d5e074280818507237762a50771d3f0f401485b Mon Sep 17 00:00:00 2001 From: "jeffluoo@google.com" Date: Fri, 12 Jun 2020 10:17:49 -0400 Subject: [PATCH 06/12] out_stackdriver: Add test cases for k8s_pod, Remove invalid test cases Signed-off-by: Jeff Luo --- .../k8s_container_test_cases.h | 12 --------- .../stackdriver_test/k8s_pod_test_cases.h | 26 +++++++++++++++++++ 2 files changed, 26 insertions(+), 12 deletions(-) create mode 100644 tests/runtime/data/stackdriver_test/k8s_pod_test_cases.h diff --git a/tests/runtime/data/stackdriver_test/k8s_container_test_cases.h b/tests/runtime/data/stackdriver_test/k8s_container_test_cases.h index b4eee7d8506..1cd53c08913 100644 --- a/tests/runtime/data/stackdriver_test/k8s_container_test_cases.h +++ b/tests/runtime/data/stackdriver_test/k8s_container_test_cases.h @@ -24,16 +24,4 @@ "\"END_KEY\": \"JSON_END\"" \ "}]" -#define K8S_CONTAINER_MISSING_PARTIAL_LOCAL_RESROUCE_ID "[" \ - "1591649196," \ - "{" \ - "\"message\": \"K8S_CONTAINER_MISSING_PARTIAL_LOCAL_RESROUCE_ID\"," \ - "\"logging.googleapis.com/local_resource_id\": \"k8s_container.testnamespace.testpodname\"," \ - "\"key_0\": false," \ - "\"key_1\": true," \ - "\"key_2\": \"some string\"," \ - "\"key_3\": 0.12345678," \ - "\"key_4\": 5000," \ - "\"END_KEY\": \"JSON_END\"" \ - "}]" diff --git a/tests/runtime/data/stackdriver_test/k8s_pod_test_cases.h b/tests/runtime/data/stackdriver_test/k8s_pod_test_cases.h new file mode 100644 index 00000000000..297ee240c64 --- /dev/null +++ b/tests/runtime/data/stackdriver_test/k8s_pod_test_cases.h @@ -0,0 +1,26 @@ +#define K8S_POD_VALID_LOCAL_RESOURCE_ID "[" \ + "1591968280," \ + "{" \ + "\"message\": \"K8S_POD_VALID_LOCAL_RESOURCE_ID\"," \ + "\"logging.googleapis.com/local_resource_id\": \"k8s_pod.testnamespace.testpodname\"," \ + "\"key_0\": false," \ + "\"key_1\": true," \ + "\"key_2\": \"some string\"," \ + "\"key_3\": 0.12345678," \ + "\"key_4\": 5000," \ + "\"END_KEY\": \"JSON_END\"" \ + "}]" + +#define K8S_POD_INVALID_JSON "[" \ + "1591968280," \ + "{{{{" \ + "\"message\": \"K8S_POD_VALID_LOCAL_RESOURCE_ID\"," \ + "\"logging.googleapis.com/local_resource_id\": \"k8s_pod.testnamespace.testpodname\"," \ + "\"key_0\": false," \ + "\"key_1\": true," \ + "\"key_2\": \"some string\"," \ + "\"key_3\": 0.12345678," \ + "\"key_4\": 5000," \ + "\"END_KEY\": \"JSON_END\"" \ + "}]" + From 7b504eb458c9745f6cb0c7ea6efa7d42efe862a6 Mon Sep 17 00:00:00 2001 From: "jeffluoo@google.com" Date: Mon, 15 Jun 2020 10:36:02 -0400 Subject: [PATCH 07/12] out_stackdriver: Remove test cases Signed-off-by: Jeff Luo --- tests/runtime/CMakeLists.txt | 1 - .../k8s_container_test_cases.h | 27 ------------------- .../stackdriver_test/k8s_pod_test_cases.h | 26 ------------------ 3 files changed, 54 deletions(-) delete mode 100644 tests/runtime/data/stackdriver_test/k8s_container_test_cases.h delete mode 100644 tests/runtime/data/stackdriver_test/k8s_pod_test_cases.h diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index d2218d49477..6232a19cb69 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -57,7 +57,6 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_STDOUT "out_stdout.c") FLB_RT_TEST(FLB_OUT_STACKDRIVER "out_stackdriver.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") - FLB_RT_TEST(FLB_OUT_STACKDRIVER "out_stackdriver.c") endif() diff --git a/tests/runtime/data/stackdriver_test/k8s_container_test_cases.h b/tests/runtime/data/stackdriver_test/k8s_container_test_cases.h deleted file mode 100644 index 1cd53c08913..00000000000 --- a/tests/runtime/data/stackdriver_test/k8s_container_test_cases.h +++ /dev/null @@ -1,27 +0,0 @@ -#define K8S_CONTAINER_VALID_LOCAL_RESOURCE_ID "[" \ - "1591649196," \ - "{" \ - "\"message\": \"K8S_CONTAINER_VALID_LOCAL_RESOURCE_ID\"," \ - "\"logging.googleapis.com/local_resource_id\": \"k8s_container.testnamespace.testpodname.testcontainername\"," \ - "\"key_0\": false," \ - "\"key_1\": true," \ - "\"key_2\": \"some string\"," \ - "\"key_3\": 0.12345678," \ - "\"key_4\": 5000," \ - "\"END_KEY\": \"JSON_END\"" \ - "}]" - -#define K8S_CONTAINER_INVALID_JSON "[" \ - "1591649196," \ - "{{{{" \ - "\"message\": \"K8S_CONTAINER_VALID_LOCAL_RESOURCE_ID\"," \ - "\"logging.googleapis.com/local_resource_id\": \"k8s_container.testnamespace.testpodname.testcontainername\"," \ - "\"key_0\": false," \ - "\"key_1\": true," \ - "\"key_2\": \"some string\"," \ - "\"key_3\": 0.12345678," \ - "\"key_4\": 5000," \ - "\"END_KEY\": \"JSON_END\"" \ - "}]" - - diff --git a/tests/runtime/data/stackdriver_test/k8s_pod_test_cases.h b/tests/runtime/data/stackdriver_test/k8s_pod_test_cases.h deleted file mode 100644 index 297ee240c64..00000000000 --- a/tests/runtime/data/stackdriver_test/k8s_pod_test_cases.h +++ /dev/null @@ -1,26 +0,0 @@ -#define K8S_POD_VALID_LOCAL_RESOURCE_ID "[" \ - "1591968280," \ - "{" \ - "\"message\": \"K8S_POD_VALID_LOCAL_RESOURCE_ID\"," \ - "\"logging.googleapis.com/local_resource_id\": \"k8s_pod.testnamespace.testpodname\"," \ - "\"key_0\": false," \ - "\"key_1\": true," \ - "\"key_2\": \"some string\"," \ - "\"key_3\": 0.12345678," \ - "\"key_4\": 5000," \ - "\"END_KEY\": \"JSON_END\"" \ - "}]" - -#define K8S_POD_INVALID_JSON "[" \ - "1591968280," \ - "{{{{" \ - "\"message\": \"K8S_POD_VALID_LOCAL_RESOURCE_ID\"," \ - "\"logging.googleapis.com/local_resource_id\": \"k8s_pod.testnamespace.testpodname\"," \ - "\"key_0\": false," \ - "\"key_1\": true," \ - "\"key_2\": \"some string\"," \ - "\"key_3\": 0.12345678," \ - "\"key_4\": 5000," \ - "\"END_KEY\": \"JSON_END\"" \ - "}]" - From 36832c7cb7898578933893e20e51080194ecb40f Mon Sep 17 00:00:00 2001 From: Jeff Luo Date: Mon, 15 Jun 2020 22:14:44 -0400 Subject: [PATCH 08/12] out_stackdriver: Resolve comments on Pull Request Signed-off-by: Jeff Luo --- plugins/out_stackdriver/stackdriver.c | 83 ++++++++++++++-------- plugins/out_stackdriver/stackdriver.h | 3 +- plugins/out_stackdriver/stackdriver_conf.c | 2 +- 3 files changed, 58 insertions(+), 30 deletions(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index ce287ef0fdc..6cad730ba28 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -282,7 +282,7 @@ static int process_local_resource_id(const void *data, size_t bytes, { int i; int len; - int ret_code = -1; + int ret = 0; int flag = 0; size_t off = 0; msgpack_object k; @@ -314,18 +314,22 @@ static int process_local_resource_id(const void *data, size_t bytes, if (k.type == MSGPACK_OBJECT_STR && strncmp(k.via.str.ptr, LOCAL_RESOURCE_ID_KEY, len) == 0) { - - ret_code = 0; + local_resource_id = malloc(v.via.str.size+1); strncpy(local_resource_id, v.via.str.ptr, v.via.str.size); local_resource_id[v.via.str.size] = '\0'; - ptr = strtok(local_resource_id, delim); - /* Skip the prefix of tag */ - ptr = strtok(NULL, delim); if (strncmp(type, "k8s_container", strlen("k8s_container")) == 0) { + /* check the prefix */ + if (strncasecmp(ptr, type, strlen("k8s_container")) != 0) { + ret = -1; + } + + ptr = strtok(NULL, delim); + /* the local_resource_id for k8s_container is in format: + k8s_container... */ while (ptr != NULL) { /* Follow the order of fields in local_resource_id */ @@ -342,20 +346,43 @@ static int process_local_resource_id(const void *data, size_t bytes, ptr = strtok(NULL, delim); } - if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name) { - ret_code = -1; + if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name || + ret != 0) { + free(local_resource_id); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(ctx->namespace_name); + flb_sds_destroy(ctx->pod_name); + flb_sds_destroy(ctx->container_name); + return -1; } } else if (strncmp(type, "k8s_node", strlen("k8s_node")) == 0) { + if (strncasecmp(ptr, type, strlen("k8s_node")) != 0) { + ret = -1; + } + + ptr = strtok(NULL, delim); + /* the local_resource_id for k8s_node is in format: + k8s_node. */ if (ptr != NULL) { ctx->node_name = flb_sds_create(ptr); } - if (!ctx->node_name) { - ret_code = -1; + if (!ctx->node_name || ret != 0) { + free(local_resource_id); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(ctx->node_name); + return -1; } } else if (strncmp(type, "k8s_pod", strlen("k8s_pod")) == 0) { + if (strncasecmp(ptr, type, strlen("k8s_pod")) != 0) { + ret = -1; + } + + ptr = strtok(NULL, delim); + /* the local_resource_id for k8s_pod is in format: + k8s_pod.. */ while (ptr != NULL) { /* Follow the order of fields in local_resource_id */ @@ -369,8 +396,12 @@ static int process_local_resource_id(const void *data, size_t bytes, ptr = strtok(NULL, delim); } - if (!ctx->namespace_name || !ctx->pod_name) { - ret_code = -1; + if (!ctx->namespace_name || !ctx->pod_name || ret != 0) { + free(local_resource_id); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(ctx->namespace_name); + flb_sds_destroy(ctx->pod_name); + return -1; } } @@ -382,7 +413,7 @@ static int process_local_resource_id(const void *data, size_t bytes, } msgpack_unpacked_destroy(&result); - return ret_code; + return ret; } static int cb_stackdriver_init(struct flb_output_instance *ins, @@ -644,17 +675,14 @@ static int stackdriver_format(struct flb_config *config, ctx->instance_id, flb_sds_len(ctx->instance_id)); } else if (strcmp(ctx->resource, "k8s_container") == 0) { - /* k8s_container resource has fields project_id, location, cluster_name, - namespace_name, pod_name, container_name */ + /* k8s_container resource has fields project_id, location, cluster_name, + namespace_name, pod_name, container_name */ ret = process_local_resource_id(data, bytes, ctx, "k8s_container"); if (ret != 0) { - - flb_plg_error(ctx->ins, "Fail to process local_resource_id from log entry"); + flb_plg_error(ctx->ins, "fail to process local_resource_id from " + "log entry for %s", "k8s_container"); msgpack_sbuffer_destroy(&mp_sbuf); - flb_sds_destroy(ctx->namespace_name); - flb_sds_destroy(ctx->pod_name); - flb_sds_destroy(ctx->container_name); return -1; } @@ -699,13 +727,13 @@ static int stackdriver_format(struct flb_config *config, k8s_resource_type = 1; } else if (strcmp(ctx->resource, "k8s_node") == 0) { - /* k8s_container resource has fields project_id, location, cluster_name, node_name */ + /* k8s_node resource has fields project_id, location, cluster_name, node_name */ ret = process_local_resource_id(data, bytes, ctx, "k8s_node"); if (ret != 0) { - flb_plg_error(ctx->ins, "Fail to process local_resource_id from log entry"); + flb_plg_error(ctx->ins, "fail to process local_resource_id from " + "log entry for %s", "k8s_node"); msgpack_sbuffer_destroy(&mp_sbuf); - flb_sds_destroy(ctx->node_name); return -1; } @@ -738,15 +766,14 @@ static int stackdriver_format(struct flb_config *config, k8s_resource_type = 1; } else if (strcmp(ctx->resource, "k8s_pod") == 0) { - /* k8s_pod resource has fields project_id, location, cluster_name, - namespace_name, pod_name */ + /* k8s_pod resource has fields project_id, location, cluster_name, + namespace_name, pod_name */ ret = process_local_resource_id(data, bytes, ctx, "k8s_pod"); if (ret != 0) { - flb_plg_error(ctx->ins, "Fail to process local_resource_id from log entry"); + flb_plg_error(ctx->ins, "fail to process local_resource_id from " + "log entry for %s", "k8s_pod"); msgpack_sbuffer_destroy(&mp_sbuf); - flb_sds_destroy(ctx->namespace_name); - flb_sds_destroy(ctx->pod_name); return -1; } diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index fc9936e1bf0..212d1ecca38 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -67,6 +67,8 @@ struct flb_stackdriver { flb_sds_t zone; flb_sds_t instance_id; flb_sds_t instance_name; + + /* kubernetes specific */ flb_sds_t cluster_name; flb_sds_t cluster_location; flb_sds_t namespace_name; @@ -75,7 +77,6 @@ struct flb_stackdriver { flb_sds_t node_name; bool k8s_resource_type; - /* other */ flb_sds_t resource; flb_sds_t severity_key; diff --git a/plugins/out_stackdriver/stackdriver_conf.c b/plugins/out_stackdriver/stackdriver_conf.c index 7c783fdf7c4..3dd49d48dbe 100644 --- a/plugins/out_stackdriver/stackdriver_conf.c +++ b/plugins/out_stackdriver/stackdriver_conf.c @@ -309,7 +309,7 @@ struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance * ctx->k8s_resource_type = true; if (!ctx->cluster_name || !ctx->cluster_location) { flb_plg_error(ctx->ins, "Missing k8s_cluster_name " - "or k8s_cluster_location on configuration"); + "or k8s_cluster_location in configuration"); flb_stackdriver_conf_destroy(ctx); return NULL; } From cfd7cab3522984fd98f55713a8cd183c239c3cd8 Mon Sep 17 00:00:00 2001 From: Jeff Luo Date: Thu, 18 Jun 2020 11:11:47 -0400 Subject: [PATCH 09/12] out_stackdriver: update comments and remove placeholders Signed-off-by: Jeff Luo --- plugins/out_stackdriver/stackdriver.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 6cad730ba28..824d625fa60 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -328,8 +328,6 @@ static int process_local_resource_id(const void *data, size_t bytes, } ptr = strtok(NULL, delim); - /* the local_resource_id for k8s_container is in format: - k8s_container... */ while (ptr != NULL) { /* Follow the order of fields in local_resource_id */ @@ -362,8 +360,6 @@ static int process_local_resource_id(const void *data, size_t bytes, } ptr = strtok(NULL, delim); - /* the local_resource_id for k8s_node is in format: - k8s_node. */ if (ptr != NULL) { ctx->node_name = flb_sds_create(ptr); } @@ -381,8 +377,6 @@ static int process_local_resource_id(const void *data, size_t bytes, } ptr = strtok(NULL, delim); - /* the local_resource_id for k8s_pod is in format: - k8s_pod.. */ while (ptr != NULL) { /* Follow the order of fields in local_resource_id */ @@ -678,10 +672,12 @@ static int stackdriver_format(struct flb_config *config, /* k8s_container resource has fields project_id, location, cluster_name, namespace_name, pod_name, container_name */ + /* the local_resource_id for k8s_container is in format: + k8s_container... */ ret = process_local_resource_id(data, bytes, ctx, "k8s_container"); if (ret != 0) { flb_plg_error(ctx->ins, "fail to process local_resource_id from " - "log entry for %s", "k8s_container"); + "log entry for k8s_container"); msgpack_sbuffer_destroy(&mp_sbuf); return -1; } @@ -729,10 +725,12 @@ static int stackdriver_format(struct flb_config *config, else if (strcmp(ctx->resource, "k8s_node") == 0) { /* k8s_node resource has fields project_id, location, cluster_name, node_name */ + /* the local_resource_id for k8s_node is in format: + k8s_node. */ ret = process_local_resource_id(data, bytes, ctx, "k8s_node"); if (ret != 0) { flb_plg_error(ctx->ins, "fail to process local_resource_id from " - "log entry for %s", "k8s_node"); + "log entry for k8s_node"); msgpack_sbuffer_destroy(&mp_sbuf); return -1; } @@ -769,10 +767,12 @@ static int stackdriver_format(struct flb_config *config, /* k8s_pod resource has fields project_id, location, cluster_name, namespace_name, pod_name */ + /* the local_resource_id for k8s_pod is in format: + k8s_pod.. */ ret = process_local_resource_id(data, bytes, ctx, "k8s_pod"); if (ret != 0) { flb_plg_error(ctx->ins, "fail to process local_resource_id from " - "log entry for %s", "k8s_pod"); + "log entry for k8s_pod"); msgpack_sbuffer_destroy(&mp_sbuf); return -1; } From e993e9e758cc31014cfe699237ca1585f52ea513 Mon Sep 17 00:00:00 2001 From: Jeff Luo Date: Fri, 19 Jun 2020 16:56:07 -0400 Subject: [PATCH 10/12] out_stackdriver: Resolve comments from PR and clean up code Signed-off-by: Jeff Luo --- plugins/out_stackdriver/stackdriver.c | 237 ++++++++++++--------- plugins/out_stackdriver/stackdriver_conf.c | 33 +-- tests/runtime/CMakeLists.txt | 1 - 3 files changed, 154 insertions(+), 117 deletions(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 824d625fa60..fd0d3227fc9 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -277,16 +277,54 @@ static char *get_google_token(struct flb_stackdriver *ctx) return ctx->o->access_token; } -static int process_local_resource_id(const void *data, size_t bytes, - struct flb_stackdriver *ctx, char *type) +static bool validate_msgpack_unpacked_data(msgpack_object root) +{ + return root.type == MSGPACK_OBJECT_ARRAY && + root.via.array.size == 2 && + root.via.array.ptr[1].type == MSGPACK_OBJECT_MAP; +} + +static char *get_str_value_from_msgpack_map(msgpack_object_map map, + const char *key) { int i; - int len; - int ret = 0; - int flag = 0; - size_t off = 0; + int key_size; msgpack_object k; msgpack_object v; + char *ptr = NULL; + + key_size = strlen(key); + for (i = 0; i < map.size; i++) { + k = map.ptr[i].key; + v = map.ptr[i].val; + + if (k.type != MSGPACK_OBJECT_STR) { + continue; + } + + if (k.via.str.size == key_size && + strncmp(key, (char *) k.via.str.ptr, k.via.str.size) == 0) { + /* make sure to free it after use */ + ptr = flb_strndup(v.via.str.ptr, v.via.str.size); + break; + } + } + + return ptr; +} + +/* +* process_local_resource_id(): +* - extract the value from "logging.googleapis.com/local_resource_id" field in the record +* - use extracted value to assign the label keys for differnet resource types that specified +* in the configuration of stackdriver_out plugin +*/ +static int process_local_resource_id(const void *data, size_t bytes, + struct flb_stackdriver *ctx, char *type) +{ + int len_ptr; + int ret = -1; + size_t off = 0; msgpack_object root; msgpack_object_map map; msgpack_unpacked result; @@ -295,115 +333,108 @@ static int process_local_resource_id(const void *data, size_t bytes, const char delim[] = "."; msgpack_unpacked_init(&result); - while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS && !flag) { + if (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) { root = result.data; - flag = 0; - if (root.type != MSGPACK_OBJECT_ARRAY || - root.via.array.size != 2 || - root.via.array.ptr[1].type != MSGPACK_OBJECT_MAP) { + if (!validate_msgpack_unpacked_data(root)) { flb_plg_warn(ctx->ins, "unexpected record format"); - continue; + return -1; } map = root.via.array.ptr[1].via.map; - len = strlen(LOCAL_RESOURCE_ID_KEY); - for (i = 0; i < map.size; i++) { - k = map.ptr[i].key; - v = map.ptr[i].val; - - if (k.type == MSGPACK_OBJECT_STR && - strncmp(k.via.str.ptr, LOCAL_RESOURCE_ID_KEY, len) == 0) { - - local_resource_id = malloc(v.via.str.size+1); - strncpy(local_resource_id, v.via.str.ptr, v.via.str.size); - local_resource_id[v.via.str.size] = '\0'; - - ptr = strtok(local_resource_id, delim); - - if (strncmp(type, "k8s_container", strlen("k8s_container")) == 0) { - /* check the prefix */ - if (strncasecmp(ptr, type, strlen("k8s_container")) != 0) { - ret = -1; - } - - ptr = strtok(NULL, delim); - while (ptr != NULL) - { - /* Follow the order of fields in local_resource_id */ - if (!ctx->namespace_name) { - ctx->namespace_name = flb_sds_create(ptr); - } - else if (!ctx->pod_name) { - ctx->pod_name = flb_sds_create(ptr); - } - else if (!ctx->container_name) { - ctx->container_name = flb_sds_create(ptr); - } - - ptr = strtok(NULL, delim); - } - - if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name || - ret != 0) { - free(local_resource_id); - msgpack_unpacked_destroy(&result); - flb_sds_destroy(ctx->namespace_name); - flb_sds_destroy(ctx->pod_name); - flb_sds_destroy(ctx->container_name); - return -1; - } + local_resource_id = get_str_value_from_msgpack_map(map, LOCAL_RESOURCE_ID_KEY); + if (!local_resource_id) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ptr = strtok(local_resource_id, delim); + len_ptr = strlen(ptr); + if (strncmp(type, "k8s_container", strlen("k8s_container")) == 0) { + /* check the prefix */ + if (strncasecmp(ptr, type, len_ptr) != 0) { + free(local_resource_id); + msgpack_unpacked_destroy(&result); + return -1; + } + + ptr = strtok(NULL, delim); + while (ptr != NULL) + { + /* Follow the order of fields in local_resource_id */ + if (!ctx->namespace_name) { + ctx->namespace_name = flb_sds_create(ptr); } - else if (strncmp(type, "k8s_node", strlen("k8s_node")) == 0) { - if (strncasecmp(ptr, type, strlen("k8s_node")) != 0) { - ret = -1; - } - - ptr = strtok(NULL, delim); - if (ptr != NULL) { - ctx->node_name = flb_sds_create(ptr); - } - - if (!ctx->node_name || ret != 0) { - free(local_resource_id); - msgpack_unpacked_destroy(&result); - flb_sds_destroy(ctx->node_name); - return -1; - } + else if (!ctx->pod_name) { + ctx->pod_name = flb_sds_create(ptr); + } + else if (!ctx->container_name) { + ctx->container_name = flb_sds_create(ptr); + } + + ptr = strtok(NULL, delim); + } + + if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name) { + flb_free(local_resource_id); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(ctx->namespace_name); + flb_sds_destroy(ctx->pod_name); + flb_sds_destroy(ctx->container_name); + return -1; + } + } + else if (strncmp(type, "k8s_node", strlen("k8s_node")) == 0) { + if (strncasecmp(ptr, type, len_ptr) != 0) { + flb_free(local_resource_id); + msgpack_unpacked_destroy(&result); + return -1; + } + + ptr = strtok(NULL, delim); + if (ptr != NULL) { + ctx->node_name = flb_sds_create(ptr); + } + + if (!ctx->node_name) { + flb_free(local_resource_id); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(ctx->node_name); + return -1; + } + } + else if (strncmp(type, "k8s_pod", strlen("k8s_pod")) == 0) { + if (strncasecmp(ptr, type, len_ptr) != 0) { + flb_free(local_resource_id); + msgpack_unpacked_destroy(&result); + return -1; + } + + ptr = strtok(NULL, delim); + while (ptr != NULL) + { + /* Follow the order of fields in local_resource_id */ + if (!ctx->namespace_name) { + ctx->namespace_name = flb_sds_create(ptr); } - else if (strncmp(type, "k8s_pod", strlen("k8s_pod")) == 0) { - if (strncasecmp(ptr, type, strlen("k8s_pod")) != 0) { - ret = -1; - } - - ptr = strtok(NULL, delim); - while (ptr != NULL) - { - /* Follow the order of fields in local_resource_id */ - if (!ctx->namespace_name) { - ctx->namespace_name = flb_sds_create(ptr); - } - else if (!ctx->pod_name) { - ctx->pod_name = flb_sds_create(ptr); - } - - ptr = strtok(NULL, delim); - } - - if (!ctx->namespace_name || !ctx->pod_name || ret != 0) { - free(local_resource_id); - msgpack_unpacked_destroy(&result); - flb_sds_destroy(ctx->namespace_name); - flb_sds_destroy(ctx->pod_name); - return -1; - } + else if (!ctx->pod_name) { + ctx->pod_name = flb_sds_create(ptr); } - flag = 1; - free(local_resource_id); - break; + ptr = strtok(NULL, delim); + } + + if (!ctx->namespace_name || !ctx->pod_name) { + flb_free(local_resource_id); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(ctx->namespace_name); + flb_sds_destroy(ctx->pod_name); + return -1; } } + + ret = 0; + free(local_resource_id); } msgpack_unpacked_destroy(&result); diff --git a/plugins/out_stackdriver/stackdriver_conf.c b/plugins/out_stackdriver/stackdriver_conf.c index 3dd49d48dbe..7cd98024f38 100644 --- a/plugins/out_stackdriver/stackdriver_conf.c +++ b/plugins/out_stackdriver/stackdriver_conf.c @@ -269,7 +269,14 @@ struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance * ctx->metadata_server_auth = true; } - /* Resource type (only 'global' and 'gce_instance' are supported) */ + /* + * Supported resource types: + * 'global' + * 'gce_instance' + * 'k8s_pod' + * 'k8s_node' + * 'k8s_container' + */ tmp = flb_output_get_property("resource", ins); if (tmp) { if (validate_resource(tmp) != 0) { @@ -289,16 +296,6 @@ struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance * ctx->severity_key = flb_sds_create(tmp); } - tmp = flb_output_get_property("k8s_cluster_name", ins); - if (tmp) { - ctx->cluster_name = flb_sds_create(tmp); - } - - tmp = flb_output_get_property("k8s_cluster_location", ins); - if (tmp) { - ctx->cluster_location = flb_sds_create(tmp); - } - if (flb_sds_cmp(ctx->resource, "k8s_container", flb_sds_len(ctx->resource)) == 0 || flb_sds_cmp(ctx->resource, "k8s_node", @@ -306,7 +303,18 @@ struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance * flb_sds_cmp(ctx->resource, "k8s_pod", flb_sds_len(ctx->resource)) == 0) { - ctx->k8s_resource_type = true; + ctx->k8s_resource_type = FLB_TRUE; + + tmp = flb_output_get_property("k8s_cluster_name", ins); + if (tmp) { + ctx->cluster_name = flb_sds_create(tmp); + } + + tmp = flb_output_get_property("k8s_cluster_location", ins); + if (tmp) { + ctx->cluster_location = flb_sds_create(tmp); + } + if (!ctx->cluster_name || !ctx->cluster_location) { flb_plg_error(ctx->ins, "Missing k8s_cluster_name " "or k8s_cluster_location in configuration"); @@ -345,7 +353,6 @@ int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx) flb_sds_destroy(ctx->resource); flb_sds_destroy(ctx->severity_key); - if (ctx->o) { flb_oauth2_destroy(ctx->o); } diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 6232a19cb69..429867b00dc 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -57,7 +57,6 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_STDOUT "out_stdout.c") FLB_RT_TEST(FLB_OUT_STACKDRIVER "out_stackdriver.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") - endif() From ca194c171edeacf2c7e607a2bc44da1b97d55b3e Mon Sep 17 00:00:00 2001 From: Jeff Luo Date: Wed, 17 Jun 2020 16:46:13 -0400 Subject: [PATCH 11/12] out_stackdriver: Modify tag value based on stream Signed-off-by: Jeff Luo --- plugins/out_stackdriver/stackdriver.c | 38 ++++++++++++++++++++++++++- plugins/out_stackdriver/stackdriver.h | 5 ++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index fd0d3227fc9..d9312f06ecd 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -441,6 +441,33 @@ static int process_local_resource_id(const void *data, size_t bytes, return ret; } +static int get_stream(msgpack_object_map map) +{ + int i; + msgpack_object k; + msgpack_object v; + + for (i = 0; i < map.size; i++) { + k = map.ptr[i].key; + v = map.ptr[i].val; + + if (k.type == MSGPACK_OBJECT_STR && + strncmp(k.via.str.ptr, "stream", k.via.str.size) == 0) { + if (strncmp(v.via.str.ptr, "stdout", strlen("stdout")) == 0) { + return STREAM_STDOUT; + } + else if (strncmp(v.via.str.ptr, "stderr", strlen("stderr")) == 0) { + return STREAM_STDERR; + } + else { + return STREAM_UNKNOWN; + } + } + } + + return NO_STREAM; +} + static int cb_stackdriver_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { @@ -624,6 +651,7 @@ static int stackdriver_format(struct flb_config *config, int new_map_size; int array_size = 0; int k8s_resource_type = 0; + int stream; size_t s; size_t off = 0; char path[PATH_MAX]; @@ -908,7 +936,15 @@ static int stackdriver_format(struct flb_config *config, msgpack_pack_object(&mp_pck, *obj); } - + if (k8s_resource_type) { + stream = get_stream(result.data.via.array.ptr[1].via.map); + if (stream == STREAM_STDOUT) { + tag = "stdout"; + } + else if (stream == STREAM_STDERR) { + tag = "stderr"; + } + } /* logName */ len = snprintf(path, sizeof(path) - 1, diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index 212d1ecca38..537a06c9626 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -48,6 +48,11 @@ #define LOCAL_RESOURCE_ID_KEY "logging.googleapis.com/local_resource_id" +#define NO_STREAM 0 +#define STREAM_STDOUT 1 +#define STREAM_STDERR 2 +#define STREAM_UNKNOWN 3 + struct flb_stackdriver { /* credentials */ flb_sds_t credentials_file; From 9b24ad2420b964060cd7916ca05bf7ce5d3c43ec Mon Sep 17 00:00:00 2001 From: Jeff Luo Date: Tue, 23 Jun 2020 16:24:48 -0400 Subject: [PATCH 12/12] out_stackdriver: resolve comments from PR Signed-off-by: Jeff Luo --- plugins/out_stackdriver/stackdriver.c | 10 ++++++---- plugins/out_stackdriver/stackdriver.h | 1 - 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index d9312f06ecd..25e4c39ffcc 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -465,7 +465,7 @@ static int get_stream(msgpack_object_map map) } } - return NO_STREAM; + return STREAM_UNKNOWN; } static int cb_stackdriver_init(struct flb_output_instance *ins, @@ -656,6 +656,7 @@ static int stackdriver_format(struct flb_config *config, size_t off = 0; char path[PATH_MAX]; char time_formatted[255]; + char *newtag; struct tm tm; struct flb_time tms; severity_t severity; @@ -936,19 +937,20 @@ static int stackdriver_format(struct flb_config *config, msgpack_pack_object(&mp_pck, *obj); } + newtag = tag; if (k8s_resource_type) { stream = get_stream(result.data.via.array.ptr[1].via.map); if (stream == STREAM_STDOUT) { - tag = "stdout"; + newtag = "stdout"; } else if (stream == STREAM_STDERR) { - tag = "stderr"; + newtag = "stderr"; } } /* logName */ len = snprintf(path, sizeof(path) - 1, - "projects/%s/logs/%s", ctx->project_id, tag); + "projects/%s/logs/%s", ctx->project_id, newtag); msgpack_pack_str(&mp_pck, 7); msgpack_pack_str_body(&mp_pck, "logName", 7); diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index 537a06c9626..dc031da0898 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -48,7 +48,6 @@ #define LOCAL_RESOURCE_ID_KEY "logging.googleapis.com/local_resource_id" -#define NO_STREAM 0 #define STREAM_STDOUT 1 #define STREAM_STDERR 2 #define STREAM_UNKNOWN 3