9 changes: 7 additions & 2 deletions include/aws/s3/private/s3_client_impl.h
@@ -275,6 +275,10 @@ struct aws_s3_client {
/* The calculated ideal number of HTTP connections, based on throughput target and throughput per connection. */
const uint32_t ideal_connection_count;

/* Token bucket for dynamically scaling connections. Initialized from the target throughput; tokens are acquired
* when requests allocate connections and released back when those requests complete. */
struct aws_atomic_var token_bucket;

/**
* For multi-part upload, content-md5 will be calculated if the AWS_MR_CONTENT_MD5_ENABLED is specified
* or initial request has content-md5 header.
@@ -285,8 +289,6 @@ struct aws_s3_client {
/* Hard limit on max connections set through the client config. */
const uint32_t max_active_connections_override;

struct aws_atomic_var max_allowed_connections;

/* Retry strategy used for scheduling request retries. */
struct aws_retry_strategy *retry_strategy;

@@ -363,6 +365,9 @@ struct aws_s3_client {
/* Number of requests being sent/received over network. */
struct aws_atomic_var num_requests_network_io[AWS_S3_META_REQUEST_TYPE_MAX];

/* Total number of requests on the network across all meta requests. */
struct aws_atomic_var num_requests_network_total;

/* Number of requests sitting in their meta request priority queue, waiting to be streamed. */
struct aws_atomic_var num_requests_stream_queued_waiting;

3 changes: 3 additions & 0 deletions include/aws/s3/private/s3_meta_request_impl.h
@@ -191,6 +191,9 @@ struct aws_s3_meta_request {

enum aws_s3_meta_request_type type;
struct aws_string *s3express_session_host;
/* Estimated size of the object for this meta request. For downloads, the discovery request reveals the size.
* In other cases, we either know the size up front or will never know it. */
size_t object_size;
/* Whether the meta request is made to an S3 Express bucket. */
bool is_express;
/* Whether the buffer pool is optimized for the specific size. */
4 changes: 4 additions & 0 deletions include/aws/s3/private/s3_request.h
@@ -255,6 +255,10 @@ struct aws_s3_request {
/* The upload_timeout used. Zero if the request is not an upload part. */
size_t upload_timeout_ms;

/* The number of tokens acquired to send the request. Zero until a connection needs to be created; on failure,
* it is reset to zero until the retry requires a connection again. */
uint32_t tokens_used;

/* Number of times aws_s3_meta_request_prepare has been called for a request. During the first call to the virtual
* prepare function, this will be 0.*/
uint32_t num_times_prepared;
169 changes: 165 additions & 4 deletions source/s3_client.c
@@ -75,6 +75,42 @@ const uint32_t g_min_num_connections = 10; /* Magic value based on: 10 was old b
* be 2500 Gbps. */
const uint32_t g_max_num_connections = 10000;

/*
* This is a token-based implementation for dynamic scaling of connections.
* The calculations are approximate and can be improved in the future. The idea is to scale the number of
* connections we require up and down based on the different requests we receive, and hence dynamically scale the
* maximum number of connections we need to open. One token is equivalent to 1 Mbps of throughput.
*/

/* All throughput values are provided by the S3 team in MBps; the constants below store them as Mbps (hence the * 8). */

// 90 MBps
const uint32_t s_s3_download_throughput_per_connection_mbps = 90 * 8;
// 20 MBps
const uint32_t s_s3_upload_throughput_per_connection_mbps = 20 * 8;
// 150 MBps
const uint32_t s_s3_express_download_throughput_per_connection_mbps = 150 * 8;
// 100 MBps
const uint32_t s_s3_express_upload_throughput_per_connection_mbps = 100 * 8;

/* All p50 latency values are provided by the S3 team. Note the constants are stored in seconds for the token math
* (30 ms = 0.03), despite the _ms suffix. */
// 30ms
const double s_s3_p50_request_latency_ms = 0.03;
// 4ms
const double s_s3_express_p50_request_latency_ms = 0.004;

/*
* Represents the minimum number of tokens a particular request might use irrespective of payload size or throughput
* achieved. This is required to hard limit the number of connections we open for extremely small request sizes. The
* number below is arbitrary until we come up with more sophisticated math.
*/
const uint32_t s_s3_minimum_tokens = 10;

/*
* Rough estimate of the tokens used by a request whose type or payload size we do not yet know.
* This is hard to set precisely and is therefore an approximation.
*/
const uint32_t s_s3_default_tokens = 500;
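/*
* For a sense of scale, derived from the constants above: a client targeting 100 Gbps starts with
* 100 * 1024 = 102,400 tokens; at the 720-token (90 MBps) cost of a saturated standard-S3 download
* connection, that permits roughly 142 concurrent full-throughput downloads.
*/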
/**
* Default max part size is 5 GiB, matching the server-side limit.
*/
@@ -156,8 +192,9 @@ void aws_s3_set_dns_ttl(size_t ttl) {

/**
* Determine how many connections are ideal by dividing target-throughput by throughput-per-connection.
* TODO: we may consider to alter this, upload to regular s3 can use more connections, and s3 express
* can use less connections to reach the target throughput..
* TODO: we have begun altering the calculation behind this; get_ideal_connection_number no longer provides
* the basis for the number of connections used by a particular meta request, and it will be removed as part
* of the ongoing move to a dynamic connection allocation implementation.
**/
static uint32_t s_get_ideal_connection_number_from_throughput(double throughput_gps) {
double ideal_connection_count_double = throughput_gps / s_throughput_per_connection_gbps;
@@ -206,6 +243,112 @@ uint32_t aws_s3_client_get_max_active_connections(
return max_active_connections;
}

/* Initialize token bucket based on target throughput */
void s_s3_client_init_tokens(struct aws_s3_client *client) {
Contributor: add static to the static functions.
Suggested change:
-void s_s3_client_init_tokens(struct aws_s3_client *client) {
+static void s_s3_client_init_tokens(struct aws_s3_client *client) {

AWS_PRECONDITION(client);

aws_atomic_store_int(&client->token_bucket, (uint32_t)client->throughput_target_gbps * 1024);
Contributor: check for overflow. The throughput_target_gbps is customer provided; it could be anything. Check whether this multiply can overflow.

}
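
/*
 * A minimal sketch of how the reviewer's overflow concern might be addressed (illustration only;
 * s_s3_client_init_tokens_checked is a hypothetical name, and <math.h> is assumed for isnan):
 * validate and clamp the customer-provided double before converting, so the cast and multiply
 * cannot wrap or invoke undefined behavior.
 */
static void s_s3_client_init_tokens_checked(struct aws_s3_client *client) {
    AWS_PRECONDITION(client);

    double gbps = client->throughput_target_gbps;
    size_t tokens;
    if (isnan(gbps) || gbps <= 0.0) {
        tokens = 0;
    } else if (gbps >= (double)(UINT32_MAX / 1024)) {
        tokens = UINT32_MAX; /* Saturate rather than wrap. */
    } else {
        tokens = (size_t)(gbps * 1024.0);
    }
    aws_atomic_store_int(&client->token_bucket, tokens);
}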

/* Releases tokens back to the bucket after a request completes. */
void s_s3_client_release_tokens(struct aws_s3_client *client, struct aws_s3_request *request) {
AWS_PRECONDITION(client);
AWS_PRECONDITION(request);

aws_atomic_fetch_add(&client->token_bucket, request->tokens_used);
request->tokens_used = 0;
}

/* Checks that we are not violating user-configured connection limits even though we are dynamically increasing
* and decreasing connections. */
bool s_check_connection_limits(struct aws_s3_client *client, struct aws_s3_request *request) {
Contributor: keep the same logic at the same place. We have an aws_s3_client_get_max_active_connections function to get the limit from customer settings for the meta request; reuse that function instead of writing a new one.

AWS_PRECONDITION(client);
AWS_PRECONDITION(request);

// Ensure we do not violate the user-set max-connections limit.
if ((uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_total) >=
client->max_active_connections_override &&
client->max_active_connections_override > 0) {
return false;
}

struct aws_s3_meta_request *meta_request = request->meta_request;
if (meta_request &&
(uint32_t)aws_atomic_load_int(&meta_request->num_requests_network) >=
meta_request->max_active_connections_override &&
meta_request->max_active_connections_override > 0) {
return false;
}

return true;
}
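
/*
 * A rough sketch of the reviewer's suggestion above (illustration only; s_check_connection_limits_via_helper
 * is a hypothetical name): lean on the existing aws_s3_client_get_max_active_connections helper instead of
 * re-reading the override fields here. This assumes the helper folds the 0-means-unlimited overrides into a
 * concrete cap; note it also collapses the client-wide and per-meta-request checks into a single comparison,
 * which may or may not match the intended semantics.
 */
static bool s_check_connection_limits_via_helper(struct aws_s3_client *client, struct aws_s3_request *request) {
    AWS_PRECONDITION(client);
    AWS_PRECONDITION(request);

    const uint32_t max_active_connections =
        aws_s3_client_get_max_active_connections(client, request->meta_request);

    return (uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_total) < max_active_connections;
}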

/* Returns whether the request was able to acquire the required number of tokens.
* TODO: try to introduce a scalability factor instead of using pure latency. */
bool s_s3_client_acquire_tokens(struct aws_s3_client *client, struct aws_s3_request *request) {
AWS_PRECONDITION(client);
AWS_PRECONDITION(request);

uint32_t required_tokens = 0;

/*
* In each of the following cases, we determine the number of tokens required using the following formula (one
* token is equivalent to attaining 1 Mbps of target throughput):
*
* For each operation (upload/download) and each service (S3/S3 Express), we have a hardcoded attainable
* throughput-per-connection value obtained from S3, along with the p50 latency of the respective service.
*
* The throughput attained by a request is at most the attainable throughput of the respective service and
* operation; when the payload is small enough, the delivery time is negligible and the throughput is close to
* payload/latency.
*
* The tokens used are the minimum of the two, since larger objects get close to the attainable throughput
* while smaller objects max out at the theoretical limit of what they can attain.
*
* This calculation is an approximation of reality and may be continuously improved based on S3 performance.
*/

switch (request->request_type) {
case AWS_S3_REQUEST_TYPE_GET_OBJECT: {
if (request->meta_request->is_express) {
required_tokens = aws_min_u32(
(uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_express_p50_request_latency_ms)),
s_s3_express_download_throughput_per_connection_mbps);
} else {
required_tokens = aws_min_u32(
(uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_p50_request_latency_ms)),
s_s3_download_throughput_per_connection_mbps);
}
break;
}
case AWS_S3_REQUEST_TYPE_UPLOAD_PART: {
if (request->meta_request->is_express) {
required_tokens = aws_min_u32(
(uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_express_p50_request_latency_ms)),
s_s3_express_upload_throughput_per_connection_mbps);
} else {
required_tokens = aws_min_u32(
(uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_p50_request_latency_ms)),
s_s3_upload_throughput_per_connection_mbps);
}
break;
}
default: {
required_tokens = s_s3_default_tokens;
}
}

// Ensure we use at least the minimum number of tokens, irrespective of payload size.
required_tokens = aws_max_u32(required_tokens, s_s3_minimum_tokens);

if ((uint32_t)aws_atomic_load_int(&client->token_bucket) > required_tokens) {
// do we need error handling here?
aws_atomic_fetch_sub(&client->token_bucket, required_tokens);
request->tokens_used = required_tokens;
return true;
}
return false;
}
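
/*
 * Worked example of the token math above, using the constants in this file: an 8 MiB standard-S3 GET carries
 * 64 Mb of payload, and 64 / 0.03 s gives ceil(2133.3) = 2134 tokens, which the 720 Mbps (90 MBps)
 * per-connection ceiling caps at 720 tokens. A 64 KiB GET works out to ceil(0.5 / 0.03) = 17 tokens, above
 * the 10-token floor. Note the load-then-subtract pair above is not atomic as a unit (and the strict >
 * also refuses when the bucket holds exactly the required amount; >= may be intended). Acquisition appears
 * to run only on the client's work task and releases only add tokens back, so a stale read is merely
 * conservative here, but a compare-and-swap loop would make the check exact.
 */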

/* Returns the max number of requests allowed to be in memory */
uint32_t aws_s3_client_get_max_requests_in_flight(struct aws_s3_client *client) {
AWS_PRECONDITION(client);
@@ -421,6 +564,8 @@ struct aws_s3_client *aws_s3_client_new(
*(uint32_t *)&client->ideal_connection_count = aws_max_u32(
g_min_num_connections, s_get_ideal_connection_number_from_throughput(client->throughput_target_gbps));

s_s3_client_init_tokens(client);

size_t part_size = (size_t)g_default_part_size_fallback;
if (client_config->part_size != 0) {
if (client_config->part_size > SIZE_MAX) {
@@ -1826,12 +1971,20 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) {
uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting +
num_requests_streaming_response + num_requests_being_prepared +
client->threaded_data.request_queue_size;

uint32_t total_tokens = (uint32_t)client->throughput_target_gbps * 1024;

uint32_t available_tokens = (uint32_t)aws_atomic_load_int(&client->token_bucket);

uint32_t used_tokens = total_tokens - available_tokens;

AWS_LOGF(
s_log_level_client_stats,
AWS_LS_S3_CLIENT_STATS,
"id=%p Requests-in-flight(approx/exact):%d/%d Requests-preparing:%d Requests-queued:%d "
"Requests-network(get/put/default/total):%d/%d/%d/%d Requests-streaming-waiting:%d "
"Requests-streaming-response:%d "
"Total Tokens: %d, Tokens Available: %d, Tokens Used: %d"
" Endpoints(in-table/allocated):%d/%d",
(void *)client,
total_approx_requests,
Expand All @@ -1844,6 +1997,9 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) {
num_requests_network_io,
num_requests_stream_queued_waiting,
num_requests_streaming_response,
total_tokens,
available_tokens,
used_tokens,
num_endpoints_in_table,
num_endpoints_allocated);
}
@@ -2266,7 +2422,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {

struct aws_s3_request *request = aws_s3_client_dequeue_request_threaded(client);
struct aws_s3_meta_request *meta_request = request->meta_request;
const uint32_t max_active_connections = aws_s3_client_get_max_active_connections(client, meta_request);

if (request->is_noop) {
/* If request is no-op, finishes and cleans up the request */
s_s3_client_meta_request_finished_request(client, meta_request, request, AWS_ERROR_SUCCESS);
@@ -2286,7 +2442,7 @@

s_s3_client_meta_request_finished_request(client, meta_request, request, AWS_ERROR_S3_CANCELED);
request = aws_s3_request_release(request);
} else if ((uint32_t)aws_atomic_load_int(&meta_request->num_requests_network) < max_active_connections) {
} else if (s_check_connection_limits(client, request) && s_s3_client_acquire_tokens(client, request)) {
/* Connection limits permit it and tokens were acquired; create a connection for the request. */
s_s3_client_create_connection_for_request(client, request);
} else {
@@ -2336,6 +2492,7 @@ static void s_s3_client_create_connection_for_request_default(

aws_atomic_fetch_add(&meta_request->num_requests_network, 1);
aws_atomic_fetch_add(&client->stats.num_requests_network_io[meta_request->type], 1);
aws_atomic_fetch_add(&client->stats.num_requests_network_total, 1);

struct aws_s3_connection *connection = aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_connection));

@@ -2612,6 +2769,9 @@ void aws_s3_client_notify_connection_finished(
request->send_data.metrics->time_metrics.s3_request_last_attempt_end_timestamp_ns -
request->send_data.metrics->time_metrics.s3_request_first_attempt_start_timestamp_ns;

/* Release tokens acquired for the request. */
s_s3_client_release_tokens(client, request);

if (connection->retry_token != NULL) {
/* If we have a retry token and successfully finished, record that success. */
if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
Expand All @@ -2631,6 +2791,7 @@ void aws_s3_client_notify_connection_finished(
}
aws_atomic_fetch_sub(&meta_request->num_requests_network, 1);
aws_atomic_fetch_sub(&client->stats.num_requests_network_io[meta_request->type], 1);
aws_atomic_fetch_sub(&client->stats.num_requests_network_total, 1);

s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);

2 changes: 1 addition & 1 deletion tests/s3_max_active_connections_override_test.c
@@ -239,7 +239,7 @@ TEST_CASE(s3_max_active_connections_override_enforced) {
* TODO: this test seems a bit flaky. Sometimes the peak we collect is one more than expected, possibly a race
* between release and acquire. Check it against an upper bound of expected + 1 for now.
*/
ASSERT_TRUE(peak == options.max_active_connections_override || peak == options.max_active_connections_override + 1);
ASSERT_TRUE(peak <= options.max_active_connections_override + 1);

aws_input_stream_destroy(input_stream);
aws_string_destroy(host_name);
3 changes: 3 additions & 0 deletions tests/s3_tester.c
@@ -895,6 +895,9 @@ struct aws_s3_client *aws_s3_tester_mock_client_new(struct aws_s3_tester *tester
aws_atomic_init_int(&mock_client->stats.num_requests_stream_queued_waiting, 0);
aws_atomic_init_int(&mock_client->stats.num_requests_streaming_response, 0);

// Seed the token bucket generously so mock tests never block on token acquisition.
aws_atomic_init_int(&mock_client->token_bucket, 1000000);

return mock_client;
}
