diff --git a/include/aws/mqtt/private/client_impl.h b/include/aws/mqtt/private/client_impl.h index 1d0dd67a..ce041b2d 100644 --- a/include/aws/mqtt/private/client_impl.h +++ b/include/aws/mqtt/private/client_impl.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -255,6 +256,9 @@ struct aws_mqtt_client_connection_311_impl { aws_mqtt_on_operation_statistics_fn *on_any_operation_statistics; void *on_any_operation_statistics_ud; + /* listener callbacks */ + struct aws_mqtt311_callback_set_manager callback_manager; + /* Connection tasks. */ struct aws_mqtt_reconnect_task *reconnect_task; struct aws_channel_task ping_task; diff --git a/include/aws/mqtt/private/mqtt311_listener.h b/include/aws/mqtt/private/mqtt311_listener.h new file mode 100644 index 00000000..a5c9bba9 --- /dev/null +++ b/include/aws/mqtt/private/mqtt311_listener.h @@ -0,0 +1,183 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#ifndef AWS_MQTT_MQTT311_LISTENER_H +#define AWS_MQTT_MQTT311_LISTENER_H + +#include + +#include +#include + +AWS_PUSH_SANE_WARNING_LEVEL + +/** + * Callback signature for when an mqtt311 listener has completely destroyed itself. + */ +typedef void(aws_mqtt311_listener_termination_completion_fn)(void *complete_ctx); + +/** + * A record that tracks MQTT311 client connection callbacks which can be dynamically injected via a listener. + * + * All the callbacks that are supported here are invoked only on the 311 connection's event loop. With the + * add/remove callback set also on the event loop, everything is correctly serialized without data races. + */ +struct aws_mqtt311_callback_set { + + /* Called from s_packet_handler_publish which is event-loop invoked */ + aws_mqtt_client_publish_received_fn *publish_received_handler; + + /* Called from s_mqtt_client_shutdown which is event-loop invoked */ + aws_mqtt_client_on_connection_interrupted_fn *connection_interrupted_handler; + + /* Called from s_packet_handler_connack which is event-loop invoked */ + aws_mqtt_client_on_connection_interrupted_fn *connection_resumed_handler; + + /* Also called from s_packet_handler_connack which is event-loop invoked */ + aws_mqtt_client_on_connection_success_fn *connection_success_handler; + + void *user_data; +}; + +/** + * An internal type for managing chains of callbacks attached to an mqtt311 client connection. Supports chains for + * lifecycle events and incoming publish packet handling. + * + * Assumed to be owned and used only by an MQTT311 client connection. + */ +struct aws_mqtt311_callback_set_manager { + struct aws_allocator *allocator; + + struct aws_mqtt_client_connection *connection; + + struct aws_linked_list callback_set_entries; + + uint64_t next_callback_set_entry_id; +}; + + +/** + * Configuration options for MQTT311 listener objects. + */ +struct aws_mqtt311_listener_config { + + /** + * MQTT311 client connection to listen to events on + */ + struct aws_mqtt_client_connection *connection; + + /** + * Callbacks to invoke when events occur on the MQTT311 client connection + */ + struct aws_mqtt311_callback_set listener_callbacks; + + /** + * Listener destruction is asynchronous and thus requires a termination callback and associated user data + * to notify the user that the listener has been fully destroyed and no further events will be received. + */ + aws_mqtt311_listener_termination_completion_fn *termination_callback; + void *termination_callback_user_data; +}; + +AWS_EXTERN_C_BEGIN + +/** + * Creates a new MQTT311 listener object. For as long as the listener lives, incoming publishes and lifecycle events + * will be forwarded to the callbacks configured on the listener. + * + * @param allocator allocator to use + * @param config listener configuration + * @return a new aws_mqtt311_listener object + */ +AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_new( + struct aws_allocator *allocator, + struct aws_mqtt311_listener_config *config); + +/** + * Adds a reference to an mqtt311 listener. + * + * @param listener listener to add a reference to + * @return the listener object + */ +AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_acquire(struct aws_mqtt311_listener *listener); + +/** + * Removes a reference to an mqtt311 listener. When the reference count drops to zero, the listener's asynchronous + * destruction will be started. + * + * @param listener listener to remove a reference from + * @return NULL + */ +AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_release(struct aws_mqtt311_listener *listener); + + +/** + * Initializes a callback set manager + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_init( + struct aws_mqtt311_callback_set_manager *manager, + struct aws_allocator *allocator, + struct aws_mqtt_client_connection *connection); + +/** + * Cleans up a callback set manager. + * + * aws_mqtt311_callback_set_manager_init must have been previously called or this will crash. + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_clean_up(struct aws_mqtt311_callback_set_manager *manager); + +/** + * Adds a callback set to the front of the handler chain. Returns an integer id that can be used to selectively + * remove the callback set from the manager. + * + * May only be called on the client's event loop thread. + */ +AWS_MQTT_API +uint64_t aws_mqtt311_callback_set_manager_push_front( + struct aws_mqtt311_callback_set_manager *manager, + struct aws_mqtt311_callback_set *callback_set); + +/** + * Removes a callback set from the handler chain. + * + * May only be called on the client's event loop thread. + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_remove(struct aws_mqtt311_callback_set_manager *manager, uint64_t callback_set_id); + +/** + * Walks the incoming publish handler chain for an MQTT311 connection. The chain's callbacks will be invoked + * until either the end is reached or one of the callbacks returns true. + * + * May only be called on the client's event loop thread. + */ +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_on_publish_received( + struct aws_mqtt311_callback_set_manager *manager, + const struct aws_byte_cursor *topic, + const struct aws_byte_cursor *payload, + bool dup, + enum aws_mqtt_qos qos, + bool retain); + +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_on_connection_interrupted( + struct aws_mqtt311_callback_set_manager *manager); + +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_on_connection_resumed( + struct aws_mqtt311_callback_set_manager *manager); + +AWS_MQTT_API +void aws_mqtt311_callback_set_manager_on_connection_success( + struct aws_mqtt311_callback_set_manager *manager); + +AWS_EXTERN_C_END + +AWS_POP_SANE_WARNING_LEVEL + +#endif /* AWS_MQTT_MQTT311_LISTENER_H */ diff --git a/source/client.c b/source/client.c index 42ab634c..a5161afe 100644 --- a/source/client.c +++ b/source/client.c @@ -259,6 +259,7 @@ static void s_mqtt_client_shutdown( (void)channel; struct aws_mqtt_client_connection_311_impl *connection = user_data; + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection->loop)); AWS_LOGF_TRACE( AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code); @@ -801,6 +802,8 @@ static void s_mqtt_client_connection_destroy_final(struct aws_mqtt_client_connec termination_handler_user_data = connection->on_termination_ud; } + aws_mqtt311_callback_set_manager_clean_up(&connection->callback_manager); + /* If the reconnect_task isn't freed, free it */ if (connection->reconnect_task) { aws_mem_release(connection->reconnect_task->allocator, connection->reconnect_task); @@ -3351,6 +3354,8 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt connection->handler.vtable = aws_mqtt_get_client_channel_vtable(); connection->handler.impl = connection; + aws_mqtt311_callback_set_manager_init(&connection->callback_manager, connection->allocator, connection); + return &connection->base; failed_init_outstanding_requests_table: diff --git a/source/mqtt311_listener.c b/source/mqtt311_listener.c new file mode 100644 index 00000000..b2fcab92 --- /dev/null +++ b/source/mqtt311_listener.c @@ -0,0 +1,251 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include + +#include +#include +#include +#include +#include + +#include + +struct aws_mqtt311_listener { + struct aws_allocator *allocator; + + struct aws_ref_count ref_count; + + struct aws_mqtt311_listener_config config; + + uint64_t callback_set_id; + + struct aws_task initialize_task; + struct aws_task terminate_task; +}; + +static void s_mqtt311_listener_destroy(struct aws_mqtt311_listener *listener) { + + aws_mqtt_client_connection_release(listener->config.connection); + + aws_mqtt311_listener_termination_completion_fn *termination_callback = listener->config.termination_callback; + void *termination_callback_user_data = listener->config.termination_callback_user_data; + + aws_mem_release(listener->allocator, listener); + + if (termination_callback != NULL) { + (*termination_callback)(termination_callback_user_data); + } +} + +static void s_mqtt311_listener_initialize_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { + (void)task; + + struct aws_mqtt311_listener *listener = arg; + + if (task_status == AWS_TASK_STATUS_RUN_READY) { + struct aws_mqtt_client_connection_311_impl *connection_impl = listener->config.connection->impl; + listener->callback_set_id = aws_mqtt311_callback_set_manager_push_front( + &connection_impl->callback_manager, &listener->config.listener_callbacks); + AWS_LOGF_INFO( + AWS_LS_MQTT_GENERAL, + "id=%p: Mqtt311 Listener initialized, listener id=%p", + (void *)listener->config.connection, + (void *)listener); + aws_mqtt311_listener_release(listener); + } else { + s_mqtt311_listener_destroy(listener); + } +} + +static void s_mqtt311_listener_terminate_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { + (void)task; + + struct aws_mqtt311_listener *listener = arg; + + if (task_status == AWS_TASK_STATUS_RUN_READY) { + struct aws_mqtt_client_connection_311_impl *connection_impl = listener->config.connection->impl; + aws_mqtt311_callback_set_manager_remove(&connection_impl->callback_manager, listener->callback_set_id); + } + + AWS_LOGF_INFO( + AWS_LS_MQTT_GENERAL, + "id=%p: Mqtt311 Listener terminated, listener id=%p", + (void *)listener->config.connection, + (void *)listener); + + s_mqtt311_listener_destroy(listener); +} + +static void s_aws_mqtt311_listener_on_zero_ref_count(void *context) { + struct aws_mqtt311_listener *listener = context; + struct aws_mqtt_client_connection_311_impl *connection_impl = listener->config.connection->impl; + + aws_event_loop_schedule_task_now(connection_impl->loop, &listener->terminate_task); +} + +struct aws_mqtt311_listener *aws_mqtt311_listener_new( + struct aws_allocator *allocator, + struct aws_mqtt311_listener_config *config) { + if (config->connection == NULL) { + return NULL; + } + + if (aws_mqtt_client_connection_get_impl_type(config->connection) != AWS_MQTT311_IT_311_CONNECTION_IMPL) { + return NULL; + } + + struct aws_mqtt_client_connection_311_impl *connection_impl = config->connection->impl; + struct aws_mqtt311_listener *listener = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt311_listener)); + + listener->allocator = allocator; + listener->config = *config; + + aws_mqtt_client_connection_acquire(config->connection); + aws_ref_count_init(&listener->ref_count, listener, s_aws_mqtt311_listener_on_zero_ref_count); + + aws_task_init(&listener->initialize_task, s_mqtt311_listener_initialize_task_fn, listener, "Mqtt311ListenerInitialize"); + aws_task_init(&listener->terminate_task, s_mqtt311_listener_terminate_task_fn, listener, "Mqtt311ListenerTerminate"); + + aws_mqtt311_listener_acquire(listener); + aws_event_loop_schedule_task_now(connection_impl->loop, &listener->initialize_task); + + return listener; +} + +struct aws_mqtt311_listener *aws_mqtt311_listener_acquire(struct aws_mqtt311_listener *listener) { + if (listener != NULL) { + aws_ref_count_acquire(&listener->ref_count); + } + + return listener; +} + +struct aws_mqtt311_listener *aws_mqtt311_listener_release(struct aws_mqtt311_listener *listener) { + if (listener != NULL) { + aws_ref_count_release(&listener->ref_count); + } + + return NULL; +} + + +struct aws_mqtt311_callback_set_entry { + struct aws_allocator *allocator; + + struct aws_linked_list_node node; + + uint64_t id; + + struct aws_mqtt311_callback_set callbacks; +}; + +void aws_mqtt311_callback_set_manager_init( + struct aws_mqtt311_callback_set_manager *manager, + struct aws_allocator *allocator, + struct aws_mqtt_client_connection *connection) { + + manager->allocator = allocator; + manager->connection = connection; /* no need to ref count, this is assumed to be owned by the client connection */ + manager->next_callback_set_entry_id = 1; + + aws_linked_list_init(&manager->callback_set_entries); +} + +void aws_mqtt311_callback_set_manager_clean_up(struct aws_mqtt311_callback_set_manager *manager) { + struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries); + while (node != aws_linked_list_end(&manager->callback_set_entries)) { + struct aws_mqtt311_callback_set_entry *entry = AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node); + node = aws_linked_list_next(node); + + aws_linked_list_remove(&entry->node); + aws_mem_release(entry->allocator, entry); + } +} + +static struct aws_mqtt311_callback_set_entry *s_new_311_callback_set_entry( + struct aws_mqtt311_callback_set_manager *manager, + struct aws_mqtt311_callback_set *callback_set) { + struct aws_mqtt311_callback_set_entry *entry = + aws_mem_calloc(manager->allocator, 1, sizeof(struct aws_mqtt311_callback_set_entry)); + + entry->allocator = manager->allocator; + entry->id = manager->next_callback_set_entry_id++; + entry->callbacks = *callback_set; + + AWS_LOGF_INFO( + AWS_LS_MQTT5_GENERAL, + "id=%p: callback manager created new entry :%" PRIu64, + (void *)manager->connection, + entry->id); + + return entry; +} + +uint64_t aws_mqtt5_callback_set_manager_push_front( + struct aws_mqtt311_callback_set_manager *manager, + struct aws_mqtt311_callback_set *callback_set) { + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(manager->client->loop)); + + struct aws_mqtt311_callback_set_entry *entry = s_new_311_callback_set_entry(manager, callback_set); + + aws_linked_list_push_front(&manager->callback_set_entries, &entry->node); + + return entry->id; +} + +void aws_mqtt311_callback_set_manager_remove(struct aws_mqtt311_callback_set_manager *manager, uint64_t callback_set_id) { + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(manager->client->loop)); + + struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries); + while (node != aws_linked_list_end(&manager->callback_set_entries)) { + struct aws_mqtt311_callback_set_entry *entry = AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node); + node = aws_linked_list_next(node); + + if (entry->id == callback_set_id) { + aws_linked_list_remove(&entry->node); + + AWS_LOGF_INFO( + AWS_LS_MQTT5_GENERAL, + "id=%p: callback manager removed entry id=%" PRIu64, + (void *)manager->connection, + entry->id); + aws_mem_release(entry->allocator, entry); + return; + } + } + AWS_LOGF_INFO( + AWS_LS_MQTT5_GENERAL, + "id=%p: callback manager failed to remove entry id=%" PRIu64 ", callback set id not found.", + (void *)manager->connection, + callback_set_id); +} + +void aws_mqtt311_callback_set_manager_on_publish_received( + struct aws_mqtt311_callback_set_manager *manager, + const struct aws_byte_cursor *topic, + const struct aws_byte_cursor *payload, + bool dup, + enum aws_mqtt_qos qos, + bool retain) { + +} + +void aws_mqtt311_callback_set_manager_on_connection_interrupted( + struct aws_mqtt311_callback_set_manager *manager) { + +} + +void aws_mqtt311_callback_set_manager_on_connection_resumed( + struct aws_mqtt311_callback_set_manager *manager) { + +} + +void aws_mqtt311_callback_set_manager_on_connection_success( + struct aws_mqtt311_callback_set_manager *manager) { + +} \ No newline at end of file