Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 123 additions & 59 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,96 @@ static void s3_context_destroy(struct flb_s3 *ctx)
flb_free(ctx);
}

/*
 * Resolve the S3 endpoint (host, port, scheme) for this output instance.
 *
 * If the "endpoint" property is set, its scheme selects TLS vs plaintext
 * and the remainder is split into host[:port]; otherwise the default
 * regional AWS endpoint is constructed. When vhost_style_urls is enabled,
 * the bucket name is prepended to the host ("bucket.host") so requests
 * use virtual-hosted-style addressing.
 *
 * On success ctx->endpoint, ctx->port, ctx->insecure and
 * ctx->free_endpoint are populated and 0 is returned; -1 on error.
 */
static int init_endpoint(struct flb_s3 *ctx) {
    const char *tmp;
    char *ep;
    struct flb_split_entry *tok;
    struct mk_list *split;
    int list_size;
    int port;
    flb_sds_t url;
    flb_sds_t tmp_sds;
    size_t len;

    tmp = flb_output_get_property("endpoint", ctx->ins);
    if (tmp) {
        /* The URL scheme decides TLS usage; strip it before splitting host:port */
        ctx->insecure = strncmp(tmp, "http://", 7) == 0 ? FLB_TRUE : FLB_FALSE;
        if (ctx->insecure == FLB_TRUE) {
            ep = removeProtocol((char *) tmp, "http://");
        }
        else {
            ep = removeProtocol((char *) tmp, "https://");
        }

        split = flb_utils_split((const char *) ep, ':', 1);
        if (!split) {
            flb_errno();
            return -1;
        }
        list_size = mk_list_size(split);
        if (list_size > 2) {
            flb_plg_error(ctx->ins, "Failed to split endpoint");
            flb_utils_split_free(split);
            return -1;
        }

        tok = mk_list_entry_first(split, struct flb_split_entry, _head);
        ctx->endpoint = flb_strndup(tok->value, tok->len);
        if (!ctx->endpoint) {
            flb_errno();
            flb_utils_split_free(split);
            return -1;
        }
        ctx->free_endpoint = FLB_TRUE;
        if (list_size == 2) {
            /* Explicit "host:port"; reject values outside the TCP port range */
            tok = mk_list_entry_next(&tok->_head, struct flb_split_entry, _head, split);
            port = atoi(tok->value);
            if (port <= 0 || port > 65535) {
                flb_plg_error(ctx->ins, "Invalid endpoint port: %s", tok->value);
                flb_utils_split_free(split);
                return -1;
            }
            ctx->port = port;
        }
        else {
            ctx->port = ctx->insecure == FLB_TRUE ? DEFAULT_S3_INSECURE_PORT : DEFAULT_S3_PORT;
        }
        flb_utils_split_free(split);
    }
    else {
        /* default endpoint for the given region */
        ctx->endpoint = flb_aws_endpoint("s3", ctx->region);
        ctx->insecure = FLB_FALSE;
        ctx->port = DEFAULT_S3_PORT;
        ctx->free_endpoint = FLB_TRUE;
        if (!ctx->endpoint) {
            flb_plg_error(ctx->ins, "Could not construct S3 endpoint");
            return -1;
        }
    }

    if (ctx->vhost_style_urls == FLB_TRUE) {
        /*
         * NOTE(review): vhost-style addressing requires a DNS-compliant
         * bucket name (3-63 chars; lowercase letters, digits, hyphens and
         * dots; starts/ends with a letter or digit; no consecutive dots;
         * not IPv4-shaped, i.e. reject names matching (\d+\.){3}\d+).
         * Presumably validated upstream -- TODO confirm or add a check
         * here and fall back to path-style when the name is not valid.
         */

        /* +1 for the dot joining bucket and endpoint */
        len = strlen(ctx->endpoint) + strlen(ctx->bucket) + 1;
        url = flb_sds_create_size(len);
        if (!url) {
            flb_errno();
            return -1;
        }
        tmp_sds = flb_sds_printf(&url, "%s.%s", ctx->bucket, ctx->endpoint);
        if (!tmp_sds) {
            flb_sds_destroy(url);
            flb_plg_error(ctx->ins, "Could not construct vhost-style S3 endpoint");
            return -1;
        }
        url = tmp_sds;

        /* The old host string is superseded; release it if we own it */
        if (ctx->free_endpoint == FLB_TRUE) {
            flb_free(ctx->endpoint);
        }

        ctx->endpoint = flb_strndup(url, flb_sds_len(url));
        flb_sds_destroy(url);
        if (ctx->endpoint == NULL) {
            flb_plg_error(ctx->ins, "error duplicating endpoint string");
            return -1;
        }
        /* ctx->endpoint now points at heap memory this context owns */
        ctx->free_endpoint = FLB_TRUE;
        flb_plg_info(ctx->ins, "New endpoint: %s", ctx->endpoint);
    }

    return 0;
}

