Skip to content
Open
9 changes: 7 additions & 2 deletions include/aws/s3/private/s3_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ struct aws_s3_client {
/* The calculated ideal number of HTTP connections, based on throughput target and throughput per connection. */
const uint32_t ideal_connection_count;

/* Token bucket used to throttle connection creation: tokens are acquired when a
 * request is put on the network and released back when the request finishes. */
struct aws_atomic_var token_bucket;

/**
* For multi-part upload, content-md5 will be calculated if the AWS_MR_CONTENT_MD5_ENABLED is specified
* or initial request has content-md5 header.
Expand All @@ -285,8 +289,6 @@ struct aws_s3_client {
/* Hard limit on max connections set through the client config. */
const uint32_t max_active_connections_override;

struct aws_atomic_var max_allowed_connections;

/* Retry strategy used for scheduling request retries. */
struct aws_retry_strategy *retry_strategy;

Expand Down Expand Up @@ -363,6 +365,9 @@ struct aws_s3_client {
/* Number of requests being sent/received over network. */
struct aws_atomic_var num_requests_network_io[AWS_S3_META_REQUEST_TYPE_MAX];

/* Total number of requests on the network over all the meta requests. */
struct aws_atomic_var num_requests_network_total;

/* Number of requests sitting in their meta request priority queue, waiting to be streamed. */
struct aws_atomic_var num_requests_stream_queued_waiting;

Expand Down
3 changes: 3 additions & 0 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ struct aws_s3_meta_request {

enum aws_s3_meta_request_type type;
struct aws_string *s3express_session_host;
/* Estimated object size for the meta request. For downloads, the discovery
 * request reveals the size; in other cases the size is either known up front
 * or will never be known. */
size_t object_size;
/* Is the meta request made to s3express bucket or not. */
bool is_express;
/* If the buffer pool optimized for the specific size or not. */
Expand Down
150 changes: 148 additions & 2 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
#include <inttypes.h>
#include <math.h>

/* Ideal part size in MB (currently 8 MB, per the note above s_s3_minimum_tokens);
 * used to size the minimum token reserve. */
#define S_IDEAL_PART_SIZE 8
/* NOTE(review): floor on the number of concurrent requests the token bucket must
 * always be able to fund. The choice of 8 is not justified anywhere in this file —
 * confirm the value with the S3 team before tuning. */
#define S_S3_CLIENT_MINIMUM_CONCURRENT_REQUESTS 8

#ifdef _MSC_VER
# pragma warning(disable : 4232) /* function pointer to dll symbol */
#endif /* _MSC_VER */
Expand Down Expand Up @@ -75,6 +78,33 @@ const uint32_t g_min_num_connections = 10; /* Magic value based on: 10 was old b
* be 2500 Gbps. */
const uint32_t g_max_num_connections = 10000;

/* This is a first pass at a token based implementation, the calculations are approximate and can be improved in the
* future. The idea is to scale the number of connections we require up and down based on the different requests we
* receive and hence dynamically scale the maximum number of connections we need to open. One token is equivalent to
* 1Mbps of throughput. */

/* Per-connection throughput estimates provided by the S3 team. The base figures
 * are in MBps (megabytes/s); the stored values are multiplied by 8 to convert to
 * Mbps (megabits/s), since one token represents ~1 Mbps of throughput. */

// 90 MBps
const uint32_t s_s3_download_throughput_per_connection_mbps = 90 * 8;
// 20 MBps
const uint32_t s_s3_upload_throughput_per_connection_mbps = 20 * 8;
// 150 MBps
const uint32_t s_s3_express_download_throughput_per_connection_mbps = 150 * 8;
// 100 MBps
const uint32_t s_s3_express_upload_throughput_per_connection_mbps = 100 * 8;

/* P50 request latencies provided by the S3 team.
 * NOTE(review): despite the "_ms" suffix, these are stored in SECONDS
 * (0.03 == 30 ms, 0.004 == 4 ms). The token-cost formula divides megabits by
 * this value to obtain Mbps, which only works with seconds — confirm before
 * renaming or rescaling these constants. */
// 30ms
const double s_s3_p50_request_latency_ms = 0.03;
// 4ms
const double s_s3_express_p50_request_latency_ms = 0.004;

/* Minimum token reserve: enough tokens to fund
 * S_S3_CLIENT_MINIMUM_CONCURRENT_REQUESTS parts of S_IDEAL_PART_SIZE MB each
 * (the x8 converts MB to megabits, i.e. tokens).
 * Currently the ideal part size is 8MB and hence the value set.
 * However, this is subject to change due to newer part sizes and adjustments. */
const uint32_t s_s3_minimum_tokens = S_IDEAL_PART_SIZE * 8 * S_S3_CLIENT_MINIMUM_CONCURRENT_REQUESTS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand.

  1. why the S_S3_CLIENT_MINIMUM_CONCURRENT_REQUESTS is 8?
  2. why we multiply those to get the minimum token?
  3. What does the minimum token represent?


/**
* Default max part size is 5GiB as the server limit.
*/
Expand Down Expand Up @@ -206,6 +236,115 @@ uint32_t aws_s3_client_get_max_active_connections(
return max_active_connections;
}

/* Seed the client's token bucket from the target throughput.
 * One token represents roughly 1 Mbps, so the bucket starts at
 * target Gbps * 1024 tokens, floored at s_s3_minimum_tokens. */
void s_s3_client_init_tokens(struct aws_s3_client *client, double target_throughput_gbps) {
    AWS_PRECONDITION(client);
    /* Multiply BEFORE casting: casting the double first truncates fractional
     * targets (e.g. 2.5 Gbps would have yielded 2 * 1024 instead of 2560). */
    aws_atomic_store_int(
        &client->token_bucket, aws_max_u32((uint32_t)(target_throughput_gbps * 1024), s_s3_minimum_tokens));
}

/* Releases tokens back after request is complete. */
void s_s3_client_release_tokens(struct aws_s3_client *client, struct aws_s3_request *request) {
AWS_PRECONDITION(client);
AWS_PRECONDITION(request);

uint32_t tokens = 0;

switch (request->request_type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we calculate the token used again here? Should the request keep the token it used?

case AWS_S3_REQUEST_TYPE_GET_OBJECT: {
if (request->meta_request->is_express) {
tokens = aws_min_u32(
(uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_express_p50_request_latency_ms)),
s_s3_express_download_throughput_per_connection_mbps);
} else {
tokens = aws_min_u32(
(uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_p50_request_latency_ms)),
s_s3_download_throughput_per_connection_mbps);
}
break;
}
case AWS_S3_REQUEST_TYPE_UPLOAD_PART: {
if (request->meta_request->is_express) {
tokens = aws_min_u32(
(uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_express_p50_request_latency_ms)),
s_s3_express_upload_throughput_per_connection_mbps);
} else {
tokens = aws_min_u32(
(uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_p50_request_latency_ms)),
s_s3_upload_throughput_per_connection_mbps);
}
break;
}
default: {
tokens = s_s3_minimum_tokens;
}
}

