Skip to content

Commit

Permalink
WIP getFirstPart
Browse files Browse the repository at this point in the history
  • Loading branch information
waahm7 committed Dec 2, 2023
1 parent 4979c06 commit a249a10
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 24 deletions.
1 change: 1 addition & 0 deletions include/aws/s3/private/s3_auto_ranged_get.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum aws_s3_auto_ranged_get_request_type {
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_HEAD_OBJECT,
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART,
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_INITIAL_MESSAGE,
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_PART,
};

struct aws_s3_auto_ranged_get {
Expand Down
1 change: 1 addition & 0 deletions include/aws/s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ enum aws_s3_errors {
AWS_ERROR_S3_FILE_MODIFIED,
AWS_ERROR_S3_EXCEEDS_MEMORY_LIMIT,
AWS_ERROR_S3_INVALID_MEMORY_LIMIT_CONFIG,
AWS_ERROR_S3_PART_TOO_LARGE_FOR_GET_PART,
AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID)
};

Expand Down
1 change: 1 addition & 0 deletions source/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ static struct aws_error_info s_errors[] = {
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_EXCEEDS_MEMORY_LIMIT, "Request was not created due to used memory exceeding memory limit."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INVALID_MEMORY_LIMIT_CONFIG, "Specified memory configuration is invalid for the system. "
"Memory limit should be at least 1GiB. Part size and max part size should be smaller than memory limit."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_PART_TOO_LARGE_FOR_GET_PART, "First part size is larger than size of part"),
};
/* clang-format on */