static int cb_s3_init(struct flb_output_instance *ins,
struct flb_config *config, void *data)
{
Expand All @@ -598,10 +688,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
struct flb_aws_client_generator *generator;
(void) config;
(void) data;
char *ep;
struct flb_split_entry *tok;
struct mk_list *split;
int list_size;

FLB_TLS_INIT(s3_worker_info);

Expand Down Expand Up @@ -790,55 +876,11 @@ static int cb_s3_init(struct flb_output_instance *ins,
}
}

tmp = flb_output_get_property("endpoint", ins);
if (tmp) {
ctx->insecure = strncmp(tmp, "http://", 7) == 0 ? FLB_TRUE : FLB_FALSE;
if (ctx->insecure == FLB_TRUE) {
ep = removeProtocol((char *) tmp, "http://");
}
else {
ep = removeProtocol((char *) tmp, "https://");
}

split = flb_utils_split((const char *)ep, ':', 1);
if (!split) {
flb_errno();
return -1;
}
list_size = mk_list_size(split);
if (list_size > 2) {
flb_plg_error(ctx->ins, "Failed to split endpoint");
flb_utils_split_free(split);
return -1;
}

tok = mk_list_entry_first(split, struct flb_split_entry, _head);
ctx->endpoint = flb_strndup(tok->value, tok->len);
if (!ctx->endpoint) {
flb_errno();
flb_utils_split_free(split);
return -1;
}
ctx->free_endpoint = FLB_TRUE;
if (list_size == 2) {
tok = mk_list_entry_next(&tok->_head, struct flb_split_entry, _head, split);
ctx->port = atoi(tok->value);
}
else {
ctx->port = ctx->insecure == FLB_TRUE ? DEFAULT_S3_INSECURE_PORT : DEFAULT_S3_PORT;
}
flb_utils_split_free(split);
}
else {
/* default endpoint for the given region */
ctx->endpoint = flb_aws_endpoint("s3", ctx->region);
ctx->insecure = FLB_FALSE;
ctx->port = DEFAULT_S3_PORT;
ctx->free_endpoint = FLB_TRUE;
if (!ctx->endpoint) {
flb_plg_error(ctx->ins, "Could not construct S3 endpoint");
return -1;
}
// VHOST endpoint construction
ret = init_endpoint(ctx);
if (ret != 0) {
flb_plg_error(ctx->ins, "Error initialising endpoint");
return -1;
}

tmp = flb_output_get_property("sts_endpoint", ins);
Expand Down Expand Up @@ -1506,7 +1548,10 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_
append_random = FLB_TRUE;
len += 16;
}
len += strlen(ctx->bucket + 1);

if (ctx->vhost_style_urls == FLB_FALSE) {
len += strlen(ctx->bucket + 1);
}

uri = flb_sds_create_size(len);

Expand All @@ -1521,12 +1566,21 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_
/* only use 8 chars of the random string */
random_alphanumeric[8] = '\0';

tmp = flb_sds_printf(&uri, "/%s%s-object%s", ctx->bucket, s3_key,
random_alphanumeric);
if (ctx->vhost_style_urls == FLB_TRUE) {
tmp = flb_sds_printf(&uri, "%s-object%s", s3_key,
random_alphanumeric);
} else {
tmp = flb_sds_printf(&uri, "/%s%s-object%s", ctx->bucket, s3_key,
random_alphanumeric);
}
flb_free(random_alphanumeric);
}
else {
tmp = flb_sds_printf(&uri, "/%s%s", ctx->bucket, s3_key);
if (ctx->vhost_style_urls == FLB_TRUE) {
tmp = flb_sds_printf(&uri, "%s", s3_key);
} else {
tmp = flb_sds_printf(&uri, "/%s%s", ctx->bucket, s3_key);
}
}

