diff --git a/include/aws/s3/private/s3_client_impl.h b/include/aws/s3/private/s3_client_impl.h
index 0fb87ca5..282a47c7 100644
--- a/include/aws/s3/private/s3_client_impl.h
+++ b/include/aws/s3/private/s3_client_impl.h
@@ -275,6 +275,10 @@ struct aws_s3_client {
     /* The calculated ideal number of HTTP connections, based on throughput target and throughput per connection. */
     const uint32_t ideal_connection_count;
 
+    /* Token bucket for scaling connections: initialized from the target throughput, debited when a request acquires
+     * tokens to allocate a connection, and credited back when the request completes. */
+    struct aws_atomic_var token_bucket;
+
     /**
      * For multi-part upload, content-md5 will be calculated if the AWS_MR_CONTENT_MD5_ENABLED is specified
      * or initial request has content-md5 header.
@@ -285,8 +289,6 @@ struct aws_s3_client {
     /* Hard limit on max connections set through the client config. */
     const uint32_t max_active_connections_override;
 
-    struct aws_atomic_var max_allowed_connections;
-
     /* Retry strategy used for scheduling request retries. */
     struct aws_retry_strategy *retry_strategy;
 
@@ -363,6 +365,9 @@ struct aws_s3_client {
         /* Number of requests being sent/received over network. */
         struct aws_atomic_var num_requests_network_io[AWS_S3_META_REQUEST_TYPE_MAX];
 
+        /* Total number of requests on the network across all meta requests. */
+        struct aws_atomic_var num_requests_network_total;
+
         /* Number of requests sitting in their meta request priority queue, waiting to be streamed. */
         struct aws_atomic_var num_requests_stream_queued_waiting;
 
diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h
index e9f3b5dc..1779a7ac 100644
--- a/include/aws/s3/private/s3_meta_request_impl.h
+++ b/include/aws/s3/private/s3_meta_request_impl.h
@@ -191,6 +191,9 @@ struct aws_s3_meta_request {
     enum aws_s3_meta_request_type type;
 
     struct aws_string *s3express_session_host;
+    /* Estimated size of the object. For downloads, the discovery request reveals the size.
+     * In other cases, we either know the size up front or will never know the size of the object. */
+    size_t object_size;
     /* Is the meta request made to s3express bucket or not. */
     bool is_express;
     /* If the buffer pool optimized for the specific size or not. */
diff --git a/include/aws/s3/private/s3_request.h b/include/aws/s3/private/s3_request.h
index af8b8565..09dbefc1 100644
--- a/include/aws/s3/private/s3_request.h
+++ b/include/aws/s3/private/s3_request.h
@@ -255,6 +255,10 @@ struct aws_s3_request {
     /* The upload_timeout used. Zero, if the request is not a upload part */
     size_t upload_timeout_ms;
 
+    /* The number of tokens spent to send the request. Zero until a connection needs to be created. On failure,
+     * it is reset to zero until a retry requires a connection again. */
+    uint32_t tokens_used;
+
    /* Number of times aws_s3_meta_request_prepare has been called for a request. During the first call to the virtual
     * prepare function, this will be 0.*/
    uint32_t num_times_prepared;
diff --git a/source/s3_client.c b/source/s3_client.c
index 5d9370a0..b26f8ef0 100644
--- a/source/s3_client.c
+++ b/source/s3_client.c
@@ -75,6 +75,42 @@ const uint32_t g_min_num_connections = 10; /* Magic value based on: 10 was old b
  * be 2500 Gbps. */
 const uint32_t g_max_num_connections = 10000;
 
+/*
+ * This is a token-based implementation for dynamic scaling of connections. The calculations are approximate and can
+ * be improved in the future. The idea is to scale the number of connections we require up and down based on the
+ * different requests we receive, and hence dynamically scale the maximum number of connections we need to open.
+ * One token is equivalent to 1 Mbps of throughput.
+ */
+
+/* Attainable per-connection throughput values were provided by the S3 team in MBps; they are stored here in Mbps
+ * (MBps * 8) to match the token unit. */
+
+// 90 MBps
+const uint32_t s_s3_download_throughput_per_connection_mbps = 90 * 8;
+// 20 MBps
+const uint32_t s_s3_upload_throughput_per_connection_mbps = 20 * 8;
+// 150 MBps
+const uint32_t s_s3_express_download_throughput_per_connection_mbps = 150 * 8;
+// 100 MBps
+const uint32_t s_s3_express_upload_throughput_per_connection_mbps = 100 * 8;
+
+/* p50 request latencies provided by the S3 team. Note: the values are stored in seconds so that
+ * payload-megabits / latency yields Mbps directly. */
+// 30 ms
+const double s_s3_p50_request_latency_secs = 0.03;
+// 4 ms
+const double s_s3_express_p50_request_latency_secs = 0.004;
+
+/*
+ * The minimum number of tokens a request will use, irrespective of payload size or throughput achieved. This puts a
+ * hard limit on the number of connections we open for extremely small request sizes. The number below is arbitrary
+ * until we come up with more sophisticated math.
+ */
+const uint32_t s_s3_minimum_tokens = 10;
+
+/*
+ * A rough estimate of the tokens used by a request whose type or payload size we do not know.
+ * This is hard to set precisely and is therefore an approximation.
+ */
+const uint32_t s_s3_default_tokens = 500;
+
 /**
  * Default max part size is 5GiB as the server limit.
  */
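
To make the token arithmetic above concrete, here is a small standalone sketch (hypothetical, mirroring the constants above and the 1 MiB `MB_TO_BYTES` convention used by `s_s3_client_acquire_tokens` below) that computes the tokens a standard-S3 download would need at a few payload sizes:

#include <math.h>
#include <stdint.h>
#include <stdio.h>

/* Mirror of the patch's constants: 90 MBps attainable per connection (720 Mbps),
 * 30 ms p50 latency stored as 0.03 s, and a 10-token floor. */
#define MB_TO_BYTES(x) ((x) * 1024 * 1024)
static const uint32_t download_mbps = 90 * 8;
static const double p50_latency_secs = 0.03;
static const uint32_t minimum_tokens = 10;

static uint32_t required_tokens(uint64_t buffer_size) {
    /* Latency bound for small payloads: payload megabits divided by latency in seconds. */
    uint32_t latency_bound = (uint32_t)ceil((double)buffer_size * 8 / (MB_TO_BYTES(1) * p50_latency_secs));
    uint32_t tokens = latency_bound < download_mbps ? latency_bound : download_mbps;
    return tokens > minimum_tokens ? tokens : minimum_tokens;
}

int main(void) {
    /* An 8 MiB part is throughput-bound: min(2134, 720) -> 720 tokens.
     * A 64 KiB object is latency-bound: min(17, 720) -> 17 tokens.
     * A 1 KiB object hits the floor: max(1, 10) -> 10 tokens. */
    printf("8 MiB:  %u tokens\n", required_tokens(MB_TO_BYTES(8)));
    printf("64 KiB: %u tokens\n", required_tokens(64 * 1024));
    printf("1 KiB:  %u tokens\n", required_tokens(1024));
    return 0;
}

A large part hits the attainable-throughput ceiling, while small objects fall to the latency bound or the 10-token floor, which is exactly what keeps tiny requests from each claiming a whole connection's worth of throughput.
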
@@ -156,8 +192,10 @@ void aws_s3_set_dns_ttl(size_t ttl) {
 
 /**
  * Determine how many connections are ideal by dividing target-throughput by throughput-per-connection.
- * TODO: we may consider to alter this, upload to regular s3 can use more connections, and s3 express
- * can use less connections to reach the target throughput..
+ * We have begun altering this calculation. get_ideal_connection_number no longer provides the basis for the number
+ * of connections used by a particular meta request. We may also use fewer connections than ideal_connection_count
+ * against endpoints with higher per-connection throughput, which lets us use memory and connections more
+ * efficiently.
 **/
 static uint32_t s_get_ideal_connection_number_from_throughput(double throughput_gps) {
     double ideal_connection_count_double = throughput_gps / s_throughput_per_connection_gbps;
@@ -169,7 +207,7 @@ static uint32_t s_get_ideal_connection_number_from_throughput(double throughput_
 
 /* Returns the max number of connections allowed.
  *
- * When meta request is NULL, this will return the overall allowed number of connections based on the clinet
+ * When meta request is NULL, this will return the overall allowed number of connections based on the client
  * configurations.
  *
  * If meta_request is not NULL, this will return the number of connections allowed based on the meta request
@@ -206,6 +244,90 @@ uint32_t aws_s3_client_get_max_active_connections(
     return max_active_connections;
 }
 
+/* Initialize the token bucket based on the target throughput. */
+static void s_s3_client_init_tokens(struct aws_s3_client *client) {
+    AWS_PRECONDITION(client);
+
+    uint32_t target_throughput_mbps = 0;
+    /* Overflow would require a target above 4 million Gbps, so ignoring the checked multiply's result is safe. */
+    (void)aws_mul_u32_checked((uint32_t)client->throughput_target_gbps, 1024, &target_throughput_mbps);
+    aws_atomic_store_int(&client->token_bucket, target_throughput_mbps);
+}
+
+/* Releases a request's tokens back to the bucket after the request is complete. */
+static void s_s3_client_release_tokens(struct aws_s3_client *client, struct aws_s3_request *request) {
+    AWS_PRECONDITION(client);
+    AWS_PRECONDITION(request);
+
+    aws_atomic_fetch_add(&client->token_bucket, request->tokens_used);
+    request->tokens_used = 0;
+}
+
+/* Returns whether the request was able to acquire the required number of tokens.
+ * TODO: try to introduce a scalability factor instead of using pure latency. */
+static bool s_s3_client_acquire_tokens(struct aws_s3_client *client, struct aws_s3_request *request) {
+    AWS_PRECONDITION(client);
+    AWS_PRECONDITION(request);
+
+    uint32_t required_tokens = 0;
+
+    /*
+     * In each of the following cases, we determine the number of tokens required using the following formula (one
+     * token is equivalent to attaining 1 Mbps of target throughput):
+     *
+     * For each operation (upload/download) and each service (S3/S3 Express), we have a hardcoded attainable
+     * per-connection throughput value obtained from S3, as well as the p50 latency of the respective service.
+     *
+     * The attained throughput per request is at most the attainable throughput of the respective service-operation;
+     * when the payload is small enough, the delivery time is negligible and the throughput is close to
+     * payload/latency.
+     *
+     * The tokens used are the minimum of the two, since larger objects get closer to the attainable throughput while
+     * smaller objects max out at the theoretical limit of what they can attain.
+     *
+     * This calculation is an approximation of reality and may continuously improve based on S3 performance.
+     */
+
+    switch (request->request_type) {
+        case AWS_S3_REQUEST_TYPE_GET_OBJECT: {
+            if (request->meta_request->is_express) {
+                required_tokens = aws_min_u32(
+                    (uint32_t)ceil(
+                        request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_express_p50_request_latency_secs)),
+                    s_s3_express_download_throughput_per_connection_mbps);
+            } else {
+                required_tokens = aws_min_u32(
+                    (uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_p50_request_latency_secs)),
+                    s_s3_download_throughput_per_connection_mbps);
+            }
+            break;
+        }
+        case AWS_S3_REQUEST_TYPE_UPLOAD_PART: {
+            if (request->meta_request->is_express) {
+                required_tokens = aws_min_u32(
+                    (uint32_t)ceil(
+                        request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_express_p50_request_latency_secs)),
+                    s_s3_express_upload_throughput_per_connection_mbps);
+            } else {
+                required_tokens = aws_min_u32(
+                    (uint32_t)ceil(request->buffer_size * 8 / (MB_TO_BYTES(1) * s_s3_p50_request_latency_secs)),
+                    s_s3_upload_throughput_per_connection_mbps);
+            }
+            break;
+        }
+        default: {
+            required_tokens = s_s3_default_tokens;
+        }
+    }
+
+    // Ensure we use at least the minimum number of tokens, irrespective of payload size.
+    required_tokens = aws_max_u32(required_tokens, s_s3_minimum_tokens);
+
+    if ((uint32_t)aws_atomic_load_int(&client->token_bucket) >= required_tokens) {
+        /* Only the client's process-work thread runs this check-then-subtract; concurrent releases only add tokens,
+         * so the bucket cannot go negative here. */
+        aws_atomic_fetch_sub(&client->token_bucket, required_tokens);
+        request->tokens_used = required_tokens;
+        return true;
+    }
+    return false;
+}
+
 /* Returns the max number of requests allowed to be in memory */
 uint32_t aws_s3_client_get_max_requests_in_flight(struct aws_s3_client *client) {
     AWS_PRECONDITION(client);
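
Note that `s_s3_client_acquire_tokens` performs a plain load followed by a separate subtract. That pair appears safe only because `aws_s3_client_update_connections_threaded` runs on the client's process-work thread and concurrent releases can only add tokens. If acquisition ever moves to multiple threads, a compare-and-swap loop is the usual fix; here is a minimal sketch using C11 atomics for illustration (not the aws_atomic API the patch uses):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Multi-consumer-safe token acquisition: retry the subtraction whenever another
 * thread changed the bucket between our read and our write. */
static bool try_acquire_tokens(_Atomic uint32_t *bucket, uint32_t required) {
    uint32_t available = atomic_load(bucket);
    while (available >= required) {
        /* On failure, compare_exchange_weak reloads `available` for the retry. */
        if (atomic_compare_exchange_weak(bucket, &available, available - required)) {
            return true;
        }
    }
    return false;
}

int main(void) {
    _Atomic uint32_t bucket = 1000;
    /* First 720-token acquire succeeds; the second must fail with 280 left. */
    return (try_acquire_tokens(&bucket, 720) && !try_acquire_tokens(&bucket, 720)) ? 0 : 1;
}
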
@@ -421,6 +543,8 @@ struct aws_s3_client *aws_s3_client_new(
     *(uint32_t *)&client->ideal_connection_count = aws_max_u32(
         g_min_num_connections, s_get_ideal_connection_number_from_throughput(client->throughput_target_gbps));
 
+    s_s3_client_init_tokens(client);
+
     size_t part_size = (size_t)g_default_part_size_fallback;
     if (client_config->part_size != 0) {
         if (client_config->part_size > SIZE_MAX) {
@@ -1826,12 +1950,20 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) {
         uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting +
                                          num_requests_streaming_response + num_requests_being_prepared +
                                          client->threaded_data.request_queue_size;
+
+        uint32_t total_tokens = (uint32_t)client->throughput_target_gbps * 1024;
+
+        uint32_t available_tokens = (uint32_t)aws_atomic_load_int(&client->token_bucket);
+
+        uint32_t used_tokens = total_tokens - available_tokens;
+
         AWS_LOGF(
             s_log_level_client_stats,
             AWS_LS_S3_CLIENT_STATS,
             "id=%p Requests-in-flight(approx/exact):%d/%d Requests-preparing:%d Requests-queued:%d "
             "Requests-network(get/put/default/total):%d/%d/%d/%d Requests-streaming-waiting:%d "
             "Requests-streaming-response:%d "
+            "Tokens(total/available/used):%d/%d/%d"
             " Endpoints(in-table/allocated):%d/%d",
             (void *)client,
             total_approx_requests,
@@ -1844,6 +1976,9 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) {
             num_requests_network_io,
             num_requests_stream_queued_waiting,
             num_requests_streaming_response,
+            total_tokens,
+            available_tokens,
+            used_tokens,
             num_endpoints_in_table,
             num_endpoints_allocated);
     }
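
The logging above recomputes the bucket capacity as `throughput_target_gbps * 1024`, duplicating the conversion in `s_s3_client_init_tokens` (which additionally guards the multiply with `aws_mul_u32_checked`). If the Gbps-to-token conversion ever changes, both sites must move in lockstep; a tiny shared helper, sketched here with a hypothetical name, would remove the duplication:

/* Hypothetical helper: a single source of truth for the bucket's total capacity,
 * usable by both s_s3_client_init_tokens and the stats logging. */
static uint32_t s_s3_client_total_tokens(const struct aws_s3_client *client) {
    return (uint32_t)client->throughput_target_gbps * 1024;
}
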
@@ -2266,7 +2401,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
         struct aws_s3_request *request = aws_s3_client_dequeue_request_threaded(client);
         struct aws_s3_meta_request *meta_request = request->meta_request;
 
-        const uint32_t max_active_connections = aws_s3_client_get_max_active_connections(client, meta_request);
+
         if (request->is_noop) {
             /* If request is no-op, finishes and cleans up the request */
             s_s3_client_meta_request_finished_request(client, meta_request, request, AWS_ERROR_SUCCESS);
@@ -2286,7 +2421,10 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
 
             s_s3_client_meta_request_finished_request(client, meta_request, request, AWS_ERROR_S3_CANCELED);
             request = aws_s3_request_release(request);
-        } else if ((uint32_t)aws_atomic_load_int(&meta_request->num_requests_network) < max_active_connections) {
+        } else if (
+            (uint32_t)aws_atomic_load_int(&meta_request->num_requests_network) <
+                (uint32_t)aws_s3_client_get_max_active_connections(client, meta_request) &&
+            s_s3_client_acquire_tokens(client, request)) {
             /* Make sure it's above the max request level limitation. */
             s_s3_client_create_connection_for_request(client, request);
         } else {
@@ -2336,6 +2474,7 @@ static void s_s3_client_create_connection_for_request_default(
 
     aws_atomic_fetch_add(&meta_request->num_requests_network, 1);
     aws_atomic_fetch_add(&client->stats.num_requests_network_io[meta_request->type], 1);
+    aws_atomic_fetch_add(&client->stats.num_requests_network_total, 1);
 
     struct aws_s3_connection *connection = aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_connection));
 
@@ -2612,6 +2751,9 @@ void aws_s3_client_notify_connection_finished(
             request->send_data.metrics->time_metrics.s3_request_last_attempt_end_timestamp_ns -
             request->send_data.metrics->time_metrics.s3_request_first_attempt_start_timestamp_ns;
 
+    // Release the tokens acquired for the request back to the bucket.
+    s_s3_client_release_tokens(client, request);
+
     if (connection->retry_token != NULL) {
         /* If we have a retry token and successfully finished, record that success. */
         if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
@@ -2631,6 +2773,7 @@ void aws_s3_client_notify_connection_finished(
     }
     aws_atomic_fetch_sub(&meta_request->num_requests_network, 1);
     aws_atomic_fetch_sub(&client->stats.num_requests_network_io[meta_request->type], 1);
+    aws_atomic_fetch_sub(&client->stats.num_requests_network_total, 1);
 
     s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);
 
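
Taken together, the hunks above complete the token lifecycle: tokens are debited just before `s_s3_client_create_connection_for_request` runs and credited back in `aws_s3_client_notify_connection_finished` whether the request succeeded or failed, so a retry re-acquires from a refilled bucket. A self-contained simulation of that accounting (plain integers standing in for the aws_atomic calls):

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
    uint32_t bucket = 1000; /* capacity for a hypothetical 1000 Mbps target */
    uint32_t tokens_used = 0;

    /* Acquire: succeeds, bucket shrinks, request records its tokens. */
    uint32_t required = 720;
    if (bucket >= required) {
        bucket -= required;
        tokens_used = required;
    }
    assert(bucket == 280 && tokens_used == 720);

    /* A second 720-token request must wait: the bucket is left untouched. */
    if (bucket >= 720) {
        bucket -= 720;
    }
    assert(bucket == 280);

    /* Connection finishes (success or failure): tokens return, counter resets. */
    bucket += tokens_used;
    tokens_used = 0;
    assert(bucket == 1000 && tokens_used == 0);

    printf("token accounting balanced\n");
    return 0;
}
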
diff --git a/tests/s3_max_active_connections_override_test.c b/tests/s3_max_active_connections_override_test.c
index 2c09a47e..3ef893a3 100644
--- a/tests/s3_max_active_connections_override_test.c
+++ b/tests/s3_max_active_connections_override_test.c
@@ -239,7 +239,7 @@ TEST_CASE(s3_max_active_connections_override_enforced) {
      * TODO: this test seems a bit flaky. Sometime the peak we collect is like one more than expected. Maybe some race
      * conditions that release and acquire happening. Check it against either the expected or expected + 1 for now. */
-    ASSERT_TRUE(peak == options.max_active_connections_override || peak == options.max_active_connections_override + 1);
+    ASSERT_TRUE(peak <= options.max_active_connections_override + 1);
 
     aws_input_stream_destroy(input_stream);
     aws_string_destroy(host_name);
diff --git a/tests/s3_tester.c b/tests/s3_tester.c
index 7fec8f7e..13a9784b 100644
--- a/tests/s3_tester.c
+++ b/tests/s3_tester.c
@@ -895,6 +895,9 @@ struct aws_s3_client *aws_s3_tester_mock_client_new(struct aws_s3_tester *tester
     aws_atomic_init_int(&mock_client->stats.num_requests_stream_queued_waiting, 0);
     aws_atomic_init_int(&mock_client->stats.num_requests_streaming_response, 0);
 
+    // Seed the token bucket generously so mock clients never stall on token acquisition.
+    aws_atomic_init_int(&mock_client->token_bucket, 1000000);
+
     return mock_client;
 }
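
One possible follow-up test, sketched against the mock-client seeding above (unverified; it assumes the existing tester harness macros and that `aws_s3_client_release` works on mock clients), would pin down the seeded capacity so future changes to the mock bucket are deliberate:

TEST_CASE(s3_mock_client_token_bucket_seeded) {
    (void)ctx;
    struct aws_s3_tester tester;
    ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

    struct aws_s3_client *mock_client = aws_s3_tester_mock_client_new(&tester);

    /* The mock client's bucket should hold exactly the value seeded in aws_s3_tester_mock_client_new. */
    ASSERT_UINT_EQUALS(1000000, aws_atomic_load_int(&mock_client->token_bucket));

    aws_s3_client_release(mock_client);
    aws_s3_tester_clean_up(&tester);
    return AWS_OP_SUCCESS;
}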