Skip to content

Commit

Permalink
tweak buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitriyMusatkin committed Nov 20, 2023
1 parent c600680 commit 849d655
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 36 deletions.
79 changes: 52 additions & 27 deletions source/s3_buffer_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ static size_t s_block_list_initial_capacity = 5;
* client as well as any allocations overruns due to memory waste in the pool. */
static const size_t s_buffer_pool_reserved_mem = MB_TO_BYTES(128);

static const size_t s_chunks_per_block = 16;

struct aws_s3_buffer_pool {
struct aws_allocator *base_allocator;
struct aws_mutex mutex;

size_t block_size;
size_t chunk_size;
/* size at which allocations should go to secondary */
size_t primary_size_cutoff;

Expand All @@ -68,23 +71,33 @@ struct aws_s3_buffer_pool {
size_t secondary_reserved;
size_t secondary_used;

size_t acquire_count;
size_t release_count;

struct aws_array_list blocks;
};

struct s3_buffer_pool_block {
size_t block_size;
uint8_t *block_ptr;
uint8_t *offset_ptr;

size_t alloc_count;
uint16_t alloc_bit_mask;
};

/* Set n consecutive bits of num, starting at position (LSB = position 0).
 * Used to mark a run of chunks in a block's allocation bitmask as in-use.
 * Note: the previous mask formula ((uint16_t)0x00FF) >> (8 - n) shifted by a
 * negative amount (undefined behavior) whenever n > 8 and could never produce
 * a mask wider than 8 bits; (1U << n) - 1 is well-defined for n in [0, 16]
 * and yields identical results for n <= 8. Caller must ensure
 * position + n <= 16. */
static inline uint16_t s_set_bit_n(uint16_t num, size_t position, size_t n) {
    uint16_t mask = (uint16_t)((1U << n) - 1);
    return num | (uint16_t)(mask << position);
}

/* Clear n consecutive bits of num, starting at position (LSB = position 0).
 * Used to mark a run of chunks in a block's allocation bitmask as free.
 * Note: the previous mask formula ((uint16_t)0x00FF) >> (8 - n) shifted by a
 * negative amount (undefined behavior) whenever n > 8 and could never produce
 * a mask wider than 8 bits; (1U << n) - 1 is well-defined for n in [0, 16]
 * and yields identical results for n <= 8. Caller must ensure
 * position + n <= 16. */
static inline uint16_t s_clear_bit_n(uint16_t num, size_t position, size_t n) {
    uint16_t mask = (uint16_t)((1U << n) - 1);
    return num & (uint16_t)~(mask << position);
}

/* Return true if ANY of the n consecutive bits starting at position are set
 * (i.e. the chunk run overlaps an existing allocation); false only when all
 * n bits are clear. Callers use the false result to find a free run.
 * Note: the previous mask formula ((uint16_t)0x00FF) >> (8 - n) shifted by a
 * negative amount (undefined behavior) whenever n > 8 and could never produce
 * a mask wider than 8 bits; (1U << n) - 1 is well-defined for n in [0, 16]
 * and yields identical results for n <= 8. Caller must ensure
 * position + n <= 16. */
static inline bool s_check_bit_n(uint16_t num, size_t position, size_t n) {
    uint16_t mask = (uint16_t)((1U << n) - 1);
    return ((num >> position) & mask) != 0;
}

struct aws_s3_buffer_pool *aws_s3_buffer_pool_new(
struct aws_allocator *allocator,
size_t block_size,
size_t chunk_size,
size_t mem_limit) {

if (mem_limit < GB_TO_BYTES(1)) {
Expand All @@ -94,10 +107,10 @@ struct aws_s3_buffer_pool *aws_s3_buffer_pool_new(
return NULL;
}

if (!(block_size == 0 || (block_size > 1024 * 1024 && block_size % 1024 == 0))) {
if (!(chunk_size == 0 || (chunk_size > 1024 * 1024 && chunk_size % 1024 == 0))) {
AWS_LOGF_ERROR(
AWS_LS_S3_CLIENT,
"Failed to initialize buffer pool. Block size must be either 0 or more than 1 mb and size must be 1 KB "
"Failed to initialize buffer pool. Chunk size must be either 0 or more than 1 mb and size must be 1 KB "
"aligned.");
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
return NULL;
Expand All @@ -108,11 +121,12 @@ struct aws_s3_buffer_pool *aws_s3_buffer_pool_new(
AWS_FATAL_ASSERT(buffer_pool != NULL);

buffer_pool->base_allocator = allocator;
buffer_pool->block_size = block_size;
buffer_pool->chunk_size = chunk_size;
buffer_pool->block_size = chunk_size * s_chunks_per_block;
/* Somewhat arbitrary number.
* Tries to balance between how many allocations use buffer and buffer space
* being wasted. */
buffer_pool->primary_size_cutoff = block_size / 4;
buffer_pool->primary_size_cutoff = chunk_size * 4;
buffer_pool->mem_limit = mem_limit - s_buffer_pool_reserved_mem;
int mutex_error = aws_mutex_init(&buffer_pool->mutex);
AWS_FATAL_ASSERT(mutex_error == AWS_OP_SUCCESS);
Expand All @@ -132,7 +146,7 @@ void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool) {
struct s3_buffer_pool_block *block;
aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i);

AWS_FATAL_ASSERT(block->alloc_count == 0 && "Allocator still has outstanding blocks");
AWS_FATAL_ASSERT(block->alloc_bit_mask == 0 && "Allocator still has outstanding blocks");
aws_mem_release(buffer_pool->base_allocator, block->block_ptr);
}

Expand All @@ -148,7 +162,7 @@ void s_buffer_pool_trim_synced(struct aws_s3_buffer_pool *buffer_pool) {
struct s3_buffer_pool_block *block;
aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i);

if (block->alloc_count == 0) {
if (block->alloc_bit_mask == 0) {
aws_mem_release(buffer_pool->base_allocator, block->block_ptr);
aws_array_list_erase(&buffer_pool->blocks, i);
--i;
Expand Down Expand Up @@ -229,22 +243,29 @@ void aws_s3_buffer_pool_remove_reservation_hold(struct aws_s3_buffer_pool *buffe
static uint8_t *s_primary_acquire_synced(struct aws_s3_buffer_pool *buffer_pool, size_t size) {
uint8_t *alloc_ptr = NULL;

size_t chunks_needed = size / buffer_pool->chunk_size;
if (size % buffer_pool->chunk_size != 0) {
++chunks_needed; /* round up */
}

for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) {
struct s3_buffer_pool_block *block;
aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i);

if (block->offset_ptr + size <= block->block_ptr + block->block_size) {
alloc_ptr = block->offset_ptr;
block->offset_ptr += size;
block->alloc_count += 1;
goto on_allocated;
for (size_t chunk_i = 0; chunk_i < s_chunks_per_block - chunks_needed + 1; ++chunk_i) {
if (!s_check_bit_n(block->alloc_bit_mask, chunk_i, chunks_needed)) {
alloc_ptr = block->block_ptr + chunk_i * buffer_pool->chunk_size;
block->alloc_bit_mask = s_set_bit_n(block->alloc_bit_mask, chunk_i, chunks_needed);
AWS_LOGF_DEBUG(0, "foo reuse %#010x", block->alloc_bit_mask);
goto on_allocated;
}
}
}

struct s3_buffer_pool_block block;
block.alloc_count = 1;
block.alloc_bit_mask = s_set_bit_n(block.alloc_bit_mask, 0, chunks_needed);
AWS_LOGF_DEBUG(0, "foo new %#010x", block.alloc_bit_mask);
block.block_ptr = aws_mem_acquire(buffer_pool->base_allocator, buffer_pool->block_size);
block.offset_ptr = block.block_ptr + size;
block.block_size = buffer_pool->block_size;
aws_array_list_push_back(&buffer_pool->blocks, &block);
alloc_ptr = block.block_ptr;
Expand Down Expand Up @@ -280,7 +301,6 @@ struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer(
buffer_pool->secondary_used += ticket->size;
}

++buffer_pool->acquire_count;
aws_mutex_unlock(&buffer_pool->mutex);
ticket->ptr = alloc_ptr;

Expand Down Expand Up @@ -310,17 +330,23 @@ void aws_s3_buffer_pool_release_ticket(

aws_mutex_lock(&buffer_pool->mutex);
if (ticket->size <= buffer_pool->primary_size_cutoff) {

size_t chunks_used = ticket->size / buffer_pool->chunk_size;
if (ticket->size % buffer_pool->chunk_size != 0) {
++chunks_used; /* round up */
}

bool found = false;
for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) {
struct s3_buffer_pool_block *block;
aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i);

if (block->block_ptr <= ticket->ptr && block->block_ptr + block->block_size > ticket->ptr) {
block->alloc_count -= 1;
if (block->alloc_count == 0) {
buffer_pool->primary_used -= block->offset_ptr - block->block_ptr;
block->offset_ptr = block->block_ptr;
}
size_t alloc_i = (ticket->ptr - block->block_ptr) / buffer_pool->chunk_size;

block->alloc_bit_mask = s_clear_bit_n(block->alloc_bit_mask, alloc_i, chunks_used);
AWS_LOGF_DEBUG(0, "foo clear %#010x", block->alloc_bit_mask);
buffer_pool->primary_used -= ticket->size;

found = true;
break;
Expand All @@ -334,7 +360,6 @@ void aws_s3_buffer_pool_release_ticket(
}

aws_mem_release(buffer_pool->base_allocator, ticket);
++buffer_pool->release_count;

aws_mutex_unlock(&buffer_pool->mutex);
}
Expand Down
6 changes: 3 additions & 3 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static size_t s_dns_host_address_ttl_seconds = 5 * 60;
static const uint32_t s_default_throughput_failure_interval_seconds = 30;

/* Default size of buffer pool blocks. */
static const size_t s_buffer_pool_default_block_size = MB_TO_BYTES(128);
static const size_t s_buffer_pool_default_chunk_size = MB_TO_BYTES(8);

/* Default multiplier between max part size and memory limit */
static const size_t s_default_max_part_size_to_mem_lim_multiplier = 4;
Expand Down Expand Up @@ -304,7 +304,7 @@ struct aws_s3_client *aws_s3_client_new(
mem_limit = client_config->memory_limit_in_bytes;
}

client->buffer_pool = aws_s3_buffer_pool_new(allocator, s_buffer_pool_default_block_size, mem_limit);
client->buffer_pool = aws_s3_buffer_pool_new(allocator, s_buffer_pool_default_chunk_size, mem_limit);

client->vtable = &s_s3_client_default_vtable;

Expand Down Expand Up @@ -606,7 +606,7 @@ static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) {
AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client finishing destruction.", (void *)client);

if (client->synced_data.trim_buffer_pool_task_scheduled) {
aws_event_loop_cancel_task(client->process_work_event_loop, &client->synced_data.trim_buffer_pool_task);
aws_event_loop_cancel_task(client->process_work_event_loop, &client->synced_data.trim_buffer_pool_task);
}

aws_string_destroy(client->region);
Expand Down
12 changes: 7 additions & 5 deletions tests/s3_buffer_pool_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ static int s_test_s3_buffer_pool_threaded_allocs_and_frees(struct aws_allocator
(void)allocator;
(void)ctx;

struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(2));
struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(2));

s_thread_test(allocator, s_threaded_alloc_worker, buffer_pool);

Expand All @@ -77,7 +77,7 @@ static int s_test_s3_buffer_pool_limits(struct aws_allocator *allocator, void *c
(void)allocator;
(void)ctx;

struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(1));
struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1));

struct aws_s3_buffer_pool_ticket *ticket1 = aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(64));
ASSERT_NOT_NULL(ticket1);
Expand Down Expand Up @@ -118,7 +118,7 @@ static int s_test_s3_buffer_pool_trim(struct aws_allocator *allocator, void *ctx
(void)allocator;
(void)ctx;

struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(1));
struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1));

struct aws_s3_buffer_pool_ticket *tickets[40];
for (size_t i = 0; i < 40; ++i) {
Expand All @@ -138,6 +138,8 @@ static int s_test_s3_buffer_pool_trim(struct aws_allocator *allocator, void *ctx

struct aws_s3_buffer_pool_usage_stats stats_after = aws_s3_buffer_pool_get_usage(buffer_pool);

AWS_LOGF_DEBUG(0, "foo %zu", stats_after.primary_num_blocks);

ASSERT_TRUE(stats_before.primary_num_blocks > stats_after.primary_num_blocks);

for (size_t i = 20; i < 40; ++i) {
Expand All @@ -154,7 +156,7 @@ static int s_test_s3_buffer_reserve_zero_size(struct aws_allocator *allocator, v
(void)allocator;
(void)ctx;

struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(1));
struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1));

ASSERT_NULL(aws_s3_buffer_pool_reserve(buffer_pool, 0));
ASSERT_TRUE(aws_last_error() == AWS_ERROR_INVALID_ARGUMENT);
Expand All @@ -169,7 +171,7 @@ static int s_test_s3_buffer_pool_reservation_hold(struct aws_allocator *allocato
(void)allocator;
(void)ctx;

struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(1));
struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1));

struct aws_s3_buffer_pool_ticket *tickets[112];
for (size_t i = 0; i < 112; ++i) {
Expand Down
2 changes: 1 addition & 1 deletion tests/s3_tester.c
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ struct aws_s3_client *aws_s3_tester_mock_client_new(struct aws_s3_tester *tester
struct aws_s3_client *mock_client = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_client));

mock_client->allocator = allocator;
mock_client->buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(128), GB_TO_BYTES(1));
mock_client->buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1));
mock_client->vtable = &g_aws_s3_client_mock_vtable;

aws_ref_count_init(
Expand Down

0 comments on commit 849d655

Please sign in to comment.