diff --git a/include/aws/mqtt/private/request-response/protocol_adapter.h b/include/aws/mqtt/private/request-response/protocol_adapter.h index f5b784c2..a4896c68 100644 --- a/include/aws/mqtt/private/request-response/protocol_adapter.h +++ b/include/aws/mqtt/private/request-response/protocol_adapter.h @@ -102,8 +102,9 @@ struct aws_protocol_adapter_connection_event { bool joined_session; }; -typedef void( - aws_protocol_adapter_subscription_event_fn)(const struct aws_protocol_adapter_subscription_event *event, void *user_data); +typedef void(aws_protocol_adapter_subscription_event_fn)( + const struct aws_protocol_adapter_subscription_event *event, + void *user_data); typedef void(aws_protocol_adapter_incoming_publish_fn)( const struct aws_protocol_adapter_incoming_publish_event *publish, @@ -111,8 +112,9 @@ typedef void(aws_protocol_adapter_incoming_publish_fn)( typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data); -typedef void( - aws_protocol_adapter_connection_event_fn)(const struct aws_protocol_adapter_connection_event *event, void *user_data); +typedef void(aws_protocol_adapter_connection_event_fn)( + const struct aws_protocol_adapter_connection_event *event, + void *user_data); /* * Set of callbacks invoked by the protocol adapter. These must all be set. diff --git a/include/aws/mqtt/private/request-response/request_response_client.h b/include/aws/mqtt/private/request-response/request_response_client.h deleted file mode 100644 index 31f90c9f..00000000 --- a/include/aws/mqtt/private/request-response/request_response_client.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H -#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H - -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include - -struct aws_mqtt_protocol_adapter; -struct aws_mqtt_protocol_adapter_options; -struct aws_mqtt_request_response_client; -struct aws_mqtt_request_response_client_options; - -struct aws_protocol_adapter_factory_options { - struct aws_event_loop *loop; - void *creation_context; - struct aws_mqtt_protocol_adapter * (*mqtt_protocol_adaptor_factory_fn)(struct aws_mqtt_request_response_client *, struct aws_mqtt_protocol_adapter_options *, void *); -}; - -AWS_EXTERN_C_BEGIN - -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_adaptor_factory(struct aws_allocator *allocator, const struct aws_protocol_adapter_factory_options *factory_options, const struct aws_mqtt_request_response_client_options *client_options); - -AWS_EXTERN_C_END - -#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H */ diff --git a/include/aws/mqtt/private/request-response/subscription_manager.h b/include/aws/mqtt/private/request-response/subscription_manager.h index 89eb4bff..605a4994 100644 --- a/include/aws/mqtt/private/request-response/subscription_manager.h +++ b/include/aws/mqtt/private/request-response/subscription_manager.h @@ -143,7 +143,7 @@ AWS_EXTERN_C_BEGIN /* * Initializes a subscription manager. Every native request-response client owns a single subscription manager. */ -AWS_MQTT_API int aws_rr_subscription_manager_init( +AWS_MQTT_API void aws_rr_subscription_manager_init( struct aws_rr_subscription_manager *manager, struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter *protocol_adapter, @@ -201,6 +201,12 @@ AWS_MQTT_API void aws_rr_subscription_manager_on_protocol_adapter_connection_eve struct aws_rr_subscription_manager *manager, const struct aws_protocol_adapter_connection_event *event); +/* + * Checks subscription manager options for validity. + */ +AWS_MQTT_API bool aws_rr_subscription_manager_are_options_valid( + const struct aws_rr_subscription_manager_options *options); + AWS_EXTERN_C_END #endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_MANAGER_H */ diff --git a/include/aws/mqtt/request-response/request_response_client.h b/include/aws/mqtt/request-response/request_response_client.h index 4e2e9fb4..5e8fa221 100644 --- a/include/aws/mqtt/request-response/request_response_client.h +++ b/include/aws/mqtt/request-response/request_response_client.h @@ -12,21 +12,37 @@ struct aws_mqtt_request_response_client; struct aws_mqtt_client_connection; struct aws_mqtt5_client; +typedef void(aws_mqtt_request_response_client_initialized_callback_fn)(void *user_data); +typedef void(aws_mqtt_request_response_client_terminated_callback_fn)(void *user_data); + struct aws_mqtt_request_response_client_options { size_t max_subscriptions; uint32_t operation_timeout_seconds; + + // Do not bind the initialized callback; it exists mostly for tests and should not be exposed + aws_mqtt_request_response_client_initialized_callback_fn *initialized_callback; + + aws_mqtt_request_response_client_terminated_callback_fn *terminated_callback; + void *user_data; }; AWS_EXTERN_C_BEGIN -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options); - -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options); +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client( + struct aws_allocator *allocator, + struct aws_mqtt_client_connection *client, + const struct aws_mqtt_request_response_client_options *options); -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(struct aws_mqtt_request_response_client *client); +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client( + struct aws_allocator *allocator, + struct aws_mqtt5_client *client, + const struct aws_mqtt_request_response_client_options *options); -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(struct aws_mqtt_request_response_client *client); +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire( + struct aws_mqtt_request_response_client *client); +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release( + struct aws_mqtt_request_response_client *client); AWS_EXTERN_C_END diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 2d106f29..1d313a2b 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -10,11 +10,13 @@ #include #include #include -#include #include #include enum aws_request_response_client_state { + // cross-thread initialization has not completed and all protocol adapter callbacks are ignored + AWS_RRCS_UNINITIALIZED, + AWS_RRCS_ACTIVE, // asynchronously shutting down, no more servicing will be done and all protocol adapter callbacks are ignored @@ -35,6 +37,7 @@ struct aws_mqtt_request_response_client { struct aws_event_loop *loop; + struct aws_task initialize_task; struct aws_task external_shutdown_task; struct aws_task internal_shutdown_task; @@ -54,7 +57,14 @@ static void s_aws_rr_client_on_zero_external_ref_count(void *context) { } static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request_response_client *client) { + aws_mqtt_request_response_client_terminated_callback_fn *terminate_callback = client->config.terminated_callback; + void *user_data = client->config.user_data; + aws_mem_release(client->allocator, client); + + if (terminate_callback != NULL) { + (*terminate_callback)(user_data); + } } static void s_mqtt_request_response_client_internal_shutdown_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { @@ -153,12 +163,23 @@ static void s_aws_rr_client_protocol_adapter_connection_event_callback(const str } static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new(struct aws_allocator *allocator, const struct aws_mqtt_request_response_client_options *options, struct aws_event_loop *loop) { + struct aws_rr_subscription_manager_options sm_options = { + .max_subscriptions = options->max_subscriptions, + .operation_timeout_seconds = options->operation_timeout_seconds, + }; + + // we can't initialize the subscription manager until we're running on the event loop, so make sure that + // initialize can't fail by checking its options for validity now. + if (!aws_rr_subscription_manager_are_options_valid(&sm_options)) { + return NULL; + } + struct aws_mqtt_request_response_client *rr_client = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_request_response_client)); rr_client->allocator = allocator; rr_client->config = *options; rr_client->loop = loop; - rr_client->state = AWS_RRCS_ACTIVE; + rr_client->state = AWS_RRCS_UNINITIALIZED; aws_task_init(&rr_client->external_shutdown_task, s_mqtt_request_response_client_external_shutdown_task_fn, rr_client, "mqtt_rr_client_external_shutdown"); aws_task_init(&rr_client->internal_shutdown_task, s_mqtt_request_response_client_internal_shutdown_task_fn, rr_client, "mqtt_rr_client_internal_shutdown"); @@ -172,7 +193,7 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie return rr_client; } -static int s_aws_rr_client_init_subscription_manager(struct aws_mqtt_request_response_client *rr_client, struct aws_allocator *allocator) { +static void s_aws_rr_client_init_subscription_manager(struct aws_mqtt_request_response_client *rr_client, struct aws_allocator *allocator) { struct aws_rr_subscription_manager_options subscription_manager_options = { .operation_timeout_seconds = rr_client->config.operation_timeout_seconds, .max_subscriptions = rr_client->config.max_subscriptions, @@ -180,47 +201,72 @@ static int s_aws_rr_client_init_subscription_manager(struct aws_mqtt_request_res .userdata = rr_client, }; - return aws_rr_subscription_manager_init(&rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options); + aws_rr_subscription_manager_init(&rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options); } -static struct aws_mqtt_protocol_adapter *s_mqtt_protocol_adaptor_factory_from_mqtt311_client(struct aws_mqtt_request_response_client *rr_client, struct aws_mqtt_protocol_adapter_options *adapter_options, void *context) { +static void s_mqtt_request_response_client_initialize_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { + (void)task; - struct aws_mqtt_client_connection *client = context; + AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED); - return aws_mqtt_protocol_adapter_new_from_311(rr_client->allocator, adapter_options, client); -} + struct aws_mqtt_request_response_client *client = arg; -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options) { + if (client->state == AWS_RRCS_UNINITIALIZED) { + s_aws_rr_client_init_subscription_manager(client, client->allocator); - struct aws_protocol_adapter_factory_options mqtt311_factory_options = { - .loop = aws_mqtt_client_connection_get_event_loop(client), - .creation_context = client, - .mqtt_protocol_adaptor_factory_fn = s_mqtt_protocol_adaptor_factory_from_mqtt311_client, - }; + client->state = AWS_RRCS_ACTIVE; + } - return aws_mqtt_request_response_client_new_from_adaptor_factory(allocator, &mqtt311_factory_options, options); -} + if (client->config.initialized_callback != NULL) { + (*client->config.initialized_callback)(client->config.user_data); + } -static struct aws_mqtt_protocol_adapter *s_mqtt_protocol_adaptor_factory_from_mqtt5_client(struct aws_mqtt_request_response_client *rr_client, struct aws_mqtt_protocol_adapter_options *adapter_options, void *context) { + // give up the internal ref we held while the task was pending + aws_ref_count_release(&client->internal_ref_count); +} - struct aws_mqtt5_client *client = context; +static void s_setup_cross_thread_initialization(struct aws_mqtt_request_response_client * rr_client) { + // now that it exists, 1 internal ref belongs to protocol adapter termination + aws_ref_count_acquire(&rr_client->internal_ref_count); - return aws_mqtt_protocol_adapter_new_from_5(rr_client->allocator, adapter_options, client); + // 1 internal ref belongs to the initialize task until it runs + aws_ref_count_acquire(&rr_client->internal_ref_count); + aws_task_init(&rr_client->initialize_task, s_mqtt_request_response_client_initialize_task_fn, rr_client, "mqtt_rr_client_initialize"); + aws_event_loop_schedule_task_now(rr_client->loop, &rr_client->initialize_task); } -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options) { +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options) { + + struct aws_mqtt_request_response_client *rr_client = s_aws_mqtt_request_response_client_new(allocator, options, aws_mqtt_client_connection_get_event_loop(client)); - struct aws_protocol_adapter_factory_options mqtt5_factory_options = { - .loop = client->loop, - .creation_context = client, - .mqtt_protocol_adaptor_factory_fn = s_mqtt_protocol_adaptor_factory_from_mqtt5_client, + struct aws_mqtt_protocol_adapter_options adapter_options = { + .subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback, + .incoming_publish_callback = s_aws_rr_client_protocol_adapter_incoming_publish_callback, + .terminate_callback = s_aws_rr_client_protocol_adapter_terminate_callback, + .connection_event_callback = s_aws_rr_client_protocol_adapter_connection_event_callback, + .user_data = rr_client, }; - return aws_mqtt_request_response_client_new_from_adaptor_factory(allocator, &mqtt5_factory_options, options); + rr_client->client_adapter = aws_mqtt_protocol_adapter_new_from_311(rr_client->allocator, &adapter_options, client); + if (rr_client->client_adapter == NULL) { + goto error; + } + + s_setup_cross_thread_initialization(rr_client); + + return rr_client; + +error: + + // even on construction failures we still need to walk through the async shutdown process + aws_mqtt_request_response_client_release(rr_client); + + return NULL; } -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_adaptor_factory(struct aws_allocator *allocator, const struct aws_protocol_adapter_factory_options *factory_options, const struct aws_mqtt_request_response_client_options *client_options) { - struct aws_mqtt_request_response_client *rr_client = s_aws_mqtt_request_response_client_new(allocator, client_options, factory_options->loop); +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options) { + + struct aws_mqtt_request_response_client * rr_client = s_aws_mqtt_request_response_client_new(allocator, options, client->loop); struct aws_mqtt_protocol_adapter_options adapter_options = { .subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback, @@ -230,17 +276,12 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_fr .user_data = rr_client, }; - rr_client->client_adapter = (*factory_options->mqtt_protocol_adaptor_factory_fn)(rr_client, &adapter_options, factory_options->creation_context); + rr_client->client_adapter = aws_mqtt_protocol_adapter_new_from_5(rr_client->allocator, &adapter_options, client); if (rr_client->client_adapter == NULL) { goto error; } - // now that it exists, 1 internal ref belongs to protocol adapter termination - aws_ref_count_acquire(&rr_client->internal_ref_count); - - if (s_aws_rr_client_init_subscription_manager(rr_client, allocator)) { - goto error; - } + s_setup_cross_thread_initialization(rr_client); return rr_client; @@ -252,7 +293,6 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_fr return NULL; } - struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(struct aws_mqtt_request_response_client *client) { if (client != NULL) { aws_ref_count_acquire(&client->external_ref_count); diff --git a/source/request-response/subscription_manager.c b/source/request-response/subscription_manager.c index 05286786..57694ddc 100644 --- a/source/request-response/subscription_manager.c +++ b/source/request-response/subscription_manager.c @@ -589,16 +589,22 @@ void aws_rr_subscription_manager_on_protocol_adapter_connection_event( } } -int aws_rr_subscription_manager_init( +bool aws_rr_subscription_manager_are_options_valid(const struct aws_rr_subscription_manager_options *options) { + if (options == NULL || options->max_subscriptions < 1 || options->operation_timeout_seconds == 0) { + return false; + } + + return true; +} + +void aws_rr_subscription_manager_init( struct aws_rr_subscription_manager *manager, struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter *protocol_adapter, const struct aws_rr_subscription_manager_options *options) { AWS_ZERO_STRUCT(*manager); - if (options == NULL || options->max_subscriptions < 1 || options->operation_timeout_seconds == 0) { - return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); - } + AWS_FATAL_ASSERT(aws_rr_subscription_manager_are_options_valid(options)); manager->allocator = allocator; manager->config = *options; @@ -614,8 +620,6 @@ int aws_rr_subscription_manager_init( s_aws_rr_subscription_record_destroy); manager->is_protocol_client_connected = aws_mqtt_protocol_adapter_is_connected(protocol_adapter); - - return AWS_OP_SUCCESS; } void aws_rr_subscription_manager_clean_up(struct aws_rr_subscription_manager *manager) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 185cfc5e..4b9141b8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -507,6 +507,10 @@ add_test_case(rrsm_offline_acquire_pending_clean_up_unsubscribe_override) add_test_case(rrsm_acquire_success_offline_online_no_session_subscription_lost_can_reacquire) add_test_case(rrsm_subscription_lost_while_unsubscribing) +# "rrc" = request response client +add_test_case(rrc_mqtt5_create_destroy) +add_test_case(rrc_mqtt311_create_destroy) + generate_test_driver(${PROJECT_NAME}-tests) set(TEST_PAHO_CLIENT_BINARY_NAME ${PROJECT_NAME}-paho-client) diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c index 26580c1b..633ee42b 100644 --- a/tests/request-response/request_response_client_tests.c +++ b/tests/request-response/request_response_client_tests.c @@ -1,99 +1,233 @@ /** -* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -* SPDX-License-Identifier: Apache-2.0. -*/ + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ #include -#include -#include #include #include -struct aws_mqtt_protocol_adapter_pinned_mock { +#include "../v3/mqtt311_testing_utils.h" +#include "../v5/mqtt5_testing_utils.h" + +enum rr_test_client_protocol { + RRCP_MQTT311, + RRCP_MQTT5, +}; + +struct aws_rr_client_test_fixture { struct aws_allocator *allocator; - struct aws_mqtt_protocol_adapter base; - struct aws_event_loop *loop; + struct aws_mqtt_request_response_client *client; + + enum rr_test_client_protocol test_protocol; + union { + struct aws_mqtt5_client_mock_test_fixture mqtt5_test_fixture; + struct mqtt_connection_state_test mqtt311_test_fixture; + } client_test_fixture; + void *test_context; - bool is_connected; + + struct aws_mutex lock; + struct aws_condition_variable signal; + + bool client_initialized; + bool client_destroyed; }; -static void s_aws_mqtt_protocol_adapter_pinned_mock_destroy(void *impl) { - struct aws_mqtt_protocol_adapter_pinned_mock *adapter = impl; +static void s_aws_rr_client_test_fixture_on_initialized(void *user_data) { + struct aws_rr_client_test_fixture *fixture = user_data; - aws_mem_release(adapter->allocator, adapter); + aws_mutex_lock(&fixture->lock); + fixture->client_initialized = true; + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); } -static int s_aws_mqtt_protocol_adapter_pinned_mock_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) { - (void)options; +static bool s_rr_client_test_fixture_initialized(void *context) { + struct aws_rr_client_test_fixture *fixture = context; - struct aws_mqtt_protocol_adapter_pinned_mock *adapter = impl; + return fixture->client_initialized; +} - return AWS_OP_SUCCESS; +static void s_aws_rr_client_test_fixture_wait_for_initialized(struct aws_rr_client_test_fixture *fixture) { + aws_mutex_lock(&fixture->lock); + aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_rr_client_test_fixture_initialized, fixture); + aws_mutex_unlock(&fixture->lock); } -static int s_aws_mqtt_protocol_adapter_pinned_mock_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) { - (void)options; +static void s_aws_rr_client_test_fixture_on_terminated(void *user_data) { + struct aws_rr_client_test_fixture *fixture = user_data; + + aws_mutex_lock(&fixture->lock); + fixture->client_destroyed = true; + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +static int s_aws_rr_client_test_fixture_init_from_mqtt5( + struct aws_rr_client_test_fixture *fixture, + struct aws_allocator *allocator, + struct aws_mqtt_request_response_client_options *rr_client_options, + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options *client_test_fixture_options, + void *test_context) { + AWS_ZERO_STRUCT(*fixture); + fixture->allocator = allocator; + fixture->test_protocol = RRCP_MQTT5; - struct aws_mqtt_protocol_adapter_pinned_mock *adapter = impl; + aws_mutex_init(&fixture->lock); + aws_condition_variable_init(&fixture->signal); + fixture->test_context = test_context; + + if (aws_mqtt5_client_mock_test_fixture_init( + &fixture->client_test_fixture.mqtt5_test_fixture, allocator, client_test_fixture_options)) { + return AWS_OP_ERR; + } + + struct aws_mqtt_request_response_client_options client_options = { + .max_subscriptions = 3, + .operation_timeout_seconds = 5, + }; + + if (rr_client_options != NULL) { + client_options = *rr_client_options; + } + + client_options.initialized_callback = s_aws_rr_client_test_fixture_on_initialized; + client_options.terminated_callback = s_aws_rr_client_test_fixture_on_terminated; + client_options.user_data = fixture; + + fixture->client = aws_mqtt_request_response_client_new_from_mqtt5_client( + allocator, fixture->client_test_fixture.mqtt5_test_fixture.client, &client_options); + AWS_FATAL_ASSERT(fixture->client != NULL); + + aws_mqtt5_client_start(fixture->client_test_fixture.mqtt5_test_fixture.client); + + aws_wait_for_connected_lifecycle_event(&fixture->client_test_fixture.mqtt5_test_fixture); + s_aws_rr_client_test_fixture_wait_for_initialized(fixture); return AWS_OP_SUCCESS; } -static int s_aws_mqtt_protocol_adapter_pinned_mock_publish(void *impl, struct aws_protocol_adapter_publish_options *options) { - (void)options; +static int s_aws_rr_client_test_fixture_init_from_mqtt311( + struct aws_rr_client_test_fixture *fixture, + struct aws_allocator *allocator, + struct aws_mqtt_request_response_client_options *rr_client_options, + void *test_context) { + AWS_ZERO_STRUCT(*fixture); + fixture->allocator = allocator; + fixture->test_protocol = RRCP_MQTT311; - struct aws_mqtt_protocol_adapter_pinned_mock *adapter = impl; + aws_mutex_init(&fixture->lock); + aws_condition_variable_init(&fixture->signal); + fixture->test_context = test_context; + + aws_test311_setup_mqtt_server_fn(allocator, &fixture->client_test_fixture.mqtt311_test_fixture); + + struct aws_mqtt_request_response_client_options client_options = { + .max_subscriptions = 3, + .operation_timeout_seconds = 5, + }; + + if (rr_client_options != NULL) { + client_options = *rr_client_options; + } + + client_options.initialized_callback = s_aws_rr_client_test_fixture_on_initialized; + client_options.terminated_callback = s_aws_rr_client_test_fixture_on_terminated; + client_options.user_data = fixture; + + struct aws_mqtt_client_connection *mqtt_client = fixture->client_test_fixture.mqtt311_test_fixture.mqtt_connection; + + fixture->client = aws_mqtt_request_response_client_new_from_mqtt311_client(allocator, mqtt_client, &client_options); + AWS_FATAL_ASSERT(fixture->client != NULL); + + struct aws_mqtt_connection_options connection_options = { + .user_data = &fixture->client_test_fixture.mqtt311_test_fixture, + .clean_session = false, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(fixture->client_test_fixture.mqtt311_test_fixture.endpoint.address), + .socket_options = &fixture->client_test_fixture.mqtt311_test_fixture.socket_options, + .on_connection_complete = aws_test311_on_connection_complete_fn, + .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS, + .keep_alive_time_secs = 16960, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(mqtt_client, &connection_options)); + aws_test311_wait_for_connection_to_complete(&fixture->client_test_fixture.mqtt311_test_fixture); + + s_aws_rr_client_test_fixture_wait_for_initialized(fixture); return AWS_OP_SUCCESS; } -static bool s_aws_mqtt_protocol_adapter_pinned_mock_is_connected(void *impl) { - struct aws_mqtt_protocol_adapter_pinned_mock *adapter = impl; +static bool s_rr_client_test_fixture_terminated(void *context) { + struct aws_rr_client_test_fixture *fixture = context; - return adapter->is_connected; + return fixture->client_destroyed; } +static void s_aws_rr_client_test_fixture_clean_up(struct aws_rr_client_test_fixture *fixture) { + aws_mqtt_request_response_client_release(fixture->client); -static struct aws_mqtt_protocol_adapter_vtable s_default_protocol_adapter_pinned_mock_vtable = { - .aws_mqtt_protocol_adapter_destroy_fn = s_aws_mqtt_protocol_adapter_pinned_mock_destroy, - .aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_pinned_mock_subscribe, - .aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_pinned_mock_unsubscribe, - .aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_pinned_mock_publish, - .aws_mqtt_protocol_adapter_is_connected_fn = s_aws_mqtt_protocol_adapter_pinned_mock_is_connected, -}; + aws_mutex_lock(&fixture->lock); + aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_rr_client_test_fixture_terminated, fixture); + aws_mutex_unlock(&fixture->lock); -static struct aws_mqtt_protocol_adapter_pinned_mock *s_aws_mqtt_protocol_adapter_new_pinned_mock( - struct aws_allocator *allocator, - const struct aws_mqtt_protocol_adapter_vtable *vtable, - struct aws_event_loop *loop, - void *test_context) { - struct aws_mqtt_protocol_adapter_pinned_mock *adapter = - aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_pinned_mock)); - - adapter->allocator = allocator; - adapter->test_context = test_context; - adapter->loop = loop; - adapter->base.impl = adapter; - if (vtable != NULL) { - adapter->base.vtable = vtable; + if (fixture->test_protocol == RRCP_MQTT5) { + aws_mqtt5_client_mock_test_fixture_clean_up(&fixture->client_test_fixture.mqtt5_test_fixture); } else { - adapter->base.vtable = &s_default_protocol_adapter_pinned_mock_vtable; + struct mqtt_connection_state_test *mqtt311_test_fixture = &fixture->client_test_fixture.mqtt311_test_fixture; + aws_mqtt_client_connection_disconnect( + mqtt311_test_fixture->mqtt_connection, aws_test311_on_disconnect_fn, mqtt311_test_fixture); + aws_test311_clean_up_mqtt_server_fn( + fixture->allocator, AWS_OP_SUCCESS, &fixture->client_test_fixture.mqtt311_test_fixture); } - return adapter; + aws_mutex_clean_up(&fixture->lock); + aws_condition_variable_clean_up(&fixture->signal); } -struct aws_rr_client_test_fixture { - struct aws_allocator *allocator; +static int s_rrc_mqtt5_create_destroy_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; - struct aws_event_loop_group *elg; - struct aws_event_loop *loop; + aws_mqtt_library_init(allocator); - struct aws_request_response_client *client; + struct mqtt5_client_test_options client_test_options; + aws_mqtt5_client_test_init_default_options(&client_test_options); - struct aws_mqtt_protocol_adapter_pinned_mock *mock_adapter; + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options client_test_fixture_options = { + .client_options = &client_test_options.client_options, + .server_function_table = &client_test_options.server_function_table, + }; - void *test_context; -}; \ No newline at end of file + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS( + s_aws_rr_client_test_fixture_init_from_mqtt5(&fixture, allocator, NULL, &client_test_fixture_options, NULL)); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrc_mqtt5_create_destroy, s_rrc_mqtt5_create_destroy_fn) + +static int s_rrc_mqtt311_create_destroy_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS(s_aws_rr_client_test_fixture_init_from_mqtt311(&fixture, allocator, NULL, NULL)); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrc_mqtt311_create_destroy, s_rrc_mqtt311_create_destroy_fn) \ No newline at end of file diff --git a/tests/request-response/subscription_manager_tests.c b/tests/request-response/subscription_manager_tests.c index ffba44f2..2ffd58cf 100644 --- a/tests/request-response/subscription_manager_tests.c +++ b/tests/request-response/subscription_manager_tests.c @@ -77,7 +77,9 @@ static void s_aws_mqtt_protocol_adapter_mock_destroy(void *impl) { aws_mem_release(adapter->allocator, adapter); } -static int s_aws_mqtt_protocol_adapter_mock_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) { +static int s_aws_mqtt_protocol_adapter_mock_subscribe( + void *impl, + struct aws_protocol_adapter_subscribe_options *options) { struct aws_mqtt_protocol_adapter_mock_impl *adapter = impl; struct aws_protocol_adapter_api_record record; @@ -89,7 +91,9 @@ static int s_aws_mqtt_protocol_adapter_mock_subscribe(void *impl, struct aws_pro return AWS_OP_SUCCESS; } -static int s_aws_mqtt_protocol_adapter_mock_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) { +static int s_aws_mqtt_protocol_adapter_mock_unsubscribe( + void *impl, + struct aws_protocol_adapter_unsubscribe_options *options) { struct aws_mqtt_protocol_adapter_mock_impl *adapter = impl; struct aws_protocol_adapter_api_record record; @@ -292,8 +296,8 @@ static int s_aws_subscription_manager_test_fixture_init( .operation_timeout_seconds = options->operation_timeout_seconds, .subscription_status_callback = s_aws_rr_subscription_status_event_test_callback_fn, .userdata = fixture}; - ASSERT_SUCCESS(aws_rr_subscription_manager_init( - &fixture->subscription_manager, allocator, fixture->mock_protocol_adapter, &subscription_manager_options)); + aws_rr_subscription_manager_init( + &fixture->subscription_manager, allocator, fixture->mock_protocol_adapter, &subscription_manager_options); return AWS_OP_SUCCESS; } @@ -1399,8 +1403,8 @@ static int s_do_rrsm_acquire_clean_up_test( .subscription_status_callback = s_aws_rr_subscription_status_event_test_callback_fn, .userdata = &fixture, }; - ASSERT_SUCCESS(aws_rr_subscription_manager_init( - &fixture.subscription_manager, allocator, fixture.mock_protocol_adapter, &subscription_manager_options)); + aws_rr_subscription_manager_init( + &fixture.subscription_manager, allocator, fixture.mock_protocol_adapter, &subscription_manager_options); s_aws_subscription_manager_test_fixture_clean_up(&fixture); aws_mqtt_library_clean_up();