Skip to content

Commit

Permalink
Ref (#284)
Browse files Browse the repository at this point in the history
* rework ref-counting and async shutdown for a number of core types and their dependents
  • Loading branch information
bretambrose authored Sep 3, 2020
1 parent 98b3e49 commit 8f706b2
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 166 deletions.
5 changes: 5 additions & 0 deletions .tsan_suppressions.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# threads created here are not explicitly joined but they are part of a ref-count mechanism that
# decrements (with a possible signal of a shutdown callback based on state and count) on thread
# exit function. For now, there is no reasonable way to integrate thread join into the host resolver
# logic, and so this is a false positive.
thread:create_and_init_host_entry
41 changes: 11 additions & 30 deletions bin/elasticurl/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ struct elasticurl_ctx {
enum aws_log_level log_level;
enum aws_http_version required_http_version;
bool exchange_completed;
bool bootstrap_shutdown_completed;
};

static void s_usage(int exit_code) {
Expand Down Expand Up @@ -539,20 +538,6 @@ static bool s_completion_predicate(void *arg) {
return app_ctx->exchange_completed;
}

static void s_bootstrap_on_shutdown(void *user_data) {
struct elasticurl_ctx *app_ctx = user_data;

aws_mutex_lock(&app_ctx->mutex);
app_ctx->bootstrap_shutdown_completed = true;
aws_mutex_unlock(&app_ctx->mutex);
aws_condition_variable_notify_all(&app_ctx->c_var);
}

static bool s_bootstrap_shutdown_predicate(void *arg) {
struct elasticurl_ctx *app_ctx = arg;
return app_ctx->bootstrap_shutdown_completed;
}

int main(int argc, char **argv) {
struct aws_allocator *allocator = aws_default_allocator();

Expand Down Expand Up @@ -690,17 +675,12 @@ int main(int argc, char **argv) {
}
}

struct aws_event_loop_group el_group;
aws_event_loop_group_default_init(&el_group, allocator, 1);

struct aws_host_resolver resolver;
aws_host_resolver_init_default(&resolver, allocator, 8, &el_group);
struct aws_event_loop_group *el_group = aws_event_loop_group_new_default(allocator, 1, NULL);
struct aws_host_resolver *resolver = aws_host_resolver_new_default(allocator, 8, el_group, NULL);

struct aws_client_bootstrap_options bootstrap_options = {
.event_loop_group = &el_group,
.host_resolver = &resolver,
.on_shutdown_complete = s_bootstrap_on_shutdown,
.user_data = &app_ctx,
.event_loop_group = el_group,
.host_resolver = resolver,
};
struct aws_client_bootstrap *bootstrap = aws_client_bootstrap_new(allocator, &bootstrap_options);

Expand Down Expand Up @@ -731,16 +711,17 @@ int main(int argc, char **argv) {
aws_mutex_unlock(&app_ctx.mutex);

aws_client_bootstrap_release(bootstrap);
aws_mutex_lock(&app_ctx.mutex);
aws_condition_variable_wait_pred(&app_ctx.c_var, &app_ctx.mutex, s_bootstrap_shutdown_predicate, &app_ctx);
aws_mutex_unlock(&app_ctx.mutex);
aws_host_resolver_release(resolver);
aws_event_loop_group_release(el_group);

aws_host_resolver_clean_up(&resolver);
aws_event_loop_group_clean_up(&el_group);
if (aws_global_thread_creator_shutdown_wait_for(5)) {
fprintf(stderr, "Timeout waiting for thread shutdown!");
exit(1);
}

if (tls_ctx) {
aws_tls_connection_options_clean_up(&tls_connection_options);
aws_tls_ctx_destroy(tls_ctx);
aws_tls_ctx_release(tls_ctx);
aws_tls_ctx_options_clean_up(&tls_ctx_options);
}

Expand Down
5 changes: 4 additions & 1 deletion source/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ static void s_http_server_clean_up(struct aws_http_server *server) {
if (!server) {
return;
}

aws_server_bootstrap_release(server->bootstrap);

/* invoke the user callback */
if (server->on_destroy_complete) {
server->on_destroy_complete(server->user_data);
Expand Down Expand Up @@ -578,7 +581,7 @@ struct aws_http_server *aws_http_server_new(const struct aws_http_server_options
}

server->alloc = options->allocator;
server->bootstrap = options->bootstrap;
server->bootstrap = aws_server_bootstrap_acquire(options->bootstrap);
server->is_using_tls = options->tls_options != NULL;
server->initial_window_size = options->initial_window_size;
server->user_data = options->server_user_data;
Expand Down
4 changes: 3 additions & 1 deletion source/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,8 @@ static void s_aws_http_connection_manager_finish_destroy(struct aws_http_connect

aws_mutex_clean_up(&manager->lock);

aws_client_bootstrap_release(manager->bootstrap);

if (manager->shutdown_complete_callback) {
manager->shutdown_complete_callback(manager->shutdown_complete_user_data);
}
Expand Down Expand Up @@ -815,7 +817,7 @@ struct aws_http_connection_manager *aws_http_connection_manager_new(
manager->port = options->port;
manager->max_connections = options->max_connections;
manager->socket_options = *options->socket_options;
manager->bootstrap = options->bootstrap;
manager->bootstrap = aws_client_bootstrap_acquire(options->bootstrap);
manager->system_vtable = g_aws_http_connection_manager_default_system_vtable_ptr;
manager->external_ref_count = 1;
manager->shutdown_complete_callback = options->shutdown_complete_callback;
Expand Down
33 changes: 8 additions & 25 deletions tests/proxy_test_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,6 @@ void proxy_tester_on_client_connection_shutdown(
aws_condition_variable_notify_one(&tester->wait_cvar);
}

void proxy_tester_on_client_bootstrap_shutdown(void *user_data) {
struct proxy_tester *tester = user_data;
AWS_FATAL_ASSERT(aws_mutex_lock(&tester->wait_lock) == AWS_OP_SUCCESS);

tester->client_bootstrap_is_shutdown = true;

AWS_FATAL_ASSERT(aws_mutex_unlock(&tester->wait_lock) == AWS_OP_SUCCESS);
aws_condition_variable_notify_one(&tester->wait_cvar);
}

int proxy_tester_wait(struct proxy_tester *tester, bool (*pred)(void *user_data)) {
ASSERT_SUCCESS(aws_mutex_lock(&tester->wait_lock));
ASSERT_SUCCESS(aws_condition_variable_wait_pred(&tester->wait_cvar, &tester->wait_lock, pred, tester));
Expand All @@ -95,11 +85,6 @@ bool proxy_tester_request_complete_pred_fn(void *user_data) {
return tester->request_complete || tester->client_connection_is_shutdown;
}

bool proxy_tester_client_bootstrap_shutdown_pred(void *user_data) {
struct proxy_tester *tester = user_data;
return tester->client_bootstrap_is_shutdown;
}

int proxy_tester_init(struct proxy_tester *tester, const struct proxy_tester_options *options) {
AWS_ZERO_STRUCT(*tester);

Expand All @@ -126,8 +111,8 @@ int proxy_tester_init(struct proxy_tester *tester, const struct proxy_tester_opt
ASSERT_SUCCESS(aws_mutex_init(&tester->wait_lock));
ASSERT_SUCCESS(aws_condition_variable_init(&tester->wait_cvar));

ASSERT_SUCCESS(aws_event_loop_group_default_init(&tester->event_loop_group, tester->alloc, 1));
ASSERT_SUCCESS(aws_host_resolver_init_default(&tester->host_resolver, tester->alloc, 8, &tester->event_loop_group));
tester->event_loop_group = aws_event_loop_group_new_default(tester->alloc, 1, NULL);
tester->host_resolver = aws_host_resolver_new_default(tester->alloc, 8, tester->event_loop_group, NULL);

struct aws_socket_options socket_options = {
.type = AWS_SOCKET_STREAM,
Expand All @@ -137,10 +122,8 @@ int proxy_tester_init(struct proxy_tester *tester, const struct proxy_tester_opt
};

struct aws_client_bootstrap_options bootstrap_options = {
.event_loop_group = &tester->event_loop_group,
.host_resolver = &tester->host_resolver,
.on_shutdown_complete = proxy_tester_on_client_bootstrap_shutdown,
.user_data = tester,
.event_loop_group = tester->event_loop_group,
.host_resolver = tester->host_resolver,
};
tester->client_bootstrap = aws_client_bootstrap_new(tester->alloc, &bootstrap_options);
ASSERT_NOT_NULL(tester->client_bootstrap);
Expand Down Expand Up @@ -206,14 +189,14 @@ int proxy_tester_clean_up(struct proxy_tester *tester) {
}

aws_client_bootstrap_release(tester->client_bootstrap);
ASSERT_SUCCESS(proxy_tester_wait(tester, proxy_tester_client_bootstrap_shutdown_pred));

aws_host_resolver_clean_up(&tester->host_resolver);
aws_event_loop_group_clean_up(&tester->event_loop_group);
aws_host_resolver_release(tester->host_resolver);
aws_event_loop_group_release(tester->event_loop_group);
ASSERT_SUCCESS(aws_global_thread_creator_shutdown_wait_for(10));

if (tester->tls_ctx) {
aws_tls_connection_options_clean_up(&tester->tls_connection_options);
aws_tls_ctx_destroy(tester->tls_ctx);
aws_tls_ctx_release(tester->tls_ctx);
aws_tls_ctx_options_clean_up(&tester->tls_ctx_options);
}

Expand Down
5 changes: 2 additions & 3 deletions tests/proxy_test_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ struct proxy_tester_options {
struct proxy_tester {
struct aws_allocator *alloc;
struct aws_logger logger;
struct aws_event_loop_group event_loop_group;
struct aws_host_resolver host_resolver;
struct aws_event_loop_group *event_loop_group;
struct aws_host_resolver *host_resolver;
struct aws_client_bootstrap *client_bootstrap;

struct aws_tls_ctx *tls_ctx;
Expand All @@ -63,7 +63,6 @@ struct proxy_tester {
struct testing_channel *testing_channel;

bool client_connection_is_shutdown;
bool client_bootstrap_is_shutdown;

/* If we need to wait for some async process*/
struct aws_mutex wait_lock;
Expand Down
61 changes: 18 additions & 43 deletions tests/test_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ struct tester_options {
struct tester {
struct aws_allocator *alloc;
struct aws_logger logger;
struct aws_event_loop_group event_loop_group;
struct aws_host_resolver host_resolver;
struct aws_event_loop_group *event_loop_group;
struct aws_host_resolver *host_resolver;
struct aws_server_bootstrap *server_bootstrap;
struct aws_http_server *server;
struct aws_client_bootstrap *client_bootstrap;
Expand All @@ -64,7 +64,6 @@ struct tester {
int server_connection_is_shutdown;
int wait_client_connection_is_shutdown;
int wait_server_connection_is_shutdown;
bool client_bootstrap_is_shutdown;

bool server_is_shutdown;
struct aws_http_connection *new_client_connection;
Expand Down Expand Up @@ -177,15 +176,6 @@ static void s_tester_on_client_connection_shutdown(
AWS_FATAL_ASSERT(aws_mutex_unlock(&tester->wait_lock) == AWS_OP_SUCCESS);
aws_condition_variable_notify_one(&tester->wait_cvar);
}
static void s_tester_on_client_bootstrap_shutdown(void *user_data) {
struct tester *tester = user_data;
AWS_FATAL_ASSERT(aws_mutex_lock(&tester->wait_lock) == AWS_OP_SUCCESS);

tester->client_bootstrap_is_shutdown = true;

AWS_FATAL_ASSERT(aws_mutex_unlock(&tester->wait_lock) == AWS_OP_SUCCESS);
aws_condition_variable_notify_one(&tester->wait_cvar);
}

static int s_tester_wait(struct tester *tester, bool (*pred)(void *user_data)) {
int local_wait_result;
Expand Down Expand Up @@ -230,11 +220,6 @@ static bool s_tester_server_shutdown_pred(void *user_data) {
return tester->server_is_shutdown;
}

static bool s_tester_client_bootstrap_shutdown_pred(void *user_data) {
struct tester *tester = user_data;
return tester->client_bootstrap_is_shutdown;
}

static int s_tester_init(struct tester *tester, const struct tester_options *options) {
AWS_ZERO_STRUCT(*tester);

Expand All @@ -253,9 +238,9 @@ static int s_tester_init(struct tester *tester, const struct tester_options *opt
ASSERT_SUCCESS(aws_mutex_init(&tester->wait_lock));
ASSERT_SUCCESS(aws_condition_variable_init(&tester->wait_cvar));

ASSERT_SUCCESS(aws_event_loop_group_default_init(&tester->event_loop_group, tester->alloc, 1));
ASSERT_SUCCESS(aws_host_resolver_init_default(&tester->host_resolver, tester->alloc, 8, &tester->event_loop_group));
tester->server_bootstrap = aws_server_bootstrap_new(tester->alloc, &tester->event_loop_group);
tester->event_loop_group = aws_event_loop_group_new_default(tester->alloc, 1, NULL);
tester->host_resolver = aws_host_resolver_new_default(tester->alloc, 8, tester->event_loop_group, NULL);
tester->server_bootstrap = aws_server_bootstrap_new(tester->alloc, tester->event_loop_group);
ASSERT_NOT_NULL(tester->server_bootstrap);

struct aws_socket_options socket_options = {
Expand Down Expand Up @@ -295,10 +280,8 @@ static int s_tester_init(struct tester *tester, const struct tester_options *opt
}

struct aws_client_bootstrap_options bootstrap_options = {
.event_loop_group = &tester->event_loop_group,
.host_resolver = &tester->host_resolver,
.on_shutdown_complete = s_tester_on_client_bootstrap_shutdown,
.user_data = tester,
.event_loop_group = tester->event_loop_group,
.host_resolver = tester->host_resolver,
};
tester->client_bootstrap = aws_client_bootstrap_new(tester->alloc, &bootstrap_options);
ASSERT_NOT_NULL(tester->client_bootstrap);
Expand Down Expand Up @@ -335,8 +318,11 @@ static int s_tester_clean_up(struct tester *tester) {
ASSERT_SUCCESS(s_tester_wait(tester, s_tester_server_shutdown_pred));
}
aws_server_bootstrap_release(tester->server_bootstrap);
aws_host_resolver_clean_up(&tester->host_resolver);
aws_event_loop_group_clean_up(&tester->event_loop_group);
aws_client_bootstrap_release(tester->client_bootstrap);
aws_host_resolver_release(tester->host_resolver);
aws_event_loop_group_release(tester->event_loop_group);
ASSERT_SUCCESS(aws_global_thread_creator_shutdown_wait_for(10));

aws_http_library_clean_up();
aws_logger_clean_up(&tester->logger);
aws_mutex_clean_up(&tester->wait_lock);
Expand Down Expand Up @@ -386,9 +372,6 @@ static int s_test_connection_setup_shutdown(struct aws_allocator *allocator, voi
release_all_server_connections(&tester);
ASSERT_SUCCESS(s_tester_wait(&tester, s_tester_connection_shutdown_pred));

aws_client_bootstrap_release(tester.client_bootstrap);
ASSERT_SUCCESS(s_tester_wait(&tester, s_tester_client_bootstrap_shutdown_pred));

ASSERT_SUCCESS(s_tester_clean_up(&tester));
return AWS_OP_SUCCESS;
}
Expand All @@ -413,8 +396,6 @@ static int s_test_connection_destroy_server_with_connection_existing(struct aws_
/* release memory */
release_all_client_connections(&tester);
release_all_server_connections(&tester);
aws_client_bootstrap_release(tester.client_bootstrap);
ASSERT_SUCCESS(s_tester_wait(&tester, s_tester_client_bootstrap_shutdown_pred));

ASSERT_SUCCESS(s_tester_clean_up(&tester));
return AWS_OP_SUCCESS;
Expand Down Expand Up @@ -458,9 +439,6 @@ static int s_test_connection_destroy_server_with_multiple_connections_existing(
release_all_client_connections(&tester);
release_all_server_connections(&tester);

aws_client_bootstrap_release(tester.client_bootstrap);
ASSERT_SUCCESS(s_tester_wait(&tester, s_tester_client_bootstrap_shutdown_pred));

ASSERT_SUCCESS(s_tester_clean_up(&tester));
return AWS_OP_SUCCESS;
}
Expand Down Expand Up @@ -540,25 +518,24 @@ static int s_test_connection_server_shutting_down_new_connection_setup_fail(
};
/* create a new eventloop for the new connection and block the new connection. Waiting server to begin shutting
* down. */
struct aws_event_loop_group event_loop_group;
ASSERT_SUCCESS(aws_event_loop_group_default_init(&event_loop_group, allocator, 1));
struct aws_event_loop_group *event_loop_group = aws_event_loop_group_new_default(allocator, 1, NULL);

/* get the first eventloop, which will be the eventloop for client to connect */
struct aws_event_loop *current_eventloop = aws_event_loop_group_get_loop_at(&event_loop_group, 0);
struct aws_event_loop *current_eventloop = aws_event_loop_group_get_loop_at(event_loop_group, 0);
struct aws_task *block_task = aws_mem_acquire(allocator, sizeof(struct aws_task));
aws_task_init(block_task, s_block_task, &tester, "wait_a_bit");
aws_event_loop_schedule_task_now(current_eventloop, block_task);

/* get the first eventloop of tester, which will be the eventloop for server listener socket, block the listener
* socket */
struct aws_event_loop *server_eventloop = aws_event_loop_group_get_loop_at(&tester.event_loop_group, 0);
struct aws_event_loop *server_eventloop = aws_event_loop_group_get_loop_at(tester.event_loop_group, 0);
struct aws_task *server_block_task = aws_mem_acquire(allocator, sizeof(struct aws_task));
aws_task_init(server_block_task, s_block_task, &tester, "wait_a_bit");
aws_event_loop_schedule_task_now(server_eventloop, server_block_task);

struct aws_client_bootstrap_options bootstrap_options = {
.event_loop_group = &event_loop_group,
.host_resolver = &tester.host_resolver,
.event_loop_group = event_loop_group,
.host_resolver = tester.host_resolver,
};
struct aws_client_bootstrap *bootstrap = aws_client_bootstrap_new(allocator, &bootstrap_options);
struct aws_http_client_connection_options client_options = AWS_HTTP_CLIENT_CONNECTION_OPTIONS_INIT;
Expand Down Expand Up @@ -603,9 +580,7 @@ static int s_test_connection_server_shutting_down_new_connection_setup_fail(
release_all_client_connections(&tester);
release_all_server_connections(&tester);
aws_client_bootstrap_release(bootstrap);
aws_client_bootstrap_release(tester.client_bootstrap);
ASSERT_SUCCESS(s_tester_wait(&tester, s_tester_client_bootstrap_shutdown_pred));
aws_event_loop_group_clean_up(&event_loop_group);
aws_event_loop_group_release(event_loop_group);
ASSERT_SUCCESS(s_tester_clean_up(&tester));

return AWS_OP_SUCCESS;
Expand Down
Loading

0 comments on commit 8f706b2

Please sign in to comment.