
Commit

addressing comments
DmitriyMusatkin committed Nov 18, 2023
1 parent 3cfceb7 commit cb943cd
Showing 7 changed files with 55 additions and 28 deletions.
38 changes: 25 additions & 13 deletions source/s3_buffer_pool.c
@@ -41,7 +41,7 @@ struct aws_s3_buffer_pool_ticket {

/* Default size for blocks array. Note: this is just for meta info, blocks
* themselves are not preallocated. */
static size_t s_min_num_blocks = 5;
static size_t s_block_list_initial_capacity = 5;

/* Amount of mem reserved for use outside of buffer pool.
* This is an optimistic upper bound on mem used as we don't track it.
@@ -109,17 +109,13 @@ struct aws_s3_buffer_pool *aws_s3_buffer_pool_new(
* being wasted. */
buffer_pool->primary_size_cutoff = block_size / 4;
buffer_pool->mem_limit = mem_limit - s_buffer_pool_reserved_mem;
if (aws_mutex_init(&buffer_pool->mutex)) {
goto on_error;
}
int mutex_error = aws_mutex_init(&buffer_pool->mutex);
AWS_ASSERT(mutex_error == AWS_OP_SUCCESS);

aws_array_list_init_dynamic(&buffer_pool->blocks, allocator, s_min_num_blocks, sizeof(struct s3_buffer_pool_block));
aws_array_list_init_dynamic(&buffer_pool->blocks, allocator,
s_block_list_initial_capacity, sizeof(struct s3_buffer_pool_block));

return buffer_pool;

on_error:
aws_mem_release(allocator, buffer_pool);
return NULL;
}
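A minimal standalone sketch (not part of the commit) of the aws-c-common array-list behavior the new s_block_list_initial_capacity relies on: the initial capacity is only a starting allocation for the metadata entries, and the list grows past it on demand, so nothing is preallocated for the buffers themselves. struct example_block_meta and s_block_list_example are hypothetical stand-ins, not names from the repository; the aws_array_list calls are the standard aws/common/array_list.h API.

#include <aws/common/array_list.h>

struct example_block_meta { /* hypothetical stand-in for struct s3_buffer_pool_block */
    void *mem;
    size_t size;
};

static int s_block_list_example(struct aws_allocator *allocator) {
    struct aws_array_list blocks;
    /* Capacity of 5 is just the starting allocation for metadata entries. */
    if (aws_array_list_init_dynamic(&blocks, allocator, 5, sizeof(struct example_block_meta))) {
        return AWS_OP_ERR;
    }

    struct example_block_meta block = {.mem = NULL, .size = 0};
    for (size_t i = 0; i < 16; ++i) {
        /* The list grows past the initial capacity on demand. */
        if (aws_array_list_push_back(&blocks, &block)) {
            aws_array_list_clean_up(&blocks);
            return AWS_OP_ERR;
        }
    }

    aws_array_list_clean_up(&blocks);
    return AWS_OP_SUCCESS;
}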

void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool) {
@@ -163,14 +159,22 @@ void aws_s3_buffer_pool_trim(struct aws_s3_buffer_pool *buffer_pool) {

struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(struct aws_s3_buffer_pool *buffer_pool, size_t size) {

if (size == 0) {
AWS_LOGF_ERROR(AWS_LS_S3_CLIENT, "Could not reserve from buffer pool. 0 is not a valid size.");
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
return NULL;
}

struct aws_s3_buffer_pool_ticket *ticket = NULL;
aws_mutex_lock(&buffer_pool->mutex);

size_t overall_taken = buffer_pool->primary_used + buffer_pool->primary_reserved +
buffer_pool->secondary_used + buffer_pool->secondary_reserved;

/*
*
* If we are allocating from secondary and there is a lot of unused space in
* primary, trim the primary.
* TODO: something smarter, like partial trim?
*/
if (size > buffer_pool->primary_size_cutoff &&
(size + overall_taken) > buffer_pool->mem_limit &&
@@ -263,9 +267,17 @@ void aws_s3_buffer_pool_release_ticket(
struct aws_s3_buffer_pool *buffer_pool,
struct aws_s3_buffer_pool_ticket *ticket) {

if (buffer_pool == NULL || ticket == NULL || ticket->ptr == NULL) {
if (ticket != NULL) {
aws_mem_release(buffer_pool->base_allocator, ticket);
if (buffer_pool == NULL || ticket == NULL) {
return;
}

if (ticket->ptr == NULL) {
/* Ticket was never used, make sure to clean up reserved count. */
if (ticket->size <= buffer_pool->primary_size_cutoff) {
buffer_pool->primary_reserved -= ticket->size;
} else {
buffer_pool->secondary_reserved -= ticket->size;
}
aws_mem_release(buffer_pool->base_allocator, ticket);
return;
}
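A caller-side sketch of the ticket lifecycle this fix is concerned with, illustrative only: the header path, the function name s_reserve_then_abort_example, and the assumption that reserve returns NULL with a raised error on failure are not confirmed by this commit; only aws_s3_buffer_pool_reserve and aws_s3_buffer_pool_release_ticket are taken from the diff above.

#include <aws/s3/private/s3_buffer_pool.h> /* assumed header location */

static int s_reserve_then_abort_example(struct aws_s3_buffer_pool *pool, size_t part_size) {
    struct aws_s3_buffer_pool_ticket *ticket = aws_s3_buffer_pool_reserve(pool, part_size);
    if (ticket == NULL) {
        /* e.g. AWS_ERROR_INVALID_ARGUMENT was raised for a zero size; see aws_last_error(). */
        return AWS_OP_ERR;
    }

    /* Suppose the request gets cancelled before a buffer is ever acquired, so
     * ticket->ptr is still NULL. With the change above, releasing the ticket also
     * rolls back primary_reserved/secondary_reserved, keeping the accounting balanced. */
    aws_s3_buffer_pool_release_ticket(pool, ticket);
    return AWS_OP_SUCCESS;
}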
26 changes: 23 additions & 3 deletions source/s3_client.c
@@ -84,6 +84,9 @@ static const uint32_t s_default_throughput_failure_interval_seconds = 30;
/* Default size of buffer pool blocks. */
static const size_t s_buffer_pool_default_block_size = MB_TO_BYTES(128);

/* Default multiplier between max part size and memory limit */
static const size_t s_default_max_part_size_to_mem_lim_multiplier = 4;

/* Amount of time spent idling before trimming buffer. */
//static const size_t s_buffer_pool_trim_time_offset_in_s = 5;
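The constant above is commented out, so the client does not currently trim on idle. Purely as a sketch of what such a policy could look like (not the client's behavior), using aws-c-common's clock helpers; last_active_ns is a hypothetical field, the 5-second threshold mirrors the commented-out constant, and the private header path is an assumption.

#include <aws/common/clock.h>
#include <aws/s3/private/s3_buffer_pool.h> /* assumed header location */

static void s_trim_buffer_pool_if_idle(struct aws_s3_buffer_pool *pool, uint64_t last_active_ns) {
    uint64_t now_ns = 0;
    if (aws_high_res_clock_get_ticks(&now_ns)) {
        return;
    }
    uint64_t idle_threshold_ns =
        aws_timestamp_convert(5 /* seconds */, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
    if (now_ns - last_active_ns >= idle_threshold_ns) {
        aws_s3_buffer_pool_trim(pool); /* intended to release unused primary blocks */
    }
}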

@@ -226,6 +229,10 @@ void aws_s3_client_unlock_synced_data(struct aws_s3_client *client) {
aws_mutex_unlock(&client->synced_data.lock);
}

static size_t s_default_max_part_size_based_on_mem_limit(size_t mem_lim) {
return mem_lim / s_default_max_part_size_to_mem_lim_multiplier;
}
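Worked example of the relationship the helper above and the validation further down in aws_s3_client_new enforce: with memory_limit_in_bytes of 4 GiB, max_part_size may be at most 1 GiB. A tiny illustrative check follows; s_part_size_fits_mem_limit is not a real API, and the literal 4 stands in for s_default_max_part_size_to_mem_lim_multiplier.

#include <stdbool.h>
#include <stddef.h>

static bool s_part_size_fits_mem_limit(size_t max_part_size, size_t memory_limit_in_bytes) {
    if (max_part_size == 0 || memory_limit_in_bytes == 0) {
        return true; /* unset values fall back to defaults */
    }
    /* memory limit must be at least 4x the max part size */
    return max_part_size <= memory_limit_in_bytes / 4;
}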

struct aws_s3_client *aws_s3_client_new(
struct aws_allocator *allocator,
const struct aws_s3_client_config *client_config) {
@@ -250,6 +257,15 @@ struct aws_s3_client *aws_s3_client_new(
return NULL;
}

if (client_config->max_part_size != 0 && client_config->memory_limit_in_bytes != 0 &&
client_config->max_part_size > (client_config->memory_limit_in_bytes / s_default_max_part_size_to_mem_lim_multiplier)) {
AWS_LOGF_ERROR(
AWS_LS_S3_CLIENT,
"Cannot create client from client_config; memory limit should be at least 4 times higher than max part size.");
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
return NULL;
}

#ifdef BYO_CRYPTO
if (client_config->tls_mode == AWS_MR_TLS_ENABLED && client_config->tls_connection_options == NULL) {
AWS_LOGF_ERROR(
@@ -269,9 +285,9 @@ struct aws_s3_client *aws_s3_client_new(
if (client_config->memory_limit_in_bytes == 0) {
#if SIZE_BITS == 32
if (client_config->throughput_target_gbps > 25.0) {
mem_limit = GB_TO_BYTES(1);
} else {
mem_limit = GB_TO_BYTES(2);
} else {
mem_limit = GB_TO_BYTES(1);
}
#else
if (client_config->throughput_target_gbps > 75.0) {
@@ -336,6 +352,11 @@ struct aws_s3_client *aws_s3_client_new(
} else {
*((uint64_t *)&client->max_part_size) = s_default_max_part_size;
}

if (client_config->max_part_size > s_default_max_part_size_based_on_mem_limit(mem_limit)) {
*((uint64_t *)&client->max_part_size) = s_default_max_part_size_based_on_mem_limit(mem_limit);
}

if (client->max_part_size > SIZE_MAX) {
/* On 32-bit, clamp max part size to SIZE_MAX. */
*((uint64_t *)&client->max_part_size) = SIZE_MAX;
@@ -1954,7 +1975,6 @@ void aws_s3_client_notify_connection_finished(
}

if (connection->request != NULL) {
AWS_LOGF_DEBUG(0, "releasing connection hold on req %p", (void *)connection->request);
connection->request = aws_s3_request_release(connection->request);
}

1 change: 0 additions & 1 deletion source/s3_meta_request.c
@@ -1689,7 +1689,6 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request
struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&release_request_list);
struct aws_s3_request *release_request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node);
AWS_FATAL_ASSERT(release_request != NULL);
AWS_LOGF_DEBUG(0, "releasing mr hold on req %p", (void *)release_request);
aws_s3_request_release(release_request);
}

3 changes: 0 additions & 3 deletions source/s3_request.c
@@ -87,7 +87,6 @@ void aws_s3_request_clean_up_send_data(struct aws_s3_request *request) {
metric->time_metrics.total_duration_ns =
metric->time_metrics.end_timestamp_ns - metric->time_metrics.start_timestamp_ns;
if (meta_request->telemetry_callback) {
AWS_LOGF_DEBUG(0, "invoking telemetry callback");
meta_request->telemetry_callback(meta_request, metric, meta_request->user_data);
}
request->send_data.metrics = aws_s3_request_metrics_release(metric);
@@ -118,8 +117,6 @@ struct aws_s3_request *aws_s3_request_release(struct aws_s3_request *request) {
static void s_s3_request_destroy(void *user_data) {
struct aws_s3_request *request = user_data;

AWS_LOGF_DEBUG(0, "killing req %p", (void *)request);

if (request == NULL) {
return;
}
2 changes: 0 additions & 2 deletions tests/s3_buffer_pool_tests.c
@@ -128,7 +128,6 @@ static int s_test_s3_buffer_pool_trim(struct aws_allocator *allocator, void *ctx
}

struct aws_s3_buffer_pool_usage_stats stats_before = aws_s3_buffer_pool_get_usage(buffer_pool);
AWS_LOGF_DEBUG(0, "before %zu", stats_before.primary_num_blocks);

for (size_t i = 0; i < 20; ++i) {
aws_s3_buffer_pool_release_ticket(buffer_pool, tickets[i]);
@@ -137,7 +136,6 @@ static int s_test_s3_buffer_pool_trim(struct aws_allocator *allocator, void *ctx
aws_s3_buffer_pool_trim(buffer_pool);

struct aws_s3_buffer_pool_usage_stats stats_after = aws_s3_buffer_pool_get_usage(buffer_pool);
AWS_LOGF_DEBUG(0, "after %zu", stats_after.primary_num_blocks);

ASSERT_TRUE(stats_before.primary_num_blocks > stats_after.primary_num_blocks);

8 changes: 7 additions & 1 deletion tests/s3_data_plane_tests.c
@@ -3914,9 +3914,15 @@ static int s_test_s3_meta_request_default(struct aws_allocator *allocator, void

aws_s3_tester_wait_for_meta_request_shutdown(&tester);

/*
* TODO: telemetry is sent from the request destructor; http threads hold on to the
* request for a little bit after the on_req_finished callback, so it's possible that
* the telemetry callback is invoked after the meta request's on_finished callback.
* Moving the telemetry check to after the meta request shutdown callback. Need to
* figure out whether the current behavior can be improved.
*/
/* Check that the number of collected metrics matches the number of requests, which should be 1. */
AWS_LOGF_DEBUG(0, "Checking for telemetry");
ASSERT_UINT_EQUALS(1, aws_array_list_length(&meta_request_test_results.synced_data.metrics));
struct aws_s3_request_metrics *metrics = NULL;
aws_array_list_back(&meta_request_test_results.synced_data.metrics, (void **)&metrics);
5 changes: 0 additions & 5 deletions tests/s3_tester.c
@@ -157,8 +157,6 @@ static void s_s3_test_meta_request_finish(
void *user_data) {
(void)meta_request;

AWS_LOGF_DEBUG(0, "received finish callback");

struct aws_s3_meta_request_test_results *meta_request_test_results = user_data;
struct aws_s3_tester *tester = meta_request_test_results->tester;

@@ -193,7 +191,6 @@ static void s_s3_test_meta_request_telemetry(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request_metrics *metrics,
void *user_data) {
AWS_LOGF_DEBUG(0, "Received telemetry callback");
(void)meta_request;
struct aws_s3_meta_request_test_results *meta_request_test_results = user_data;
struct aws_s3_tester *tester = meta_request_test_results->tester;
@@ -231,8 +228,6 @@ static void s_s3_test_meta_request_telemetry(

aws_s3_tester_lock_synced_data(tester);
aws_array_list_push_back(&meta_request_test_results->synced_data.metrics, &metrics);
AWS_LOGF_DEBUG(0, "Added metrics to the list %zu",
aws_array_list_length(&meta_request_test_results->synced_data.metrics));
aws_s3_request_metrics_acquire(metrics);
aws_s3_tester_unlock_synced_data(tester);
}
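Sketch of the matching cleanup for the reference taken by aws_s3_request_metrics_acquire above; the tester presumably does something equivalent during teardown. The function name is illustrative, and the snippet assumes the struct aws_s3_meta_request_test_results definition from s3_tester.h.

static void s_release_collected_metrics(struct aws_s3_meta_request_test_results *results) {
    size_t count = aws_array_list_length(&results->synced_data.metrics);
    for (size_t i = 0; i < count; ++i) {
        struct aws_s3_request_metrics *metrics = NULL;
        aws_array_list_get_at(&results->synced_data.metrics, &metrics, i);
        /* Drop the reference taken in the telemetry callback. */
        aws_s3_request_metrics_release(metrics);
    }
    aws_array_list_clean_up(&results->synced_data.metrics);
}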