Expand Down
45 changes: 25 additions & 20 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,12 @@ static bool s_s3_auto_ranged_get_update(
if (ticket == NULL) {
goto has_work_remaining;
}

/*TODO: do rangeGet if checksum validation is off */
/* If we aren't using a head object, then discover the size of the object while trying to get the
* first part. */
request = aws_s3_request_new(
meta_request,
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART,
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_PART,
AWS_S3_REQUEST_TYPE_GET_OBJECT,
1 /*part_number*/,
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY);
Expand Down Expand Up @@ -374,19 +374,19 @@ static struct aws_future_void *s_s3_auto_ranged_get_prepare_request(struct aws_s
break;
case AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART:
// waahm7: TODO refactor it to reduce code duplication
if (request->discovers_object_size && !auto_ranged_get->initial_message_has_range_header) {
message = aws_s3_message_util_copy_http_message_no_body_all_headers(
meta_request->allocator, meta_request->initial_request_message);
if (message) {
aws_s3_message_util_set_multipart_request_path(
meta_request->allocator, NULL, request->part_number, false, message);
}
} else {
message = aws_s3_ranged_get_object_message_new(
meta_request->allocator,
meta_request->initial_request_message,
request->part_range_start,
request->part_range_end);
message = aws_s3_ranged_get_object_message_new(
meta_request->allocator,
meta_request->initial_request_message,
request->part_range_start,
request->part_range_end);

break;
case AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_PART:
message = aws_s3_message_util_copy_http_message_no_body_all_headers(
meta_request->allocator, meta_request->initial_request_message);
if (message) {
aws_s3_message_util_set_multipart_request_path(
meta_request->allocator, NULL, request->part_number, false, message);
}
break;
case AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_INITIAL_MESSAGE:
Expand Down Expand Up @@ -555,6 +555,7 @@ static int s_discover_object_range_and_content_length(
}
break;
}
case AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_PART:

AWS_ASSERT(request->send_data.response_headers != NULL);

Expand Down Expand Up @@ -608,6 +609,7 @@ static void s_s3_auto_ranged_get_request_finished(

bool found_object_size = false;
bool request_failed = error_code != AWS_ERROR_SUCCESS;
bool failed_due_to_part_too_large = true;

if (request->discovers_object_size) {

Expand Down Expand Up @@ -638,9 +640,6 @@ static void s_s3_auto_ranged_get_request_finished(
auto_ranged_get->etag = aws_string_new_from_cursor(auto_ranged_get->base.allocator, &etag_header_value);
}

/* If we were able to discover the object-range/content length successfully, then any error code that was passed
* into this function is being handled and does not indicate an overall failure.*/
error_code = AWS_ERROR_SUCCESS;
found_object_size = true;

if (meta_request->headers_callback != NULL) {
Expand All @@ -649,7 +648,8 @@ static void s_s3_auto_ranged_get_request_finished(
copy_http_headers(request->send_data.response_headers, response_headers);

/* If this request is a part, then the content range isn't applicable. */
if (request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART) {
if (request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART ||
request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_PART) {
/* For now, we can assume that discovery of size via the first part of the object does not apply to
* breaking up a ranged request. If it ever does, then we will need to repopulate this header. */
AWS_ASSERT(!auto_ranged_get->initial_message_has_range_header);
Expand Down Expand Up @@ -685,7 +685,10 @@ static void s_s3_auto_ranged_get_request_finished(
/* If the object range was found, then record it. */
if (found_object_size) {
AWS_ASSERT(!auto_ranged_get->synced_data.object_range_known);

if (error_code == AWS_ERROR_S3_PART_TOO_LARGE_FOR_GET_PART) {
--auto_ranged_get->synced_data.num_parts_requested;
}
error_code = AWS_ERROR_SUCCESS;
auto_ranged_get->synced_data.object_range_known = true;
auto_ranged_get->synced_data.object_range_empty = (total_content_length == 0);
auto_ranged_get->synced_data.object_range_start = object_range_start;
Expand All @@ -699,6 +702,8 @@ static void s_s3_auto_ranged_get_request_finished(
auto_ranged_get->synced_data.head_object_completed = true;
AWS_LOGF_DEBUG(AWS_LS_S3_META_REQUEST, "id=%p Head object completed.", (void *)meta_request);
break;
case AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_PART:
break;
case AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART:
++auto_ranged_get->synced_data.num_parts_completed;

Expand Down
37 changes: 35 additions & 2 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ static int s_s3_meta_request_incoming_headers(
size_t headers_count,
void *user_data);

static int s_s3_meta_request_headers_block_done(
struct aws_http_stream *stream,
enum aws_http_header_block header_block,
void *user_data);

static void s_s3_meta_request_stream_metrics(
struct aws_http_stream *stream,
const struct aws_http_stream_metrics *metrics,
Expand Down Expand Up @@ -1046,7 +1051,7 @@ void aws_s3_meta_request_send_request(struct aws_s3_meta_request *meta_request,
options.request = request->send_data.message;
options.user_data = connection;
options.on_response_headers = s_s3_meta_request_incoming_headers;
options.on_response_header_block_done = NULL;
options.on_response_header_block_done = s_s3_meta_request_headers_block_done;
options.on_response_body = s_s3_meta_request_incoming_body;
if (request->send_data.metrics) {
options.on_metrics = s_s3_meta_request_stream_metrics;
Expand Down Expand Up @@ -1193,7 +1198,6 @@ static int s_s3_meta_request_incoming_headers(
const struct aws_http_header *headers,
size_t headers_count,
void *user_data) {

(void)header_block;

AWS_PRECONDITION(stream);
Expand Down Expand Up @@ -1260,6 +1264,34 @@ static int s_s3_meta_request_incoming_headers(

return AWS_OP_SUCCESS;
}
static int s_s3_meta_request_headers_block_done(
struct aws_http_stream *stream,
enum aws_http_header_block header_block,
void *user_data) {

AWS_PRECONDITION(stream);

struct aws_s3_connection *connection = user_data;
AWS_PRECONDITION(connection);

struct aws_s3_request *request = connection->request;
AWS_PRECONDITION(request);

struct aws_s3_meta_request *meta_request = request->meta_request;
AWS_PRECONDITION(meta_request);

if (request->request_type == AWS_S3_REQUEST_TYPE_GET_OBJECT &&
request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_PART &&
header_block == AWS_HTTP_HEADER_BLOCK_MAIN) {
uint64_t content_length;
if (aws_s3_parse_content_length_response_header(
request->allocator, request->send_data.response_headers, &content_length) ||
content_length > meta_request->part_size) {
return aws_raise_error(AWS_ERROR_S3_PART_TOO_LARGE_FOR_GET_PART);
}
}
return AWS_OP_SUCCESS;
}

/*
* Small helper to either do a static or dynamic append.
Expand Down Expand Up @@ -1506,6 +1538,7 @@ void aws_s3_meta_request_send_request_finish_default(
/* If the request failed due to an invalid (ie: unrecoverable) response status, or the meta request already
* has a result, then make sure that this request isn't retried. */
if (error_code == AWS_ERROR_S3_INVALID_RESPONSE_STATUS ||
error_code == AWS_ERROR_S3_PART_TOO_LARGE_FOR_GET_PART ||
error_code == AWS_ERROR_S3_NON_RECOVERABLE_ASYNC_ERROR || meta_request_finishing) {
finish_code = AWS_S3_CONNECTION_FINISH_CODE_FAILED;

Expand Down
4 changes: 2 additions & 2 deletions tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -3602,13 +3602,13 @@ static int s_test_s3_download_multipart_get_single_part_upload(struct aws_alloca
struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));
struct aws_s3_tester_client_options client_options = {
.part_size = MB_TO_BYTES(5),
.part_size = MB_TO_BYTES(3),
};

struct aws_s3_client *client = NULL;
ASSERT_SUCCESS(aws_s3_tester_client_new(&tester, &client_options, &client));

struct aws_byte_cursor object_path = aws_byte_cursor_from_c_str("/download/Caltech256/001.ak47/001_0001.jpg");
struct aws_byte_cursor object_path = aws_byte_cursor_from_c_str("/head-object-test/SingleUpload20mbWithCRC32.txt");

// struct aws_s3_tester_meta_request_options put_options = {
// .allocator = allocator,
Expand Down

0 comments on commit a249a10

Please sign in to comment.