Skip to content

Commit

Permalink
address comments and support backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK committed Aug 23, 2024
1 parent b93ce63 commit a84dcdf
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 27 deletions.
59 changes: 36 additions & 23 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1841,33 +1841,47 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a

if (error_code == AWS_ERROR_SUCCESS && response_body.len > 0) {
if (meta_request->meta_request_level_running_response_sum) {
aws_checksum_update(meta_request->meta_request_level_running_response_sum, &response_body);
if (aws_checksum_update(
meta_request->meta_request_level_running_response_sum, &response_body)) {
error_code = aws_last_error();
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Failed to update checksum. last error:%s",
(void *)meta_request,
aws_error_name(error_code));
}
}
if (meta_request->recv_file) {
/* Write the data directly to the file. No need to seek, since the event will always be
* delivered with the right order. */
if (fwrite((void *)response_body.ptr, response_body.len, 1, meta_request->recv_file) < 1) {
int errno_value = ferror(meta_request->recv_file) ? errno : 0; /* Always cache errno */
error_code = aws_translate_and_raise_io_error_or(errno_value, AWS_ERROR_FILE_WRITE_FAILURE);
if (error_code == AWS_ERROR_SUCCESS) {
if (meta_request->recv_file) {
/* Write the data directly to the file. No need to seek, since the event will always be
* delivered with the right order. */
if (fwrite((void *)response_body.ptr, response_body.len, 1, meta_request->recv_file) < 1) {
int errno_value = ferror(meta_request->recv_file) ? errno : 0; /* Always cache errno */
aws_translate_and_raise_io_error_or(errno_value, AWS_ERROR_FILE_WRITE_FAILURE);
error_code = aws_last_error();
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Failed writing to file. errno:%d. aws-error:%s",
(void *)meta_request,
errno_value,
aws_error_name(error_code));
}
if (meta_request->client->enable_read_backpressure) {
aws_s3_meta_request_increment_read_window(meta_request, response_body.len);
}
} else if (
meta_request->body_callback != NULL &&
meta_request->body_callback(
meta_request, &response_body, request->part_range_start, meta_request->user_data)) {

error_code = aws_last_error_or_unknown();
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Failed writing to file. errno:%d. aws-error:%s",
"id=%p Response body callback raised error %d (%s).",
(void *)meta_request,
errno_value,
aws_error_name(aws_last_error()));
error_code,
aws_error_str(error_code));
}
} else if (
meta_request->body_callback != NULL &&
meta_request->body_callback(
meta_request, &response_body, request->part_range_start, meta_request->user_data)) {

error_code = aws_last_error_or_unknown();
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Response body callback raised error %d (%s).",
(void *)meta_request,
error_code,
aws_error_str(error_code));
}
}
aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1);
Expand Down Expand Up @@ -2059,7 +2073,6 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request
fclose(meta_request->recv_file);
meta_request->recv_file = NULL;
if (finish_result.error_code && meta_request->recv_file_delete_on_failure) {
/* Ignore the failure. Attempt to delete */
aws_file_delete(meta_request->recv_filepath);
}
}
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ add_net_test_case(test_s3_default_get_object_looks_like_async_error_xml)
add_net_test_case(test_s3_get_object_backpressure_small_increments)
add_net_test_case(test_s3_get_object_backpressure_big_increments)
add_net_test_case(test_s3_get_object_backpressure_initial_size_zero)
add_net_test_case(test_s3_get_object_backpressure_small_increments_recv_filepath)
add_net_test_case(test_s3_get_object_backpressure_big_increments_recv_filepath)
add_net_test_case(test_s3_get_object_backpressure_initial_size_zero_recv_filepath)
add_net_test_case(test_s3_get_object_part)
add_net_test_case(test_s3_no_signing)
add_net_test_case(test_s3_signing_override)
Expand Down
69 changes: 65 additions & 4 deletions tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,8 @@ static int s_test_s3_get_object_backpressure_helper(
struct aws_allocator *allocator,
size_t part_size,
size_t window_initial_size,
uint64_t window_increment_size) {
uint64_t window_increment_size,
bool file_on_disk) {

struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));
Expand Down Expand Up @@ -1772,6 +1773,11 @@ static int s_test_s3_get_object_backpressure_helper(
.type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT,
.message = message,
};
struct aws_string *filepath_str = NULL;
if (file_on_disk) {
filepath_str = aws_s3_tester_create_file(allocator, g_pre_existing_object_1MB, NULL);
options.recv_filepath = aws_byte_cursor_from_string(filepath_str);
}