// do we need error handling here?
aws_atomic_fetch_add(&client->token_bucket, tokens);
}

/* Attempt to reserve the tokens a request needs before opening a connection.
 * Returns true and debits the bucket on success; returns false (bucket
 * untouched) when the client/meta-request connection overrides are saturated
 * or the bucket cannot cover the cost.
 * TODO: try to introduce a scalability factor instead of using pure latency. */
bool s_s3_client_acquire_tokens(struct aws_s3_client *client, struct aws_s3_request *request) {
    AWS_PRECONDITION(client);
    AWS_PRECONDITION(request);

    /* Respect the user-set client-wide max-connections limit, if any. */
    if (client->max_active_connections_override > 0 &&
        (uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_total) >=
            client->max_active_connections_override) {
        return false;
    }

    /* Respect the per-meta-request max-connections limit, if any. */
    struct aws_s3_meta_request *meta_request = request->meta_request;
    if (meta_request != NULL && meta_request->max_active_connections_override > 0 &&
        (uint32_t)aws_atomic_load_int(&meta_request->num_requests_network) >=
            meta_request->max_active_connections_override) {
        return false;
    }

    /* NULL meta request is treated as non-express (matches the guard above). */
    const bool is_express = meta_request != NULL && meta_request->is_express;
    const double latency_seconds = is_express ? s_s3_express_p50_request_latency_ms : s_s3_p50_request_latency_ms;

    /* Token cost = megabits in the buffer / p50 latency (seconds) = Mbps,
     * capped at the per-connection throughput for the request direction. */
    uint32_t required_tokens = s_s3_minimum_tokens;
    switch (request->request_type) {
        case AWS_S3_REQUEST_TYPE_GET_OBJECT: {
            const uint32_t cap = is_express ? s_s3_express_download_throughput_per_connection_mbps
                                            : s_s3_download_throughput_per_connection_mbps;
            required_tokens =
                aws_min_u32((uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * latency_seconds)), cap);
            break;
        }
        case AWS_S3_REQUEST_TYPE_UPLOAD_PART: {
            const uint32_t cap = is_express ? s_s3_express_upload_throughput_per_connection_mbps
                                            : s_s3_upload_throughput_per_connection_mbps;
            required_tokens =
                aws_min_u32((uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * latency_seconds)), cap);
            break;
        }
        default:
            /* Other request types (discovery, complete, etc.) cost the flat minimum. */
            break;
    }

    /* Debit the bucket with a compare-exchange loop. A plain load + fetch_sub
     * would race: two threads could both pass the check and underflow the
     * unsigned bucket. Using >= (not >) also lets the bucket be drained exactly. */
    size_t current = aws_atomic_load_int(&client->token_bucket);
    while (current >= required_tokens) {
        if (aws_atomic_compare_exchange_int(&client->token_bucket, &current, current - required_tokens)) {
            return true;
        }
        /* On failure `current` was refreshed with the latest value; retry. */
    }
    return false;
}

