From 452155b28e9606e003ca2fa315f38b34a4545984 Mon Sep 17 00:00:00 2001
From: Dengke
Date: Fri, 15 Nov 2024 14:38:58 -0800
Subject: [PATCH] update first byte timeout algo

---
 include/aws/s3/private/s3_client_impl.h |  3 ++
 source/s3_client.c                      | 64 +++++++++++++++++++++----
 tests/s3_client_test.c                  | 58 ++++++++++++++++++++--
 3 files changed, 114 insertions(+), 11 deletions(-)

diff --git a/include/aws/s3/private/s3_client_impl.h b/include/aws/s3/private/s3_client_impl.h
index d90dad112..d0bc863df 100644
--- a/include/aws/s3/private/s3_client_impl.h
+++ b/include/aws/s3/private/s3_client_impl.h
@@ -26,6 +26,7 @@ struct aws_http_connection;
 struct aws_http_connection_manager;
 struct aws_host_resolver;
 struct aws_s3_endpoint;
+struct aws_priority_queue;
 
 enum aws_s3_connection_finish_code {
     AWS_S3_CONNECTION_FINISH_CODE_SUCCESS,
@@ -181,6 +182,8 @@ struct aws_s3_upload_part_timeout_stats {
     struct {
         uint64_t sum_ns;
         uint64_t num_samples;
+        bool collecting_p90;
+        struct aws_priority_queue p90_samples;
     } initial_request_time;
 
     /* Track the timeout rate. */
diff --git a/source/s3_client.c b/source/s3_client.c
index c1f2fcf75..52b779f34 100644
--- a/source/s3_client.c
+++ b/source/s3_client.c
@@ -22,6 +22,7 @@
 #include
 #include
 #include
+#include <aws/common/priority_queue.h>
 #include
 #include
 #include
@@ -763,6 +764,9 @@ static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) {
     void *shutdown_user_data = client->shutdown_callback_user_data;
 
     aws_s3_buffer_pool_destroy(client->buffer_pool);
+    if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
+        aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
+    }
 
     aws_mem_release(client->allocator, client->network_interface_names_cursor_array);
     for (size_t i = 0; i < client->num_network_interface_names; i++) {
@@ -2528,6 +2532,13 @@ static uint64_t s_upload_timeout_threshold_ns = 5000000000; /* 5 Secs */
 const size_t g_expect_timeout_offset_ms =
     700; /* 0.7 Secs. From experiments on c5n.18xlarge machine for 30 GiB upload, it gave us best performance. */
 
+static int s_compare_uint64_min_heap(const void *a, const void *b) {
+    uint64_t arg1 = *(const uint64_t *)a;
+    uint64_t arg2 = *(const uint64_t *)b;
+    /* Use a min-heap to get the P90, which will be the min of the largest 10% of the data. */
+    return arg1 > arg2;
+}
+
 /**
  * The upload timeout optimization: explained.
  *
@@ -2548,10 +2559,10 @@ const size_t g_expect_timeout_offset_ms =
  * would be better off waiting 5sec for the response, vs re-uploading the whole request.
  *
  * The current algorithm:
- * 1. Start without a timeout value. After 10 requests completed, we know the average of how long the
- *      request takes. We decide if it's worth to set a timeout value or not. (If the average of request takes more than
- *      5 secs or not) TODO: if the client have different part size, this doesn't make sense
- * 2. If it is worth to retry, start with a default timeout value, 1 sec.
+ * 1. Start without a timeout value. After a number of requests equal to the client's ideal connection count have
+ *      completed, we know the average of how long a request takes, and we decide whether a timeout value is worth
+ *      setting (i.e. whether the average request takes more than 5 secs or not).
+ * 2. If it is worth setting a timeout, start with a value taken from the P90 of the initial samples.
  * 3. If a request finishes successfully, use the average response_to_first_byte_time + g_expect_timeout_offset_ms as
  *      our expected timeout value. (TODO: The real expected timeout value should be a P99 of all the requests.)
  * 3.1 Adjust the current timeout value against the expected timeout value, via 0.99 * <current timeout> + 0.01 *
@@ -2589,14 +2600,44 @@ void aws_s3_client_update_upload_part_timeout(
         case AWS_ERROR_SUCCESS:
             /* We only interested in request succeed */
             stats->num_successful_upload_requests = aws_add_u64_saturating(stats->num_successful_upload_requests, 1);
-            if (stats->num_successful_upload_requests <= 10) {
+            if (stats->num_successful_upload_requests <= client->ideal_connection_count) {
                 /* Gether the data */
                 uint64_t request_time_ns =
                     metrics->time_metrics.receive_end_timestamp_ns - metrics->time_metrics.send_start_timestamp_ns;
                 stats->initial_request_time.sum_ns =
                     aws_add_u64_saturating(stats->initial_request_time.sum_ns, request_time_ns);
                 ++stats->initial_request_time.num_samples;
-                if (stats->num_successful_upload_requests == 10) {
+                if (!stats->initial_request_time.collecting_p90) {
+                    /* Initialize the priority queue to collect the P90 of the initial requests. */
+                    stats->initial_request_time.collecting_p90 = true;
+                    size_t queue_size = client->ideal_connection_count / 10;
+                    /* Take at least one sample. */
+                    queue_size = queue_size == 0 ? 1 : queue_size;
+                    aws_priority_queue_init_dynamic(
+                        &stats->initial_request_time.p90_samples,
+                        client->allocator,
+                        queue_size,
+                        sizeof(uint64_t),
+                        s_compare_uint64_min_heap);
+                } else {
+                    /* If the queue is full, replace the top (min) only when the new sample is larger. */
+                    size_t current_size = aws_priority_queue_size(&stats->initial_request_time.p90_samples);
+                    size_t current_capacity = aws_priority_queue_capacity(&stats->initial_request_time.p90_samples);
+                    if (current_size == current_capacity) {
+                        uint64_t *min = NULL;
+                        aws_priority_queue_top(&stats->initial_request_time.p90_samples, (void **)&min);
+                        if (request_time_ns > *min) {
+                            /* Push the data into the queue if it's larger than the min of the queue. */
+                            uint64_t pop_val = 0;
+                            aws_priority_queue_pop(&stats->initial_request_time.p90_samples, &pop_val);
+                            aws_priority_queue_push(&stats->initial_request_time.p90_samples, &request_time_ns);
+                        }
+                    } else {
+                        aws_priority_queue_push(&stats->initial_request_time.p90_samples, &request_time_ns);
+                    }
+                }
+
+                if (stats->num_successful_upload_requests == client->ideal_connection_count) {
                     /* Decide we need a timeout or not */
                     uint64_t average_request_time_ns =
                         stats->initial_request_time.sum_ns / stats->initial_request_time.num_samples;
@@ -2604,8 +2645,15 @@ void aws_s3_client_update_upload_part_timeout(
                         /* We don't need a timeout, as retry will be slower than just wait for the server to response */
                         stats->stop_timeout = true;
                     } else {
-                        /* Start the timeout at 1 secs */
-                        aws_atomic_store_int(&client->upload_timeout_ms, 1000);
+                        /* Start the timeout at the P90 of the initial samples. */
+                        uint64_t *p90_ns = NULL;
+                        aws_priority_queue_top(&stats->initial_request_time.p90_samples, (void **)&p90_ns);
+                        uint64_t p90_ms =
+                            aws_timestamp_convert(*p90_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
+                        aws_atomic_store_int(&client->upload_timeout_ms, (size_t)p90_ms);
+                        /* Clean up the queue now, as it's no longer needed. */
+                        aws_priority_queue_clean_up(&stats->initial_request_time.p90_samples);
+                        stats->initial_request_time.collecting_p90 = false;
                     }
                 }
                 goto unlock;
diff --git a/tests/s3_client_test.c b/tests/s3_client_test.c
index cba98096d..b8a10a2a9 100644
--- a/tests/s3_client_test.c
+++ b/tests/s3_client_test.c
@@ -41,11 +41,19 @@ static int s_starts_upload_retry(struct aws_s3_client *client, struct aws_s3_req
     AWS_ZERO_STRUCT(client->synced_data.upload_part_stats);
     s_init_mock_s3_request_upload_part_timeout(mock_request, 0, average_time_ns, average_time_ns);
 
-    for (size_t i = 0; i < 10; i++) {
-        /* Mock a number of requests completed with the large time for the request */
+    size_t init_count = client->ideal_connection_count;
+    size_t p90_count = init_count / 10 + 1;
+    for (size_t i = 0; i < init_count - p90_count; i++) {
+        /* 90% of the requests take the average request time. */
         aws_s3_client_update_upload_part_timeout(client, mock_request, AWS_ERROR_SUCCESS);
     }
+    uint64_t one_sec_time_ns = aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); /* 1 Secs */
+    s_init_mock_s3_request_upload_part_timeout(mock_request, 0, one_sec_time_ns, one_sec_time_ns);
+    for (size_t i = 0; i < p90_count; i++) {
+        /* 10 percent of the requests take 1 sec. */
+        aws_s3_client_update_upload_part_timeout(client, mock_request, AWS_ERROR_SUCCESS);
+    }
 
     /* Check that retry should be turned off */
     ASSERT_FALSE(client->synced_data.upload_part_stats.stop_timeout);
     size_t current_timeout_ms = aws_atomic_load_int(&client->upload_timeout_ms);
@@ -78,7 +86,7 @@ TEST_CASE(client_update_upload_part_timeout) {
     uint64_t average_time_ns = aws_timestamp_convert(
         250, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); /* 0.25 Secs, close to average for upload a part */
 
-    size_t init_count = 10;
+    size_t init_count = client->ideal_connection_count;
     {
         /* 1. If the request time is larger than 5 secs, we don't do retry */
         AWS_ZERO_STRUCT(client->synced_data.upload_part_stats);
@@ -95,6 +103,35 @@ TEST_CASE(client_update_upload_part_timeout) {
         ASSERT_TRUE(client->synced_data.upload_part_stats.stop_timeout);
         size_t current_timeout_ms = aws_atomic_load_int(&client->upload_timeout_ms);
         ASSERT_UINT_EQUALS(0, current_timeout_ms);
+        /* clean up */
+        if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
+            aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
+            client->synced_data.upload_part_stats.initial_request_time.collecting_p90 = false;
+        }
+    }
+    {
+        /* 2. Test that the P90 of the initial samples is used correctly */
+        AWS_ZERO_STRUCT(client->synced_data.upload_part_stats);
+        /* Hack around to set the ideal connection count for testing. */
+        size_t test_init_connection = 1000;
+        *(uint32_t *)(void *)&client->ideal_connection_count = test_init_connection;
+        for (size_t i = 0; i < test_init_connection; i++) {
+            /* Mock requests that complete with increasing request times (i ms). */
+            uint64_t time_ns = aws_timestamp_convert(i, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
+            s_init_mock_s3_request_upload_part_timeout(&mock_request, 0, time_ns, time_ns);
+            aws_s3_client_update_upload_part_timeout(client, &mock_request, AWS_ERROR_SUCCESS);
+        }
+
+        /* Check that the initial timeout is set to the P90 of the samples */
+        size_t current_timeout_ms = aws_atomic_load_int(&client->upload_timeout_ms);
+        ASSERT_UINT_EQUALS(900, current_timeout_ms);
+        /* clean up */
+        if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
+            aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
+            client->synced_data.upload_part_stats.initial_request_time.collecting_p90 = false;
+        }
+        /* Change it back */
+        *(uint32_t *)(void *)&client->ideal_connection_count = init_count;
     }
 
     {
@@ -137,6 +174,11 @@ TEST_CASE(client_update_upload_part_timeout) {
             aws_timestamp_convert(average_time_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL) +
                 g_expect_timeout_offset_ms,
             current_timeout_ms);
+        /* clean up */
+        if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
+            aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
+            client->synced_data.upload_part_stats.initial_request_time.collecting_p90 = false;
+        }
     }
 
     {
@@ -162,6 +204,11 @@ TEST_CASE(client_update_upload_part_timeout) {
         current_timeout_ms = aws_atomic_load_int(&client->upload_timeout_ms);
         /* 1.1 secs, still */
         ASSERT_UINT_EQUALS(1100, current_timeout_ms);
+        /* clean up */
+        if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
+            aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
+            client->synced_data.upload_part_stats.initial_request_time.collecting_p90 = false;
+        }
     }
 
     {
@@ -224,6 +271,11 @@ TEST_CASE(client_update_upload_part_timeout) {
         current_timeout_ms = aws_atomic_load_int(&client->upload_timeout_ms);
         ASSERT_UINT_EQUALS(3000, current_timeout_ms);
         ASSERT_FALSE(client->synced_data.upload_part_stats.stop_timeout);
+        /* clean up */
+        if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
+            aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
+            client->synced_data.upload_part_stats.initial_request_time.collecting_p90 = false;
+        }
     }
 
     {