if (!tmp) {
Expand Down Expand Up @@ -1580,10 +1634,14 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_
flb_plg_debug(ctx->ins, "PutObject http status=%d", c->resp.status);
if (c->resp.status == 200) {
/*
* URI contains bucket name, so we must advance over it
* URI may contain bucket name, so we must advance over it
* to print the object key
*/
final_key = uri + strlen(ctx->bucket) + 1;
if (ctx->vhost_style_urls == FLB_TRUE) {
final_key = uri;
} else {
final_key = uri + strlen(ctx->bucket) + 1;
}
flb_plg_info(ctx->ins, "Successfully uploaded object %s", final_key);
flb_sds_destroy(uri);
flb_http_client_destroy(c);
Expand Down Expand Up @@ -4037,6 +4095,12 @@ static struct flb_config_map config_map[] = {
"A standard MIME type for the S3 object; this will be set "
"as the Content-Type HTTP header."
},
{
FLB_CONFIG_MAP_BOOL, "vhost_style_urls", "false",
0, FLB_TRUE, offsetof(struct flb_s3, vhost_style_urls),
"Force the use of vhost-style S3 urls. "
"https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html"
},

{
FLB_CONFIG_MAP_STR, "store_dir", "/tmp/fluent-bit/s3",
Expand Down
1 change: 1 addition & 0 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ struct flb_s3 {
int compression;
int port;
int insecure;
int vhost_style_urls;
size_t store_dir_limit_size;

struct flb_blob_db blob_db;
Expand Down
18 changes: 12 additions & 6 deletions plugins/out_s3/s3_multipart.c
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,10 @@ int complete_multipart_upload(struct flb_s3 *ctx,

if (pre_signed_url != NULL) {
tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url));
}
else {
} else if (ctx->vhost_style_urls == FLB_TRUE) {
tmp = flb_sds_printf(&uri, "%s?uploadId=%s",
m_upload->s3_key, m_upload->upload_id);
} else {
tmp = flb_sds_printf(&uri, "/%s%s?uploadId=%s", ctx->bucket,
m_upload->s3_key, m_upload->upload_id);
}
Expand Down Expand Up @@ -575,8 +577,9 @@ int create_multipart_upload(struct flb_s3 *ctx,

if (pre_signed_url != NULL) {
tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url));
}
else {
} else if (ctx->vhost_style_urls == FLB_TRUE) {
tmp = flb_sds_printf(&uri, "%s?uploads=", m_upload->s3_key);
} else {
tmp = flb_sds_printf(&uri, "/%s%s?uploads=", ctx->bucket, m_upload->s3_key);
}

Expand Down Expand Up @@ -702,8 +705,11 @@ int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload,

if (pre_signed_url != NULL) {
tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url));
}
else {
} else if (ctx->vhost_style_urls == FLB_TRUE) {
tmp = flb_sds_printf(&uri, "%s?partNumber=%d&uploadId=%s",
m_upload->s3_key, m_upload->part_number,
m_upload->upload_id);
} else {
tmp = flb_sds_printf(&uri, "/%s%s?partNumber=%d&uploadId=%s",
ctx->bucket, m_upload->s3_key, m_upload->part_number,
m_upload->upload_id);
Expand Down
46 changes: 46 additions & 0 deletions tests/runtime/out_s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,51 @@ void flb_test_s3_complete_upload_error(void)
unsetenv("TEST_COMPLETE_MULTIPART_UPLOAD_ERROR");
}

/*
 * Integration smoke test: run the S3 output with vhost_style_urls enabled
 * against a real bucket/region supplied via environment variables.
 * Silently skipped when FLB_OUT_S3_TEST_BUCKET or FLB_OUT_S3_TEST_REGION
 * is unset, so CI without AWS credentials is unaffected.
 */
void flb_test_s3_vhost_urls(void)
{
    int ret;
    flb_ctx_t *ctx;
    int in_ffd;
    int out_ffd;

    char *bucket;
    char *region;

    bucket = getenv("FLB_OUT_S3_TEST_BUCKET");
    if (bucket == NULL) {
        return;
    }

    region = getenv("FLB_OUT_S3_TEST_REGION");
    if (region == NULL) {
        return;
    }

    ctx = flb_create();
    TEST_CHECK(ctx != NULL);
    if (ctx == NULL) {
        /* context creation failed; nothing to clean up */
        return;
    }

    in_ffd = flb_input(ctx, (char *) "lib", NULL);
    TEST_CHECK(in_ffd >= 0);
    flb_input_set(ctx, in_ffd, "tag", "test", NULL);

    out_ffd = flb_output(ctx, (char *) "s3", NULL);
    TEST_CHECK(out_ffd >= 0);

    flb_output_set(ctx, out_ffd, "match", "*", NULL);
    flb_output_set(ctx, out_ffd, "region", region, NULL);
    flb_output_set(ctx, out_ffd, "bucket", bucket, NULL);
    flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);
    flb_output_set(ctx, out_ffd, "vhost_style_urls", "true", NULL);
    flb_output_set(ctx, out_ffd, "use_put_object", "true", NULL);

    ret = flb_start(ctx);
    TEST_CHECK(ret == 0);

    /* Push one record and give the output a moment to flush */
    flb_lib_push(ctx, in_ffd, (char *) JSON_TD, (int) sizeof(JSON_TD) - 1);

    sleep(2);
    flb_stop(ctx);
    flb_destroy(ctx);
}

/* Test list */
TEST_LIST = {
Expand All @@ -237,5 +282,6 @@ TEST_LIST = {
{"create_upload_error", flb_test_s3_create_upload_error },
{"upload_part_error", flb_test_s3_upload_part_error },
{"complete_upload_error", flb_test_s3_complete_upload_error },
{"vhost_style_urls_success", flb_test_s3_vhost_urls},
{NULL, NULL}
};