Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 receive filepath #449

Merged
merged 22 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ struct aws_s3_meta_request {

/* running checksum of all the parts of a default get, or ranged get meta request*/
struct aws_s3_checksum *meta_request_level_running_response_sum;

/* The receiving file handler */
FILE *recv_file;
};

/* Info for each part, that we need to remember until we send CompleteMultipartUpload */
Expand Down
8 changes: 8 additions & 0 deletions include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,14 @@ struct aws_s3_meta_request_options {
* Do not set the message's body-stream if the body is being passed by other means (see note above) */
struct aws_http_message *message;

/**
* Optional.
* If set, the received data will be written into this file.
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
* the `body_callback` will still be invoked if set.
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
* This gives a better performance when receiving data to write to a file.
*/
struct aws_byte_cursor receive_filepath;
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved

/**
* Optional.
* If set, this file is sent as the request body.
Expand Down
33 changes: 33 additions & 0 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
#include <aws/auth/signing_result.h>
#include <aws/common/clock.h>
#include <aws/common/encoding.h>
#include <aws/common/file.h>
#include <aws/common/string.h>
#include <aws/common/system_info.h>
#include <aws/io/async_stream.h>
#include <aws/io/event_loop.h>
#include <aws/io/retry_strategy.h>
#include <aws/io/socket.h>
#include <aws/io/stream.h>
#include <errno.h>
#include <inttypes.h>

static const size_t s_dynamic_body_initial_buf_size = KB_TO_BYTES(1);
Expand Down Expand Up @@ -234,6 +236,14 @@ int aws_s3_meta_request_init_base(

/* If the request's body is being passed in some other way, set that up.
* (we checked earlier that the request body is not being passed multiple ways) */
if (options->receive_filepath.len > 0) {
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
struct aws_string *file_path = aws_string_new_from_cursor(allocator, &options->receive_filepath);
meta_request->recv_file = aws_fopen(aws_string_c_str(file_path), "wb");
aws_string_destroy(file_path);
if (!meta_request->recv_file) {
goto error;
}
}
if (options->send_filepath.len > 0) {
/* Create parallel read stream from file */
meta_request->request_body_parallel_stream =
Expand Down Expand Up @@ -440,6 +450,10 @@ static void s_s3_meta_request_destroy(void *user_data) {
/* endpoint should have already been released and set NULL by the meta request finish call.
* But call release() again, just in case we're tearing down a half-initialized meta request */
aws_s3_endpoint_release(meta_request->endpoint);
if (meta_request->recv_file) {
fclose(meta_request->recv_file);
meta_request->recv_file = NULL;
}

/* Client may be NULL if meta request failed mid-creation (or this some weird testing mock with no client) */
if (meta_request->client != NULL) {
Expand Down Expand Up @@ -1781,6 +1795,20 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
if (meta_request->meta_request_level_running_response_sum) {
aws_checksum_update(meta_request->meta_request_level_running_response_sum, &response_body);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this isn't part of your PR. But I see 😬 we're not checking aws_checksum_update() for error

}
if (meta_request->recv_file) {
/* Write the data directly to the file. No need to seek, since the event will always be
* delivered with the right order. */
if (fwrite((void *)response_body.ptr, response_body.len, 1, meta_request->recv_file) < 1) {
int errno_value = ferror(meta_request->recv_file) ? errno : 0; /* Always cache errno */
error_code = aws_translate_and_raise_io_error_or(errno_value, AWS_ERROR_FILE_WRITE_FAILURE);
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Failed writing to file. errno:%d. aws-error:%s",
(void *)meta_request,
errno_value,
aws_error_name(aws_last_error()));
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
}
}
if (meta_request->body_callback != NULL &&
meta_request->body_callback(
meta_request, &response_body, request->part_range_start, meta_request->user_data)) {
Expand Down Expand Up @@ -1979,6 +2007,11 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request
pending_async_write_waker(pending_async_write_waker_user_data);
}

if (meta_request->recv_file) {
fclose(meta_request->recv_file);
meta_request->recv_file = NULL;
}

while (!aws_linked_list_empty(&release_request_list)) {
struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&release_request_list);
struct aws_s3_request *release_request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node);
Expand Down
5 changes: 3 additions & 2 deletions tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -4228,10 +4228,10 @@ static int s_test_s3_round_trip_with_filepath_helper(
},
};

ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, NULL));
ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &test_results));
aws_s3_meta_request_test_results_clean_up(&test_results);

/*** GET FILE ***/
aws_s3_meta_request_test_results_clean_up(&test_results);
aws_s3_meta_request_test_results_init(&test_results, allocator);

