From 0da55e2cbdf43095247b51ffc398c7420888fe8a Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 29 Feb 2024 15:53:30 -0800 Subject: [PATCH] Checkpoint --- include/aws/mqtt/private/client_impl_shared.h | 5 + .../request-response/protocol_adapter.h | 9 +- .../request_response_client.h | 28 ++ .../request_response_client.h | 33 +++ source/client.c | 11 +- source/client_impl_shared.c | 4 + source/request-response/protocol_adapter.c | 5 +- .../request_response_client.c | 270 ++++++++++++++++++ source/v5/mqtt5_to_mqtt3_adapter.c | 9 +- ...apter_tests.c => protocol_adapter_tests.c} | 6 +- .../request_response_client_tests.c | 99 +++++++ .../subscription_manager_tests.c | 8 +- 12 files changed, 472 insertions(+), 15 deletions(-) create mode 100644 include/aws/mqtt/private/request-response/request_response_client.h create mode 100644 include/aws/mqtt/request-response/request_response_client.h create mode 100644 source/request-response/request_response_client.c rename tests/request-response/{request_response_protocol_adapter_tests.c => protocol_adapter_tests.c} (99%) create mode 100644 tests/request-response/request_response_client_tests.c diff --git a/include/aws/mqtt/private/client_impl_shared.h b/include/aws/mqtt/private/client_impl_shared.h index fa5cbeb3..248b4658 100644 --- a/include/aws/mqtt/private/client_impl_shared.h +++ b/include/aws/mqtt/private/client_impl_shared.h @@ -123,6 +123,8 @@ struct aws_mqtt_client_connection_vtable { int (*get_stats_fn)(void *impl, struct aws_mqtt_connection_operation_statistics *stats); enum aws_mqtt311_impl_type (*get_impl_type)(const void *impl); + + struct aws_event_loop *(*get_event_loop)(const void *impl); }; struct aws_mqtt_client_connection { @@ -139,4 +141,7 @@ AWS_MQTT_API bool aws_mqtt_compare_uint16_t_eq(const void *a, const void *b); AWS_MQTT_API bool aws_mqtt_byte_cursor_hash_equality(const void *a, const void *b); +AWS_MQTT_API struct aws_event_loop *aws_mqtt_client_connection_get_event_loop( + const struct aws_mqtt_client_connection *connection); + #endif /* AWS_MQTT_PRIVATE_CLIENT_IMPL_SHARED_H */ diff --git a/include/aws/mqtt/private/request-response/protocol_adapter.h b/include/aws/mqtt/private/request-response/protocol_adapter.h index f9e35a51..f5b784c2 100644 --- a/include/aws/mqtt/private/request-response/protocol_adapter.h +++ b/include/aws/mqtt/private/request-response/protocol_adapter.h @@ -103,13 +103,16 @@ struct aws_protocol_adapter_connection_event { }; typedef void( - aws_protocol_adapter_subscription_event_fn)(struct aws_protocol_adapter_subscription_event *event, void *user_data); + aws_protocol_adapter_subscription_event_fn)(const struct aws_protocol_adapter_subscription_event *event, void *user_data); + typedef void(aws_protocol_adapter_incoming_publish_fn)( - struct aws_protocol_adapter_incoming_publish_event *publish, + const struct aws_protocol_adapter_incoming_publish_event *publish, void *user_data); + typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data); + typedef void( - aws_protocol_adapter_connection_event_fn)(struct aws_protocol_adapter_connection_event *event, void *user_data); + aws_protocol_adapter_connection_event_fn)(const struct aws_protocol_adapter_connection_event *event, void *user_data); /* * Set of callbacks invoked by the protocol adapter. These must all be set. diff --git a/include/aws/mqtt/private/request-response/request_response_client.h b/include/aws/mqtt/private/request-response/request_response_client.h new file mode 100644 index 00000000..31f90c9f --- /dev/null +++ b/include/aws/mqtt/private/request-response/request_response_client.h @@ -0,0 +1,28 @@ +#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H +#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include + +struct aws_mqtt_protocol_adapter; +struct aws_mqtt_protocol_adapter_options; +struct aws_mqtt_request_response_client; +struct aws_mqtt_request_response_client_options; + +struct aws_protocol_adapter_factory_options { + struct aws_event_loop *loop; + void *creation_context; + struct aws_mqtt_protocol_adapter * (*mqtt_protocol_adaptor_factory_fn)(struct aws_mqtt_request_response_client *, struct aws_mqtt_protocol_adapter_options *, void *); +}; + +AWS_EXTERN_C_BEGIN + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_adaptor_factory(struct aws_allocator *allocator, const struct aws_protocol_adapter_factory_options *factory_options, const struct aws_mqtt_request_response_client_options *client_options); + +AWS_EXTERN_C_END + +#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H */ diff --git a/include/aws/mqtt/request-response/request_response_client.h b/include/aws/mqtt/request-response/request_response_client.h new file mode 100644 index 00000000..4e2e9fb4 --- /dev/null +++ b/include/aws/mqtt/request-response/request_response_client.h @@ -0,0 +1,33 @@ +#ifndef AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H +#define AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include "aws/mqtt/mqtt.h" + +struct aws_mqtt_request_response_client; +struct aws_mqtt_client_connection; +struct aws_mqtt5_client; + +struct aws_mqtt_request_response_client_options { + size_t max_subscriptions; + uint32_t operation_timeout_seconds; +}; + +AWS_EXTERN_C_BEGIN + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options); + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options); + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(struct aws_mqtt_request_response_client *client); + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(struct aws_mqtt_request_response_client *client); + + +AWS_EXTERN_C_END + +#endif /* AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H */ diff --git a/source/client.c b/source/client.c index 5ba4ee5c..36c6d134 100644 --- a/source/client.c +++ b/source/client.c @@ -3322,12 +3322,18 @@ static void s_aws_mqtt_client_connection_311_release(void *impl) { aws_ref_count_release(&connection->ref_count); } -enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_3_get_impl(const void *impl) { +static enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_311_get_impl(const void *impl) { (void)impl; return AWS_MQTT311_IT_311_CONNECTION; } +static struct aws_event_loop *s_aws_mqtt_client_connection_311_get_event_loop(const void *impl) { + const struct aws_mqtt_client_connection_311_impl *connection = impl; + + return connection->loop; +} + static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_311_vtable = { .acquire_fn = s_aws_mqtt_client_connection_311_acquire, .release_fn = s_aws_mqtt_client_connection_311_release, @@ -3351,7 +3357,8 @@ static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_311 .unsubscribe_fn = s_aws_mqtt_client_connection_311_unsubscribe, .publish_fn = s_aws_mqtt_client_connection_311_publish, .get_stats_fn = s_aws_mqtt_client_connection_311_get_stats, - .get_impl_type = s_aws_mqtt_client_connection_3_get_impl, + .get_impl_type = s_aws_mqtt_client_connection_311_get_impl, + .get_event_loop = s_aws_mqtt_client_connection_311_get_event_loop, }; static struct aws_mqtt_client_connection_vtable *s_aws_mqtt_client_connection_311_vtable_ptr = diff --git a/source/client_impl_shared.c b/source/client_impl_shared.c index 525fb7e4..019adc5c 100644 --- a/source/client_impl_shared.c +++ b/source/client_impl_shared.c @@ -223,3 +223,7 @@ bool aws_mqtt_byte_cursor_hash_equality(const void *a, const void *b) { return aws_byte_cursor_eq(a_cursor, b_cursor); } + +struct aws_event_loop *aws_mqtt_client_connection_get_event_loop(const struct aws_mqtt_client_connection *connection) { + return (*connection->vtable->get_event_loop)(connection->impl); +} diff --git a/source/request-response/protocol_adapter.c b/source/request-response/protocol_adapter.c index 8066e9eb..a59f09dc 100644 --- a/source/request-response/protocol_adapter.c +++ b/source/request-response/protocol_adapter.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -428,8 +429,8 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311( } if (aws_mqtt_client_connection_get_impl_type(connection) != AWS_MQTT311_IT_311_CONNECTION) { - aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); - return NULL; + struct aws_mqtt_client_connection_5_impl *adapter_impl = connection->impl; + return aws_mqtt_protocol_adapter_new_from_5(allocator, options, adapter_impl->client); } struct aws_mqtt_client_connection_311_impl *impl = connection->impl; diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c new file mode 100644 index 00000000..2d106f29 --- /dev/null +++ b/source/request-response/request_response_client.c @@ -0,0 +1,270 @@ +/** +* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +* SPDX-License-Identifier: Apache-2.0. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +enum aws_request_response_client_state { + AWS_RRCS_ACTIVE, + + // asynchronously shutting down, no more servicing will be done and all protocol adapter callbacks are ignored + AWS_RRCS_SHUTTING_DOWN, +}; + +struct aws_mqtt_request_response_client { + struct aws_allocator *allocator; + + struct aws_ref_count external_ref_count; + struct aws_ref_count internal_ref_count; + + struct aws_mqtt_request_response_client_options config; + + struct aws_mqtt_protocol_adapter *client_adapter; + + struct aws_rr_subscription_manager subscription_manager; + + struct aws_event_loop *loop; + + struct aws_task external_shutdown_task; + struct aws_task internal_shutdown_task; + + enum aws_request_response_client_state state; +}; + +static void s_aws_rr_client_on_zero_internal_ref_count(void *context) { + struct aws_mqtt_request_response_client *client = context; + + aws_event_loop_schedule_task_now(client->loop, &client->internal_shutdown_task); +} + +static void s_aws_rr_client_on_zero_external_ref_count(void *context) { + struct aws_mqtt_request_response_client *client = context; + + aws_event_loop_schedule_task_now(client->loop, &client->external_shutdown_task); +} + +static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request_response_client *client) { + aws_mem_release(client->allocator, client); +} + +static void s_mqtt_request_response_client_internal_shutdown_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { + (void)task; + (void)task_status; + + struct aws_mqtt_request_response_client *client = arg; + + /* + * All internal and external refs are gone; it is safe to clean up synchronously. + * + * The subscription manager is cleaned up and the protocol adapter has been shut down. All that's left is to + * free memory. + */ + s_mqtt_request_response_client_final_destroy(client); +} + +static void s_mqtt_request_response_client_external_shutdown_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { + (void)task; + + AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED); + + struct aws_mqtt_request_response_client *client = arg; + + /* stop handling adapter event callbacks */ + client->state = AWS_RRCS_SHUTTING_DOWN; + + aws_rr_subscription_manager_clean_up(&client->subscription_manager); + + if (client->client_adapter != NULL) { + aws_mqtt_protocol_adapter_destroy(client->client_adapter); + } + + aws_ref_count_release(&client->internal_ref_count); +} + +static void s_aws_rr_client_subscription_status_event_callback(const struct aws_rr_subscription_status_event *event, void *userdata) { + (void)event; + (void)userdata; + + /* + * We must be on the event loop, but it's safer overall to process this event as a top-level event loop task. The subscription + * manager assumes that we won't call APIs on it while iterating subscription records and listeners. + * + * These tasks hold an internal reference while they exist. + */ + + // NYI +} + +static void s_aws_rr_client_protocol_adapter_subscription_event_callback(const struct aws_protocol_adapter_subscription_event *event, void *user_data) { + struct aws_mqtt_request_response_client *rr_client = user_data; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); + + if (rr_client->state != AWS_RRCS_ACTIVE) { + return; + } + + aws_rr_subscription_manager_on_protocol_adapter_subscription_event(&rr_client->subscription_manager, event); +} + +static void s_aws_rr_client_protocol_adapter_incoming_publish_callback( + const struct aws_protocol_adapter_incoming_publish_event *publish, + void *user_data) { + (void)publish; + + struct aws_mqtt_request_response_client *rr_client = user_data; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); + + if (rr_client->state != AWS_RRCS_ACTIVE) { + return; + } + + // NYI +} + +static void s_aws_rr_client_protocol_adapter_terminate_callback(void *user_data) { + struct aws_mqtt_request_response_client *rr_client = user_data; + + // release the internal ref count "held" by the protocol adapter's existence + aws_ref_count_release(&rr_client->internal_ref_count); +} + +static void s_aws_rr_client_protocol_adapter_connection_event_callback(const struct aws_protocol_adapter_connection_event *event, void *user_data) { + struct aws_mqtt_request_response_client *rr_client = user_data; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); + + if (rr_client->state != AWS_RRCS_ACTIVE) { + return; + } + + aws_rr_subscription_manager_on_protocol_adapter_connection_event(&rr_client->subscription_manager, event); +} + +static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new(struct aws_allocator *allocator, const struct aws_mqtt_request_response_client_options *options, struct aws_event_loop *loop) { + struct aws_mqtt_request_response_client *rr_client = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_request_response_client)); + + rr_client->allocator = allocator; + rr_client->config = *options; + rr_client->loop = loop; + rr_client->state = AWS_RRCS_ACTIVE; + + aws_task_init(&rr_client->external_shutdown_task, s_mqtt_request_response_client_external_shutdown_task_fn, rr_client, "mqtt_rr_client_external_shutdown"); + aws_task_init(&rr_client->internal_shutdown_task, s_mqtt_request_response_client_internal_shutdown_task_fn, rr_client, "mqtt_rr_client_internal_shutdown"); + + // 1 external ref to the caller + aws_ref_count_init(&rr_client->external_ref_count, rr_client, s_aws_rr_client_on_zero_external_ref_count); + + // 1 internal ref count belongs to ourselves (the external ref count shutdown task) + aws_ref_count_init(&rr_client->internal_ref_count, rr_client, s_aws_rr_client_on_zero_internal_ref_count); + + return rr_client; +} + +static int s_aws_rr_client_init_subscription_manager(struct aws_mqtt_request_response_client *rr_client, struct aws_allocator *allocator) { + struct aws_rr_subscription_manager_options subscription_manager_options = { + .operation_timeout_seconds = rr_client->config.operation_timeout_seconds, + .max_subscriptions = rr_client->config.max_subscriptions, + .subscription_status_callback = s_aws_rr_client_subscription_status_event_callback, + .userdata = rr_client, + }; + + return aws_rr_subscription_manager_init(&rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options); +} + +static struct aws_mqtt_protocol_adapter *s_mqtt_protocol_adaptor_factory_from_mqtt311_client(struct aws_mqtt_request_response_client *rr_client, struct aws_mqtt_protocol_adapter_options *adapter_options, void *context) { + + struct aws_mqtt_client_connection *client = context; + + return aws_mqtt_protocol_adapter_new_from_311(rr_client->allocator, adapter_options, client); +} + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options) { + + struct aws_protocol_adapter_factory_options mqtt311_factory_options = { + .loop = aws_mqtt_client_connection_get_event_loop(client), + .creation_context = client, + .mqtt_protocol_adaptor_factory_fn = s_mqtt_protocol_adaptor_factory_from_mqtt311_client, + }; + + return aws_mqtt_request_response_client_new_from_adaptor_factory(allocator, &mqtt311_factory_options, options); +} + +static struct aws_mqtt_protocol_adapter *s_mqtt_protocol_adaptor_factory_from_mqtt5_client(struct aws_mqtt_request_response_client *rr_client, struct aws_mqtt_protocol_adapter_options *adapter_options, void *context) { + + struct aws_mqtt5_client *client = context; + + return aws_mqtt_protocol_adapter_new_from_5(rr_client->allocator, adapter_options, client); +} + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options) { + + struct aws_protocol_adapter_factory_options mqtt5_factory_options = { + .loop = client->loop, + .creation_context = client, + .mqtt_protocol_adaptor_factory_fn = s_mqtt_protocol_adaptor_factory_from_mqtt5_client, + }; + + return aws_mqtt_request_response_client_new_from_adaptor_factory(allocator, &mqtt5_factory_options, options); +} + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_adaptor_factory(struct aws_allocator *allocator, const struct aws_protocol_adapter_factory_options *factory_options, const struct aws_mqtt_request_response_client_options *client_options) { + struct aws_mqtt_request_response_client *rr_client = s_aws_mqtt_request_response_client_new(allocator, client_options, factory_options->loop); + + struct aws_mqtt_protocol_adapter_options adapter_options = { + .subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback, + .incoming_publish_callback = s_aws_rr_client_protocol_adapter_incoming_publish_callback, + .terminate_callback = s_aws_rr_client_protocol_adapter_terminate_callback, + .connection_event_callback = s_aws_rr_client_protocol_adapter_connection_event_callback, + .user_data = rr_client, + }; + + rr_client->client_adapter = (*factory_options->mqtt_protocol_adaptor_factory_fn)(rr_client, &adapter_options, factory_options->creation_context); + if (rr_client->client_adapter == NULL) { + goto error; + } + + // now that it exists, 1 internal ref belongs to protocol adapter termination + aws_ref_count_acquire(&rr_client->internal_ref_count); + + if (s_aws_rr_client_init_subscription_manager(rr_client, allocator)) { + goto error; + } + + return rr_client; + +error: + + // even on construction failures we still need to walk through the async shutdown process + aws_mqtt_request_response_client_release(rr_client); + + return NULL; +} + + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(struct aws_mqtt_request_response_client *client) { + if (client != NULL) { + aws_ref_count_acquire(&client->external_ref_count); + } + + return client; +} + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(struct aws_mqtt_request_response_client *client) { + if (client != NULL) { + aws_ref_count_release(&client->external_ref_count); + } + + return NULL; +} diff --git a/source/v5/mqtt5_to_mqtt3_adapter.c b/source/v5/mqtt5_to_mqtt3_adapter.c index 72128a97..6f800431 100644 --- a/source/v5/mqtt5_to_mqtt3_adapter.c +++ b/source/v5/mqtt5_to_mqtt3_adapter.c @@ -2854,12 +2854,18 @@ static uint16_t s_aws_mqtt_5_resubscribe_existing_topics( return 0; } -enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_5_get_impl(const void *impl) { +static enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_5_get_impl(const void *impl) { (void)impl; return AWS_MQTT311_IT_5_ADAPTER; } +static struct aws_event_loop *s_aws_mqtt_client_connection_5_get_event_loop(const void *impl) { + const struct aws_mqtt_client_connection_5_impl *adapter = impl; + + return adapter->client->loop; +} + static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_5_vtable = { .acquire_fn = s_aws_mqtt_client_connection_5_acquire, .release_fn = s_aws_mqtt_client_connection_5_release, @@ -2884,6 +2890,7 @@ static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_5_v .publish_fn = s_aws_mqtt_client_connection_5_publish, .get_stats_fn = s_aws_mqtt_client_connection_5_get_stats, .get_impl_type = s_aws_mqtt_client_connection_5_get_impl, + .get_event_loop = s_aws_mqtt_client_connection_5_get_event_loop, }; static struct aws_mqtt_client_connection_vtable *s_aws_mqtt_client_connection_5_vtable_ptr = diff --git a/tests/request-response/request_response_protocol_adapter_tests.c b/tests/request-response/protocol_adapter_tests.c similarity index 99% rename from tests/request-response/request_response_protocol_adapter_tests.c rename to tests/request-response/protocol_adapter_tests.c index 2e45e065..ec2d6a1c 100644 --- a/tests/request-response/request_response_protocol_adapter_tests.c +++ b/tests/request-response/protocol_adapter_tests.c @@ -95,7 +95,7 @@ struct aws_request_response_protocol_adapter_test_fixture { }; static void s_rr_mqtt_protocol_adapter_test_on_subscription_event( - struct aws_protocol_adapter_subscription_event *event, + const struct aws_protocol_adapter_subscription_event *event, void *user_data) { struct aws_request_response_protocol_adapter_test_fixture *fixture = user_data; @@ -110,7 +110,7 @@ static void s_rr_mqtt_protocol_adapter_test_on_subscription_event( } static void s_rr_mqtt_protocol_adapter_test_on_incoming_publish( - struct aws_protocol_adapter_incoming_publish_event *publish, + const struct aws_protocol_adapter_incoming_publish_event *publish, void *user_data) { struct aws_request_response_protocol_adapter_test_fixture *fixture = user_data; @@ -135,7 +135,7 @@ static void s_rr_mqtt_protocol_adapter_test_on_terminate_callback(void *user_dat } static void s_rr_mqtt_protocol_adapter_test_on_connection_event( - struct aws_protocol_adapter_connection_event *event, + const struct aws_protocol_adapter_connection_event *event, void *user_data) { struct aws_request_response_protocol_adapter_test_fixture *fixture = user_data; diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c new file mode 100644 index 00000000..26580c1b --- /dev/null +++ b/tests/request-response/request_response_client_tests.c @@ -0,0 +1,99 @@ +/** +* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +* SPDX-License-Identifier: Apache-2.0. +*/ + +#include +#include +#include +#include + +#include + +struct aws_mqtt_protocol_adapter_pinned_mock { + struct aws_allocator *allocator; + struct aws_mqtt_protocol_adapter base; + + struct aws_event_loop *loop; + void *test_context; + bool is_connected; +}; + +static void s_aws_mqtt_protocol_adapter_pinned_mock_destroy(void *impl) { + struct aws_mqtt_protocol_adapter_pinned_mock *adapter = impl; + + aws_mem_release(adapter->allocator, adapter); +} + +static int s_aws_mqtt_protocol_adapter_pinned_mock_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) { + (void)options; + + struct aws_mqtt_protocol_adapter_pinned_mock *adapter = impl; + + return AWS_OP_SUCCESS; +} + +static int s_aws_mqtt_protocol_adapter_pinned_mock_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) { + (void)options; + + struct aws_mqtt_protocol_adapter_pinned_mock *adapter = impl; + + return AWS_OP_SUCCESS; +} + +static int s_aws_mqtt_protocol_adapter_pinned_mock_publish(void *impl, struct aws_protocol_adapter_publish_options *options) { + (void)options; + + struct aws_mqtt_protocol_adapter_pinned_mock *adapter = impl; + + return AWS_OP_SUCCESS; +} + +static bool s_aws_mqtt_protocol_adapter_pinned_mock_is_connected(void *impl) { + struct aws_mqtt_protocol_adapter_pinned_mock *adapter = impl; + + return adapter->is_connected; +} + + +static struct aws_mqtt_protocol_adapter_vtable s_default_protocol_adapter_pinned_mock_vtable = { + .aws_mqtt_protocol_adapter_destroy_fn = s_aws_mqtt_protocol_adapter_pinned_mock_destroy, + .aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_pinned_mock_subscribe, + .aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_pinned_mock_unsubscribe, + .aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_pinned_mock_publish, + .aws_mqtt_protocol_adapter_is_connected_fn = s_aws_mqtt_protocol_adapter_pinned_mock_is_connected, +}; + +static struct aws_mqtt_protocol_adapter_pinned_mock *s_aws_mqtt_protocol_adapter_new_pinned_mock( + struct aws_allocator *allocator, + const struct aws_mqtt_protocol_adapter_vtable *vtable, + struct aws_event_loop *loop, + void *test_context) { + struct aws_mqtt_protocol_adapter_pinned_mock *adapter = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_pinned_mock)); + + adapter->allocator = allocator; + adapter->test_context = test_context; + adapter->loop = loop; + adapter->base.impl = adapter; + if (vtable != NULL) { + adapter->base.vtable = vtable; + } else { + adapter->base.vtable = &s_default_protocol_adapter_pinned_mock_vtable; + } + + return adapter; +} + +struct aws_rr_client_test_fixture { + struct aws_allocator *allocator; + + struct aws_event_loop_group *elg; + struct aws_event_loop *loop; + + struct aws_request_response_client *client; + + struct aws_mqtt_protocol_adapter_pinned_mock *mock_adapter; + + void *test_context; +}; \ No newline at end of file diff --git a/tests/request-response/subscription_manager_tests.c b/tests/request-response/subscription_manager_tests.c index 670749fb..ffba44f2 100644 --- a/tests/request-response/subscription_manager_tests.c +++ b/tests/request-response/subscription_manager_tests.c @@ -77,7 +77,7 @@ static void s_aws_mqtt_protocol_adapter_mock_destroy(void *impl) { aws_mem_release(adapter->allocator, adapter); } -int s_aws_mqtt_protocol_adapter_mock_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) { +static int s_aws_mqtt_protocol_adapter_mock_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) { struct aws_mqtt_protocol_adapter_mock_impl *adapter = impl; struct aws_protocol_adapter_api_record record; @@ -89,7 +89,7 @@ int s_aws_mqtt_protocol_adapter_mock_subscribe(void *impl, struct aws_protocol_a return AWS_OP_SUCCESS; } -int s_aws_mqtt_protocol_adapter_mock_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) { +static int s_aws_mqtt_protocol_adapter_mock_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) { struct aws_mqtt_protocol_adapter_mock_impl *adapter = impl; struct aws_protocol_adapter_api_record record; @@ -101,7 +101,7 @@ int s_aws_mqtt_protocol_adapter_mock_unsubscribe(void *impl, struct aws_protocol return AWS_OP_SUCCESS; } -static bool s_aws_mqtt_protocol_adapter_mqtt_is_connected(void *impl) { +static bool s_aws_mqtt_protocol_adapter_mock_is_connected(void *impl) { struct aws_mqtt_protocol_adapter_mock_impl *adapter = impl; return adapter->is_connected; @@ -112,7 +112,7 @@ static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mock_vtable = .aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_mock_subscribe, .aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_mock_unsubscribe, .aws_mqtt_protocol_adapter_publish_fn = NULL, - .aws_mqtt_protocol_adapter_is_connected_fn = s_aws_mqtt_protocol_adapter_mqtt_is_connected, + .aws_mqtt_protocol_adapter_is_connected_fn = s_aws_mqtt_protocol_adapter_mock_is_connected, }; static struct aws_mqtt_protocol_adapter *s_aws_mqtt_mock_protocol_adapter_new(