From a84dcdfd5c09863f1cfc4864a9fe5bca4049ca7c Mon Sep 17 00:00:00 2001 From: Dengke Date: Fri, 23 Aug 2024 15:24:25 -0700 Subject: [PATCH] address comments and supports backpressure --- source/s3_meta_request.c | 59 ++++++++++++++++++------------- tests/CMakeLists.txt | 3 ++ tests/s3_data_plane_tests.c | 69 ++++++++++++++++++++++++++++++++++--- 3 files changed, 104 insertions(+), 27 deletions(-) diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index 1d0e0d08..ccafe63c 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -1841,33 +1841,47 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a if (error_code == AWS_ERROR_SUCCESS && response_body.len > 0) { if (meta_request->meta_request_level_running_response_sum) { - aws_checksum_update(meta_request->meta_request_level_running_response_sum, &response_body); + if (aws_checksum_update( + meta_request->meta_request_level_running_response_sum, &response_body)) { + error_code = aws_last_error(); + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "id=%p Failed to update checksum. last error:%s", + (void *)meta_request, + aws_error_name(error_code)); + } } - if (meta_request->recv_file) { - /* Write the data directly to the file. No need to seek, since the event will always be - * delivered with the right order. */ - if (fwrite((void *)response_body.ptr, response_body.len, 1, meta_request->recv_file) < 1) { - int errno_value = ferror(meta_request->recv_file) ? errno : 0; /* Always cache errno */ - error_code = aws_translate_and_raise_io_error_or(errno_value, AWS_ERROR_FILE_WRITE_FAILURE); + if (error_code == AWS_ERROR_SUCCESS) { + if (meta_request->recv_file) { + /* Write the data directly to the file. No need to seek, since the event will always be + * delivered with the right order. */ + if (fwrite((void *)response_body.ptr, response_body.len, 1, meta_request->recv_file) < 1) { + int errno_value = ferror(meta_request->recv_file) ? errno : 0; /* Always cache errno */ + aws_translate_and_raise_io_error_or(errno_value, AWS_ERROR_FILE_WRITE_FAILURE); + error_code = aws_last_error(); + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "id=%p Failed writing to file. errno:%d. aws-error:%s", + (void *)meta_request, + errno_value, + aws_error_name(error_code)); + } + if (meta_request->client->enable_read_backpressure) { + aws_s3_meta_request_increment_read_window(meta_request, response_body.len); + } + } else if ( + meta_request->body_callback != NULL && + meta_request->body_callback( + meta_request, &response_body, request->part_range_start, meta_request->user_data)) { + + error_code = aws_last_error_or_unknown(); AWS_LOGF_ERROR( AWS_LS_S3_META_REQUEST, - "id=%p Failed writing to file. errno:%d. aws-error:%s", + "id=%p Response body callback raised error %d (%s).", (void *)meta_request, - errno_value, - aws_error_name(aws_last_error())); + error_code, + aws_error_str(error_code)); } - } else if ( - meta_request->body_callback != NULL && - meta_request->body_callback( - meta_request, &response_body, request->part_range_start, meta_request->user_data)) { - - error_code = aws_last_error_or_unknown(); - AWS_LOGF_ERROR( - AWS_LS_S3_META_REQUEST, - "id=%p Response body callback raised error %d (%s).", - (void *)meta_request, - error_code, - aws_error_str(error_code)); } } aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1); @@ -2059,7 +2073,6 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request fclose(meta_request->recv_file); meta_request->recv_file = NULL; if (finish_result.error_code && meta_request->recv_file_delete_on_failure) { - /* Ignore the failure. Attempt to delete */ aws_file_delete(meta_request->recv_filepath); } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 079f9744..423c1bf5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -81,6 +81,9 @@ add_net_test_case(test_s3_default_get_object_looks_like_async_error_xml) add_net_test_case(test_s3_get_object_backpressure_small_increments) add_net_test_case(test_s3_get_object_backpressure_big_increments) add_net_test_case(test_s3_get_object_backpressure_initial_size_zero) +add_net_test_case(test_s3_get_object_backpressure_small_increments_recv_filepath) +add_net_test_case(test_s3_get_object_backpressure_big_increments_recv_filepath) +add_net_test_case(test_s3_get_object_backpressure_initial_size_zero_recv_filepath) add_net_test_case(test_s3_get_object_part) add_net_test_case(test_s3_no_signing) add_net_test_case(test_s3_signing_override) diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c index 8adb68c4..e36590a2 100644 --- a/tests/s3_data_plane_tests.c +++ b/tests/s3_data_plane_tests.c @@ -1744,7 +1744,8 @@ static int s_test_s3_get_object_backpressure_helper( struct aws_allocator *allocator, size_t part_size, size_t window_initial_size, - uint64_t window_increment_size) { + uint64_t window_increment_size, + bool file_on_disk) { struct aws_s3_tester tester; ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); @@ -1772,6 +1773,11 @@ static int s_test_s3_get_object_backpressure_helper( .type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT, .message = message, }; + struct aws_string *filepath_str = NULL; + if (file_on_disk) { + filepath_str = aws_s3_tester_create_file(allocator, g_pre_existing_object_1MB, NULL); + options.recv_filepath = aws_byte_cursor_from_string(filepath_str); + } struct aws_s3_meta_request_test_results meta_request_test_results; aws_s3_meta_request_test_results_init(&meta_request_test_results, allocator); @@ -1813,6 +1819,10 @@ static int s_test_s3_get_object_backpressure_helper( client = aws_s3_client_release(client); aws_s3_tester_clean_up(&tester); + if (filepath_str) { + aws_file_delete(filepath_str); + aws_string_destroy(filepath_str); + } return 0; } @@ -1826,7 +1836,8 @@ static int s_test_s3_get_object_backpressure_small_increments(struct aws_allocat size_t part_size = file_size / 4; size_t window_initial_size = 1024; uint64_t window_increment_size = part_size / 2; - return s_test_s3_get_object_backpressure_helper(allocator, part_size, window_initial_size, window_increment_size); + return s_test_s3_get_object_backpressure_helper( + allocator, part_size, window_initial_size, window_increment_size, false); } AWS_TEST_CASE(test_s3_get_object_backpressure_big_increments, s_test_s3_get_object_backpressure_big_increments) @@ -1838,7 +1849,8 @@ static int s_test_s3_get_object_backpressure_big_increments(struct aws_allocator size_t part_size = file_size / 8; size_t window_initial_size = 1024; uint64_t window_increment_size = part_size * 3; - return s_test_s3_get_object_backpressure_helper(allocator, part_size, window_initial_size, window_increment_size); + return s_test_s3_get_object_backpressure_helper( + allocator, part_size, window_initial_size, window_increment_size, false); } AWS_TEST_CASE(test_s3_get_object_backpressure_initial_size_zero, s_test_s3_get_object_backpressure_initial_size_zero) @@ -1849,7 +1861,56 @@ static int s_test_s3_get_object_backpressure_initial_size_zero(struct aws_alloca size_t part_size = file_size / 4; size_t window_initial_size = 0; uint64_t window_increment_size = part_size / 2; - return s_test_s3_get_object_backpressure_helper(allocator, part_size, window_initial_size, window_increment_size); + return s_test_s3_get_object_backpressure_helper( + allocator, part_size, window_initial_size, window_increment_size, false); +} + +AWS_TEST_CASE( + test_s3_get_object_backpressure_small_increments_recv_filepath, + s_test_s3_get_object_backpressure_small_increments_recv_filepath) +static int s_test_s3_get_object_backpressure_small_increments_recv_filepath( + struct aws_allocator *allocator, + void *ctx) { + /* Test increments smaller than part-size. + * Only 1 part at a time should be in flight */ + (void)ctx; + size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */ + size_t part_size = file_size / 4; + size_t window_initial_size = 1024; + uint64_t window_increment_size = part_size / 2; + return s_test_s3_get_object_backpressure_helper( + allocator, part_size, window_initial_size, window_increment_size, true); +} + +AWS_TEST_CASE( + test_s3_get_object_backpressure_big_increments_recv_filepath, + s_test_s3_get_object_backpressure_big_increments_recv_filepath) +static int s_test_s3_get_object_backpressure_big_increments_recv_filepath(struct aws_allocator *allocator, void *ctx) { + /* Test increments larger than part-size. + * Multiple parts should be in flight at a time */ + (void)ctx; + size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */ + size_t part_size = file_size / 8; + size_t window_initial_size = 1024; + uint64_t window_increment_size = part_size * 3; + return s_test_s3_get_object_backpressure_helper( + allocator, part_size, window_initial_size, window_increment_size, true); +} + +AWS_TEST_CASE( + test_s3_get_object_backpressure_initial_size_zero_recv_filepath, + s_test_s3_get_object_backpressure_initial_size_zero_recv_filepath) +static int s_test_s3_get_object_backpressure_initial_size_zero_recv_filepath( + struct aws_allocator *allocator, + void *ctx) { + /* Test with initial window size of zero */ + (void)ctx; + size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */ + size_t part_size = file_size / 4; + size_t window_initial_size = 0; + uint64_t window_increment_size = part_size / 2; + return s_test_s3_get_object_backpressure_helper( + allocator, part_size, window_initial_size, window_increment_size, true); } AWS_TEST_CASE(test_s3_get_object_part, s_test_s3_get_object_part)