struct aws_s3_tester_meta_request_options get_options = {
Expand All @@ -4242,6 +4242,7 @@ static int s_test_s3_round_trip_with_filepath_helper(
.get_options =
{
.object_path = object_path,
.file_on_disk = true,
},
};
ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &get_options, &test_results));
Expand Down
60 changes: 49 additions & 11 deletions tests/s3_tester.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ static int s_s3_test_meta_request_body_callback(
struct aws_s3_meta_request_test_results *meta_request_test_results = user_data;
meta_request_test_results->received_body_size += body->len;
aws_atomic_fetch_add(&meta_request_test_results->received_body_size_delta, body->len);
aws_checksum_update(meta_request_test_results->get_object_checksum_crc32c, body);

AWS_LOGF_DEBUG(
AWS_LS_S3_GENERAL,
Expand Down Expand Up @@ -539,13 +540,14 @@ void aws_s3_meta_request_test_results_init(
aws_atomic_init_int(&test_meta_request->received_body_size_delta, 0);
aws_array_list_init_dynamic(
&test_meta_request->synced_data.metrics, allocator, 4, sizeof(struct aws_s3_request_metrics *));
test_meta_request->get_object_checksum_crc32c = aws_checksum_new(allocator, AWS_SCA_CRC32C);
}

void aws_s3_meta_request_test_results_clean_up(struct aws_s3_meta_request_test_results *test_meta_request) {
if (test_meta_request == NULL) {
return;
}

aws_checksum_destroy(test_meta_request->get_object_checksum_crc32c);
aws_http_headers_release(test_meta_request->error_response_headers);
aws_byte_buf_clean_up(&test_meta_request->error_response_body);
aws_string_destroy(test_meta_request->error_response_operation_name);
Expand Down Expand Up @@ -1525,6 +1527,10 @@ int aws_s3_tester_send_meta_request_with_options(
aws_http_message_add_header(message, range_header);
}

if (options->get_options.file_on_disk) {
filepath_str = aws_s3_tester_create_file(allocator, options->get_options.object_path, NULL);
meta_request_options.receive_filepath = aws_byte_cursor_from_string(filepath_str);
}
meta_request_options.message = message;

} else if (
Expand Down Expand Up @@ -1755,6 +1761,33 @@ int aws_s3_tester_send_meta_request_with_options(
ASSERT_UINT_EQUALS(0, aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting));
ASSERT_UINT_EQUALS(0, aws_atomic_load_int(&client->stats.num_requests_streaming_response));
ASSERT_SUCCESS(s_tester_check_client_thread_data(client));
if (options->get_options.file_on_disk) {
/* Validate the checksum from the file match the checksum we calculate on stream. */
ASSERT_NOT_NULL(filepath_str);
uint8_t output_from_stream[4] = {0};
struct aws_byte_buf output_from_stream_buf =
aws_byte_buf_from_array(output_from_stream, sizeof(output_from_stream));
output_from_stream_buf.len = 0;
ASSERT_SUCCESS(
aws_checksum_finalize(out_results->get_object_checksum_crc32c, &output_from_stream_buf, 0));
FILE *file = aws_fopen(aws_string_c_str(filepath_str), "rb");
ASSERT_NOT_NULL(file);
int64_t file_length = 0;
ASSERT_SUCCESS(aws_file_get_length(file, &file_length));
struct aws_byte_buf buf;
aws_byte_buf_init(&buf, allocator, (size_t)file_length);
size_t read_length = fread(buf.buffer, 1, (size_t)file_length, file);
ASSERT_INT_EQUALS(file_length, (int64_t)read_length);
buf.len = read_length;
struct aws_byte_cursor file_cursor = aws_byte_cursor_from_buf(&buf);
uint8_t output_from_file[4] = {0};
struct aws_byte_buf output_from_file_buf =
aws_byte_buf_from_array(output_from_file, sizeof(output_from_file));
output_from_file_buf.len = 0;
ASSERT_SUCCESS(aws_checksum_compute(allocator, AWS_SCA_CRC32C, &file_cursor, &output_from_file_buf, 0));
ASSERT_TRUE(aws_byte_buf_eq(&output_from_stream_buf, &output_from_file_buf));
aws_byte_buf_clean_up(&buf);
}
break;
case AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE:
ASSERT_FALSE(out_results->finished_error_code == AWS_ERROR_SUCCESS);
Expand Down Expand Up @@ -2154,16 +2187,21 @@ struct aws_string *aws_s3_tester_create_file(
FILE *file = aws_fopen(aws_string_c_str(filepath_str), "wb");
AWS_FATAL_ASSERT(file != NULL);

int64_t stream_length = 0;
AWS_FATAL_ASSERT(aws_input_stream_get_length(input_stream, &stream_length) == AWS_OP_SUCCESS);

struct aws_byte_buf data_buf;
AWS_FATAL_ASSERT(aws_byte_buf_init(&data_buf, allocator, (size_t)stream_length) == AWS_OP_SUCCESS);
AWS_FATAL_ASSERT(aws_input_stream_read(input_stream, &data_buf) == AWS_OP_SUCCESS);
AWS_FATAL_ASSERT((size_t)stream_length == data_buf.len);
AWS_FATAL_ASSERT(data_buf.len == fwrite(data_buf.buffer, 1, data_buf.len, file));
fclose(file);
aws_byte_buf_clean_up(&data_buf);
if (input_stream) {
int64_t stream_length = 0;
AWS_FATAL_ASSERT(aws_input_stream_get_length(input_stream, &stream_length) == AWS_OP_SUCCESS);

struct aws_byte_buf data_buf;
AWS_FATAL_ASSERT(aws_byte_buf_init(&data_buf, allocator, (size_t)stream_length) == AWS_OP_SUCCESS);
AWS_FATAL_ASSERT(aws_input_stream_read(input_stream, &data_buf) == AWS_OP_SUCCESS);
AWS_FATAL_ASSERT((size_t)stream_length == data_buf.len);
AWS_FATAL_ASSERT(data_buf.len == fwrite(data_buf.buffer, 1, data_buf.len, file));
fclose(file);
aws_byte_buf_clean_up(&data_buf);
} else {
/* Create an empty file */
fclose(file);
}

return filepath_str;
}
3 changes: 3 additions & 0 deletions tests/s3_tester.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ struct aws_s3_tester_meta_request_options {
struct aws_byte_cursor object_range;
/* Get the part from S3, starts from 1. 0 means not set. */
int part_number;
bool file_on_disk;
} get_options;

/* Put Object Meta request specific options. */
Expand Down Expand Up @@ -250,6 +251,8 @@ struct aws_s3_meta_request_test_results {
int finished_error_code;
enum aws_s3_checksum_algorithm algorithm;

struct aws_s3_checksum *get_object_checksum_crc32c;

/* Record data from progress_callback() */
struct {
uint64_t content_length; /* Remember progress->content_length */
Expand Down
Loading