/* Returns the max number of requests allowed to be in memory */
uint32_t aws_s3_client_get_max_requests_in_flight(struct aws_s3_client *client) {
AWS_PRECONDITION(client);
Expand Down Expand Up @@ -421,6 +560,8 @@ struct aws_s3_client *aws_s3_client_new(
*(uint32_t *)&client->ideal_connection_count = aws_max_u32(
g_min_num_connections, s_get_ideal_connection_number_from_throughput(client->throughput_target_gbps));

s_s3_client_init_tokens(client, client->throughput_target_gbps);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add the token usage to the logs from /* Step 4: Log client stats. */ so that we have better track of it.


size_t part_size = (size_t)g_default_part_size_fallback;
if (client_config->part_size != 0) {
if (client_config->part_size > SIZE_MAX) {
Expand Down Expand Up @@ -2266,7 +2407,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {

struct aws_s3_request *request = aws_s3_client_dequeue_request_threaded(client);
struct aws_s3_meta_request *meta_request = request->meta_request;
const uint32_t max_active_connections = aws_s3_client_get_max_active_connections(client, meta_request);

if (request->is_noop) {
/* If request is no-op, finishes and cleans up the request */
s_s3_client_meta_request_finished_request(client, meta_request, request, AWS_ERROR_SUCCESS);
Expand All @@ -2286,7 +2427,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {

s_s3_client_meta_request_finished_request(client, meta_request, request, AWS_ERROR_S3_CANCELED);
request = aws_s3_request_release(request);
} else if ((uint32_t)aws_atomic_load_int(&meta_request->num_requests_network) < max_active_connections) {
} else if (s_s3_client_acquire_tokens(client, request)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should still respect the max active connections from settings. And then apply the token limitation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The acquire tokens checks for the max active connection limits at both the client and meta request level before allocating tokens. Should that maybe be a separate function?

/* Make sure it's above the max request level limitation. */
s_s3_client_create_connection_for_request(client, request);
} else {
Expand Down Expand Up @@ -2336,6 +2477,7 @@ static void s_s3_client_create_connection_for_request_default(

aws_atomic_fetch_add(&meta_request->num_requests_network, 1);
aws_atomic_fetch_add(&client->stats.num_requests_network_io[meta_request->type], 1);
aws_atomic_fetch_add(&client->stats.num_requests_network_total, 1);

struct aws_s3_connection *connection = aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_connection));

Expand Down Expand Up @@ -2612,6 +2754,9 @@ void aws_s3_client_notify_connection_finished(
request->send_data.metrics->time_metrics.s3_request_last_attempt_end_timestamp_ns -
request->send_data.metrics->time_metrics.s3_request_first_attempt_start_timestamp_ns;

// release tokens acquired for the request
s_s3_client_release_tokens(client, request);

if (connection->retry_token != NULL) {
/* If we have a retry token and successfully finished, record that success. */
if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
Expand All @@ -2631,6 +2776,7 @@ void aws_s3_client_notify_connection_finished(
}
aws_atomic_fetch_sub(&meta_request->num_requests_network, 1);
aws_atomic_fetch_sub(&client->stats.num_requests_network_io[meta_request->type], 1);
aws_atomic_fetch_sub(&client->stats.num_requests_network_total, 1);

s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);

Expand Down
2 changes: 1 addition & 1 deletion tests/s3_max_active_connections_override_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ TEST_CASE(s3_max_active_connections_override_enforced) {
* TODO: this test seems a bit flaky. Sometime the peak we collect is like one more than expected. Maybe some race
* conditions that release and acquire happening. Check it against either the expected or expected + 1 for now.
*/
ASSERT_TRUE(peak == options.max_active_connections_override || peak == options.max_active_connections_override + 1);
ASSERT_TRUE(peak <= options.max_active_connections_override + 1);

aws_input_stream_destroy(input_stream);
aws_string_destroy(host_name);
Expand Down
3 changes: 3 additions & 0 deletions tests/s3_tester.c
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,9 @@ struct aws_s3_client *aws_s3_tester_mock_client_new(struct aws_s3_tester *tester
aws_atomic_init_int(&mock_client->stats.num_requests_stream_queued_waiting, 0);
aws_atomic_init_int(&mock_client->stats.num_requests_streaming_response, 0);

// create tokens for mock use
aws_atomic_init_int(&mock_client->token_bucket, 1000000);

return mock_client;
}

Expand Down