Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitriyMusatkin committed Nov 21, 2023
1 parent fccca7c commit 1630751
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 30 deletions.
6 changes: 3 additions & 3 deletions include/aws/s3/private/s3_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,6 @@ struct aws_s3_client {
/* Whether or not work processing is currently scheduled. */
uint32_t process_work_task_scheduled : 1;

/* Whether or not work processing is currently scheduled. */
uint32_t trim_buffer_pool_task_scheduled : 1;

/* Whether or not work process is currently in progress. */
uint32_t process_work_task_in_progress : 1;

Expand All @@ -386,6 +383,9 @@ struct aws_s3_client {

/* Number of requests currently being prepared. */
uint32_t num_requests_being_prepared;

/* Whether or not a buffer pool trim task is currently scheduled. */
uint32_t trim_buffer_pool_task_scheduled : 1;
} threaded_data;
};

Expand Down
18 changes: 9 additions & 9 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,14 @@ static bool s_s3_auto_ranged_get_update(
auto_ranged_get->synced_data.head_object_sent = true;
}
} else if (auto_ranged_get->synced_data.num_parts_requested == 0) {

struct aws_s3_buffer_pool_ticket *ticket =
aws_s3_buffer_pool_reserve(meta_request->client->buffer_pool, meta_request->part_size);

if (ticket == NULL) {
goto has_work_remaining;
}

/* If we aren't using a head object, then discover the size of the object while trying to get the
* first part. */
request = aws_s3_request_new(
Expand All @@ -192,19 +200,11 @@ static bool s_s3_auto_ranged_get_update(
1,
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY);

request->ticket = ticket;
request->part_range_start = 0;
request->part_range_end = meta_request->part_size - 1; /* range-end is inclusive */
request->discovers_object_size = true;

struct aws_s3_buffer_pool_ticket *ticket =
aws_s3_buffer_pool_reserve(meta_request->client->buffer_pool, meta_request->part_size);

if (ticket == NULL) {
goto has_work_remaining;
}

request->ticket = ticket;

++auto_ranged_get->synced_data.num_parts_requested;
}

Expand Down
7 changes: 1 addition & 6 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ static bool s_s3_auto_ranged_put_update(
struct aws_s3_buffer_pool_ticket *ticket =
aws_s3_buffer_pool_reserve(meta_request->client->buffer_pool, meta_request->part_size);

if (ticket) {
if (ticket != NULL) {
/* Allocate a request for another part. */
request = aws_s3_request_new(
meta_request,
Expand All @@ -585,11 +585,6 @@ static bool s_s3_auto_ranged_put_update(
(void *)meta_request,
(void *)request,
request->part_number);
} else {
AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"id=%p: Failed to allocate due to exceeding memory limit",
(void *)meta_request);
}

goto has_work_remaining;
Expand Down
19 changes: 14 additions & 5 deletions source/s3_buffer_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
struct aws_s3_buffer_pool_ticket {
size_t size;
uint8_t *ptr;
size_t chunks_used;
};

/* Default size for blocks array. Note: this is just for meta info, blocks
Expand Down Expand Up @@ -179,14 +180,16 @@ void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool) {
}

void s_buffer_pool_trim_synced(struct aws_s3_buffer_pool *buffer_pool) {
for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) {
for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks);) {
struct s3_buffer_pool_block *block;
aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i);

if (block->alloc_bit_mask == 0) {
aws_mem_release(buffer_pool->base_allocator, block->block_ptr);
aws_array_list_erase(&buffer_pool->blocks, i);
--i;
/* do not increment since we just released element */
} else {
++i;
}
}
}
Expand Down Expand Up @@ -245,14 +248,16 @@ struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(struct aws_s3_buffe

if (ticket == NULL) {
AWS_LOGF_TRACE(
AWS_LS_S3_CLIENT, "Failed to reserve buffer of size %zu. Consider increasing memory limit", size);
AWS_LS_S3_CLIENT, "Memory limit reached while trying to allocate buffer of size %zu. "
"Putting new buffer reservations on hold...", size);
aws_raise_error(AWS_ERROR_S3_EXCEEDS_MEMORY_LIMIT);
}
return ticket;
}

/* Query-only accessor: reports whether the pool is currently refusing new
 * buffer reservations (the "reservation hold" raised when the memory limit
 * is hit). Does not modify pool state. */
bool aws_s3_buffer_pool_has_reservation_hold(struct aws_s3_buffer_pool *buffer_pool) {
AWS_PRECONDITION(buffer_pool);
/* NOTE(review): this trace message says "Releasing buffer reservation hold."
 * but this function only reads the flag and releases nothing. The log line
 * most likely belongs in aws_s3_buffer_pool_remove_reservation_hold (the
 * function in the next hunk) — confirm against the upstream commit before
 * relying on this placement. */
AWS_LOGF_TRACE(AWS_LS_S3_CLIENT, "Releasing buffer reservation hold.");
return buffer_pool->has_reservation_hold;
}