struct aws_s3_meta_request_test_results meta_request_test_results;
aws_s3_meta_request_test_results_init(&meta_request_test_results, allocator);
Expand Down Expand Up @@ -1813,6 +1819,10 @@ static int s_test_s3_get_object_backpressure_helper(
client = aws_s3_client_release(client);

aws_s3_tester_clean_up(&tester);
if (filepath_str) {
aws_file_delete(filepath_str);
aws_string_destroy(filepath_str);
}

return 0;
}
Expand All @@ -1826,7 +1836,8 @@ static int s_test_s3_get_object_backpressure_small_increments(struct aws_allocat
size_t part_size = file_size / 4;
size_t window_initial_size = 1024;
uint64_t window_increment_size = part_size / 2;
return s_test_s3_get_object_backpressure_helper(allocator, part_size, window_initial_size, window_increment_size);
return s_test_s3_get_object_backpressure_helper(
allocator, part_size, window_initial_size, window_increment_size, false);
}

AWS_TEST_CASE(test_s3_get_object_backpressure_big_increments, s_test_s3_get_object_backpressure_big_increments)
Expand All @@ -1838,7 +1849,8 @@ static int s_test_s3_get_object_backpressure_big_increments(struct aws_allocator
size_t part_size = file_size / 8;
size_t window_initial_size = 1024;
uint64_t window_increment_size = part_size * 3;
return s_test_s3_get_object_backpressure_helper(allocator, part_size, window_initial_size, window_increment_size);
return s_test_s3_get_object_backpressure_helper(
allocator, part_size, window_initial_size, window_increment_size, false);
}

AWS_TEST_CASE(test_s3_get_object_backpressure_initial_size_zero, s_test_s3_get_object_backpressure_initial_size_zero)
Expand All @@ -1849,7 +1861,56 @@ static int s_test_s3_get_object_backpressure_initial_size_zero(struct aws_alloca
size_t part_size = file_size / 4;
size_t window_initial_size = 0;
uint64_t window_increment_size = part_size / 2;
return s_test_s3_get_object_backpressure_helper(allocator, part_size, window_initial_size, window_increment_size);
return s_test_s3_get_object_backpressure_helper(
allocator, part_size, window_initial_size, window_increment_size, false);
}

AWS_TEST_CASE(
    test_s3_get_object_backpressure_small_increments_recv_filepath,
    s_test_s3_get_object_backpressure_small_increments_recv_filepath)
static int s_test_s3_get_object_backpressure_small_increments_recv_filepath(
    struct aws_allocator *allocator,
    void *ctx) {
    (void)ctx;

    /* Download a 1MB object straight to a file on disk, using read-window
     * increments smaller than the part size, so only one part at a time
     * should be in flight. */
    const size_t object_size = 1 * 1024 * 1024;
    const size_t part_size = object_size / 4;
    const size_t initial_window = 1024;
    const uint64_t increment = part_size / 2; /* half a part per increment */

    return s_test_s3_get_object_backpressure_helper(
        allocator, part_size, initial_window, increment, true /*file_on_disk*/);
}

AWS_TEST_CASE(
    test_s3_get_object_backpressure_big_increments_recv_filepath,
    s_test_s3_get_object_backpressure_big_increments_recv_filepath)
static int s_test_s3_get_object_backpressure_big_increments_recv_filepath(struct aws_allocator *allocator, void *ctx) {
    (void)ctx;

    /* Download a 1MB object straight to a file on disk, using read-window
     * increments larger than the part size, so multiple parts should be
     * in flight at the same time. */
    const size_t object_size = 1 * 1024 * 1024;
    const size_t part_size = object_size / 8;
    const size_t initial_window = 1024;
    const uint64_t increment = part_size * 3; /* three parts per increment */

    return s_test_s3_get_object_backpressure_helper(
        allocator, part_size, initial_window, increment, true /*file_on_disk*/);
}

AWS_TEST_CASE(
    test_s3_get_object_backpressure_initial_size_zero_recv_filepath,
    s_test_s3_get_object_backpressure_initial_size_zero_recv_filepath)
static int s_test_s3_get_object_backpressure_initial_size_zero_recv_filepath(
    struct aws_allocator *allocator,
    void *ctx) {
    (void)ctx;

    /* Download a 1MB object straight to a file on disk, starting with a
     * read window of zero, so nothing flows until the window is grown. */
    const size_t object_size = 1 * 1024 * 1024;
    const size_t part_size = object_size / 4;
    const size_t initial_window = 0;
    const uint64_t increment = part_size / 2; /* half a part per increment */

    return s_test_s3_get_object_backpressure_helper(
        allocator, part_size, initial_window, increment, true /*file_on_disk*/);
}

AWS_TEST_CASE(test_s3_get_object_part, s_test_s3_get_object_part)
Expand Down

0 comments on commit a84dcdf

Please sign in to comment.