diff --git a/source/s3_buffer_pool.c b/source/s3_buffer_pool.c index 62faa90fe..664eb0bff 100644 --- a/source/s3_buffer_pool.c +++ b/source/s3_buffer_pool.c @@ -49,11 +49,14 @@ static size_t s_block_list_initial_capacity = 5; * client as well as any allocations overruns due to memory waste in the pool. */ static const size_t s_buffer_pool_reserved_mem = MB_TO_BYTES(128); +static const size_t s_chunks_per_block = 16; + struct aws_s3_buffer_pool { struct aws_allocator *base_allocator; struct aws_mutex mutex; size_t block_size; + size_t chunk_size; /* size at which allocations should go to secondary */ size_t primary_size_cutoff; @@ -68,23 +71,33 @@ struct aws_s3_buffer_pool { size_t secondary_reserved; size_t secondary_used; - size_t acquire_count; - size_t release_count; - struct aws_array_list blocks; }; struct s3_buffer_pool_block { size_t block_size; uint8_t *block_ptr; - uint8_t *offset_ptr; - - size_t alloc_count; + uint16_t alloc_bit_mask; }; +static inline uint16_t s_set_bit_n(uint16_t num, size_t position, size_t n) { + uint16_t mask = ((uint16_t)0x00FF) >> (8 - n) ; + return num | (mask << position) ; +} + +static inline uint16_t s_clear_bit_n(uint16_t num, size_t position, size_t n) { + uint16_t mask = ((uint16_t)0x00FF) >> (8 - n) ; + return num & ~ (mask << position); +} + +static inline bool s_check_bit_n(uint16_t num, size_t position, size_t n) { + uint16_t mask = ((uint16_t)0x00FF) >> (8 - n) ; + return (num >> position) & mask; +} + struct aws_s3_buffer_pool *aws_s3_buffer_pool_new( struct aws_allocator *allocator, - size_t block_size, + size_t chunk_size, size_t mem_limit) { if (mem_limit < GB_TO_BYTES(1)) { @@ -94,10 +107,10 @@ struct aws_s3_buffer_pool *aws_s3_buffer_pool_new( return NULL; } - if (!(block_size == 0 || (block_size > 1024 * 1024 && block_size % 1024 == 0))) { + if (!(chunk_size == 0 || (chunk_size > 1024 * 1024 && chunk_size % 1024 == 0))) { AWS_LOGF_ERROR( AWS_LS_S3_CLIENT, - "Failed to initialize buffer pool. Block size must be either 0 or more than 1 mb and size must be 1 KB " + "Failed to initialize buffer pool. Chunk size must be either 0 or more than 1 mb and size must be 1 KB " "aligned."); aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); return NULL; @@ -108,11 +121,12 @@ struct aws_s3_buffer_pool *aws_s3_buffer_pool_new( AWS_FATAL_ASSERT(buffer_pool != NULL); buffer_pool->base_allocator = allocator; - buffer_pool->block_size = block_size; + buffer_pool->chunk_size = chunk_size; + buffer_pool->block_size = chunk_size * s_chunks_per_block; /* Somewhat arbitrary number. * Tries to balance between how many allocations use buffer and buffer space * being wasted. */ - buffer_pool->primary_size_cutoff = block_size / 4; + buffer_pool->primary_size_cutoff = chunk_size * 4; buffer_pool->mem_limit = mem_limit - s_buffer_pool_reserved_mem; int mutex_error = aws_mutex_init(&buffer_pool->mutex); AWS_FATAL_ASSERT(mutex_error == AWS_OP_SUCCESS); @@ -132,7 +146,7 @@ void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool) { struct s3_buffer_pool_block *block; aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i); - AWS_FATAL_ASSERT(block->alloc_count == 0 && "Allocator still has outstanding blocks"); + AWS_FATAL_ASSERT(block->alloc_bit_mask == 0 && "Allocator still has outstanding blocks"); aws_mem_release(buffer_pool->base_allocator, block->block_ptr); } @@ -148,7 +162,7 @@ void s_buffer_pool_trim_synced(struct aws_s3_buffer_pool *buffer_pool) { struct s3_buffer_pool_block *block; aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i); - if (block->alloc_count == 0) { + if (block->alloc_bit_mask == 0) { aws_mem_release(buffer_pool->base_allocator, block->block_ptr); aws_array_list_erase(&buffer_pool->blocks, i); --i; @@ -229,22 +243,29 @@ void aws_s3_buffer_pool_remove_reservation_hold(struct aws_s3_buffer_pool *buffe static uint8_t *s_primary_acquire_synced(struct aws_s3_buffer_pool *buffer_pool, size_t size) { uint8_t *alloc_ptr = NULL; + size_t chunks_needed = size / buffer_pool->chunk_size; + if (size % buffer_pool->chunk_size != 0) { + ++chunks_needed; /* round up */ + } + for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) { struct s3_buffer_pool_block *block; aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i); - if (block->offset_ptr + size <= block->block_ptr + block->block_size) { - alloc_ptr = block->offset_ptr; - block->offset_ptr += size; - block->alloc_count += 1; - goto on_allocated; + for (size_t chunk_i = 0; chunk_i < s_chunks_per_block - chunks_needed + 1; ++chunk_i) { + if (!s_check_bit_n(block->alloc_bit_mask, chunk_i, chunks_needed)) { + alloc_ptr = block->block_ptr + chunk_i * buffer_pool->chunk_size; + block->alloc_bit_mask = s_set_bit_n(block->alloc_bit_mask, chunk_i, chunks_needed); + AWS_LOGF_DEBUG(0, "foo reuse %#010x", block->alloc_bit_mask); + goto on_allocated; + } } } struct s3_buffer_pool_block block; - block.alloc_count = 1; + block.alloc_bit_mask = s_set_bit_n(block.alloc_bit_mask, 0, chunks_needed); + AWS_LOGF_DEBUG(0, "foo new %#010x", block.alloc_bit_mask); block.block_ptr = aws_mem_acquire(buffer_pool->base_allocator, buffer_pool->block_size); - block.offset_ptr = block.block_ptr + size; block.block_size = buffer_pool->block_size; aws_array_list_push_back(&buffer_pool->blocks, &block); alloc_ptr = block.block_ptr; @@ -280,7 +301,6 @@ struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer( buffer_pool->secondary_used += ticket->size; } - ++buffer_pool->acquire_count; aws_mutex_unlock(&buffer_pool->mutex); ticket->ptr = alloc_ptr; @@ -310,17 +330,23 @@ void aws_s3_buffer_pool_release_ticket( aws_mutex_lock(&buffer_pool->mutex); if (ticket->size <= buffer_pool->primary_size_cutoff) { + + size_t chunks_used = ticket->size / buffer_pool->chunk_size; + if (ticket->size % buffer_pool->chunk_size != 0) { + ++chunks_used; /* round up */ + } + bool found = false; for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) { struct s3_buffer_pool_block *block; aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i); if (block->block_ptr <= ticket->ptr && block->block_ptr + block->block_size > ticket->ptr) { - block->alloc_count -= 1; - if (block->alloc_count == 0) { - buffer_pool->primary_used -= block->offset_ptr - block->block_ptr; - block->offset_ptr = block->block_ptr; - } + size_t alloc_i = (ticket->ptr - block->block_ptr) / buffer_pool->chunk_size; + + block->alloc_bit_mask = s_clear_bit_n(block->alloc_bit_mask, alloc_i, chunks_used); + AWS_LOGF_DEBUG(0, "foo clear %#010x", block->alloc_bit_mask); + buffer_pool->primary_used -= ticket->size; found = true; break; @@ -334,7 +360,6 @@ void aws_s3_buffer_pool_release_ticket( } aws_mem_release(buffer_pool->base_allocator, ticket); - ++buffer_pool->release_count; aws_mutex_unlock(&buffer_pool->mutex); } diff --git a/source/s3_client.c b/source/s3_client.c index e1c98d5f1..f8aa98066 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -82,7 +82,7 @@ static size_t s_dns_host_address_ttl_seconds = 5 * 60; static const uint32_t s_default_throughput_failure_interval_seconds = 30; /* Default size of buffer pool blocks. */ -static const size_t s_buffer_pool_default_block_size = MB_TO_BYTES(128); +static const size_t s_buffer_pool_default_chunk_size = MB_TO_BYTES(8); /* Default multiplier between max part size and memory limit */ static const size_t s_default_max_part_size_to_mem_lim_multiplier = 4; @@ -304,7 +304,7 @@ struct aws_s3_client *aws_s3_client_new( mem_limit = client_config->memory_limit_in_bytes; } - client->buffer_pool = aws_s3_buffer_pool_new(allocator, s_buffer_pool_default_block_size, mem_limit); + client->buffer_pool = aws_s3_buffer_pool_new(allocator, s_buffer_pool_default_chunk_size, mem_limit); client->vtable = &s_s3_client_default_vtable; @@ -606,7 +606,7 @@ static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) { AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client finishing destruction.", (void *)client); if (client->synced_data.trim_buffer_pool_task_scheduled) { - aws_event_loop_cancel_task(client->process_work_event_loop, &client->synced_data.trim_buffer_pool_task); + aws_event_loop_cancel_task(client->process_work_event_loop, &client->synced_data.trim_buffer_pool_task); } aws_string_destroy(client->region); diff --git a/tests/s3_buffer_pool_tests.c b/tests/s3_buffer_pool_tests.c index 24ecdcfc9..0dfb41ac7 100644 --- a/tests/s3_buffer_pool_tests.c +++ b/tests/s3_buffer_pool_tests.c @@ -63,7 +63,7 @@ static int s_test_s3_buffer_pool_threaded_allocs_and_frees(struct aws_allocator (void)allocator; (void)ctx; - struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(2)); + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(2)); s_thread_test(allocator, s_threaded_alloc_worker, buffer_pool); @@ -77,7 +77,7 @@ static int s_test_s3_buffer_pool_limits(struct aws_allocator *allocator, void *c (void)allocator; (void)ctx; - struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(1)); + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1)); struct aws_s3_buffer_pool_ticket *ticket1 = aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(64)); ASSERT_NOT_NULL(ticket1); @@ -118,7 +118,7 @@ static int s_test_s3_buffer_pool_trim(struct aws_allocator *allocator, void *ctx (void)allocator; (void)ctx; - struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(1)); + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1)); struct aws_s3_buffer_pool_ticket *tickets[40]; for (size_t i = 0; i < 40; ++i) { @@ -138,6 +138,8 @@ static int s_test_s3_buffer_pool_trim(struct aws_allocator *allocator, void *ctx struct aws_s3_buffer_pool_usage_stats stats_after = aws_s3_buffer_pool_get_usage(buffer_pool); + AWS_LOGF_DEBUG(0, "foo %zu", stats_after.primary_num_blocks); + ASSERT_TRUE(stats_before.primary_num_blocks > stats_after.primary_num_blocks); for (size_t i = 20; i < 40; ++i) { @@ -154,7 +156,7 @@ static int s_test_s3_buffer_reserve_zero_size(struct aws_allocator *allocator, v (void)allocator; (void)ctx; - struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(1)); + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1)); ASSERT_NULL(aws_s3_buffer_pool_reserve(buffer_pool, 0)); ASSERT_TRUE(aws_last_error() == AWS_ERROR_INVALID_ARGUMENT); @@ -169,7 +171,7 @@ static int s_test_s3_buffer_pool_reservation_hold(struct aws_allocator *allocato (void)allocator; (void)ctx; - struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(1)); + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1)); struct aws_s3_buffer_pool_ticket *tickets[112]; for (size_t i = 0; i < 112; ++i) { diff --git a/tests/s3_tester.c b/tests/s3_tester.c index 3115ce868..a3425cf5f 100644 --- a/tests/s3_tester.c +++ b/tests/s3_tester.c @@ -796,7 +796,7 @@ struct aws_s3_client *aws_s3_tester_mock_client_new(struct aws_s3_tester *tester struct aws_s3_client *mock_client = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_client)); mock_client->allocator = allocator; - mock_client->buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(1)); + mock_client->buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1)); mock_client->vtable = &g_aws_s3_client_mock_vtable; aws_ref_count_init(