Expand All @@ -261,14 +266,17 @@ void aws_s3_buffer_pool_remove_reservation_hold(struct aws_s3_buffer_pool *buffe
buffer_pool->has_reservation_hold = false;
}

static uint8_t *s_primary_acquire_synced(struct aws_s3_buffer_pool *buffer_pool, size_t size) {
static uint8_t *s_primary_acquire_synced(struct aws_s3_buffer_pool *buffer_pool,
size_t size, size_t *out_chunks_used) {
uint8_t *alloc_ptr = NULL;

size_t chunks_needed = size / buffer_pool->chunk_size;
if (size % buffer_pool->chunk_size != 0) {
++chunks_needed; /* round up */
}
*out_chunks_used = chunks_needed;

/* Look for space in existing blocks */
for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) {
struct s3_buffer_pool_block *block;
aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i);
Expand All @@ -282,6 +290,7 @@ static uint8_t *s_primary_acquire_synced(struct aws_s3_buffer_pool *buffer_pool,
}
}

/* No space available. Allocate new block. */
struct s3_buffer_pool_block block;
block.alloc_bit_mask = s_set_bits(0, 0, chunks_needed);
block.block_ptr = aws_mem_acquire(buffer_pool->base_allocator, buffer_pool->block_size);
Expand Down Expand Up @@ -313,7 +322,7 @@ struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer(
aws_mutex_lock(&buffer_pool->mutex);

if (ticket->size <= buffer_pool->primary_size_cutoff) {
alloc_ptr = s_primary_acquire_synced(buffer_pool, ticket->size);
alloc_ptr = s_primary_acquire_synced(buffer_pool, ticket->size, &ticket->chunks_used);
} else {
alloc_ptr = aws_mem_acquire(buffer_pool->base_allocator, ticket->size);
buffer_pool->secondary_reserved -= ticket->size;
Expand Down
16 changes: 9 additions & 7 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ struct aws_s3_client *aws_s3_client_new(

if (client_config->max_part_size != 0 && client_config->memory_limit_in_bytes != 0 &&
client_config->max_part_size >
(client_config->memory_limit_in_bytes / s_default_max_part_size_to_mem_lim_multiplier)) {
s_default_max_part_size_based_on_mem_limit(client_config->memory_limit_in_bytes)) {
AWS_LOGF_ERROR(
AWS_LS_S3_CLIENT,
"Cannot create client from client_config; memory limit should be at least 4 times higher than max part "
Expand Down Expand Up @@ -605,7 +605,7 @@ static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) {

AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client finishing destruction.", (void *)client);

if (client->synced_data.trim_buffer_pool_task_scheduled) {
if (client->threaded_data.trim_buffer_pool_task_scheduled) {
aws_event_loop_cancel_task(client->process_work_event_loop, &client->synced_data.trim_buffer_pool_task);
}

Expand Down Expand Up @@ -1195,12 +1195,14 @@ static void s_s3_client_trim_buffer_pool_task(struct aws_task *task, void *arg,
(void)task;
(void)task_status;

if (task_status != AWS_TASK_STATUS_RUN_READY) {
return;
}

struct aws_s3_client *client = arg;
AWS_PRECONDITION(client);

aws_s3_client_lock_synced_data(client);
client->synced_data.trim_buffer_pool_task_scheduled = false;
aws_s3_client_unlock_synced_data(client);
client->threaded_data.trim_buffer_pool_task_scheduled = false;

uint32_t num_reqs_in_flight = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);

Expand All @@ -1212,7 +1214,7 @@ static void s_s3_client_trim_buffer_pool_task(struct aws_task *task, void *arg,
static void s_s3_client_schedule_buffer_pool_trim_synced(struct aws_s3_client *client) {
ASSERT_SYNCED_DATA_LOCK_HELD(client);

if (client->synced_data.trim_buffer_pool_task_scheduled) {
if (client->threaded_data.trim_buffer_pool_task_scheduled) {
return;
}

Expand All @@ -1235,7 +1237,7 @@ static void s_s3_client_schedule_buffer_pool_trim_synced(struct aws_s3_client *c
aws_event_loop_schedule_task_future(
client->process_work_event_loop, &client->synced_data.trim_buffer_pool_task, trim_time);

client->synced_data.trim_buffer_pool_task_scheduled = true;
client->threaded_data.trim_buffer_pool_task_scheduled = true;
}

void aws_s3_client_schedule_process_work(struct aws_s3_client *client) {
Expand Down
1 change: 1 addition & 0 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ static int s_s3_meta_request_incoming_body(

if (request->send_data.response_body.capacity == 0) {
if (request->has_part_size_response_body) {
AWS_FATAL_ASSERT(request->ticket);
request->send_data.response_body =
aws_s3_buffer_pool_acquire_buffer(request->meta_request->client->buffer_pool, request->ticket);
} else {
Expand Down

0 comments on commit 1630751

Please sign in to comment.