Skip to content

Commit

Permalink
Rebuild the branch
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jan 26, 2024
1 parent 426878f commit ca0c14f
Show file tree
Hide file tree
Showing 17 changed files with 2,005 additions and 993 deletions.
4 changes: 4 additions & 0 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <aws/mqtt/private/client_impl_shared.h>
#include <aws/mqtt/private/fixed_header.h>
#include <aws/mqtt/private/mqtt311_decoder.h>
#include <aws/mqtt/private/mqtt311_listener.h>
#include <aws/mqtt/private/topic_tree.h>

#include <aws/common/hash_table.h>
Expand Down Expand Up @@ -255,6 +256,9 @@ struct aws_mqtt_client_connection_311_impl {
aws_mqtt_on_operation_statistics_fn *on_any_operation_statistics;
void *on_any_operation_statistics_ud;

/* listener callbacks */
struct aws_mqtt311_callback_set_manager callback_manager;

/* Connection tasks. */
struct aws_mqtt_reconnect_task *reconnect_task;
struct aws_channel_task ping_task;
Expand Down
4 changes: 2 additions & 2 deletions include/aws/mqtt/private/client_impl_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ struct aws_mqtt_client_connection_vtable {

int (*get_stats_fn)(void *impl, struct aws_mqtt_connection_operation_statistics *stats);

enum aws_mqtt311_impl_type (*get_impl_type)(void *impl);
enum aws_mqtt311_impl_type (*get_impl_type)(const void *impl);
};

struct aws_mqtt_client_connection {
Expand All @@ -131,7 +131,7 @@ struct aws_mqtt_client_connection {
};

AWS_MQTT_API enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(
struct aws_mqtt_client_connection *connection);
const struct aws_mqtt_client_connection *connection);

AWS_MQTT_API uint64_t aws_mqtt_hash_uint16_t(const void *item);

Expand Down
180 changes: 180 additions & 0 deletions include/aws/mqtt/private/mqtt311_listener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#ifndef AWS_MQTT_MQTT311_LISTENER_H
#define AWS_MQTT_MQTT311_LISTENER_H

#include <aws/mqtt/mqtt.h>

#include <aws/common/rw_lock.h>
#include <aws/mqtt/client.h>

AWS_PUSH_SANE_WARNING_LEVEL

/**
* Callback signature for when an mqtt311 listener has completely destroyed itself.
*/
typedef void(aws_mqtt311_listener_termination_completion_fn)(void *complete_ctx);

/**
* A record that tracks MQTT311 client connection callbacks which can be dynamically injected via a listener.
*
* All the callbacks that are supported here are invoked only on the 311 connection's event loop. With the
* add/remove callback set also on the event loop, everything is correctly serialized without data races.
*
* If binding additional callbacks, they must only be invoked from the connection's event loop.
*
* We only listen to connection-success because the only connection-level event we care about is a failure
* to rejoin a session (which invalidates all subscriptions that were considered valid)
*/
struct aws_mqtt311_callback_set {

/* Called from s_packet_handler_publish which is event-loop invoked */
aws_mqtt_client_publish_received_fn *publish_received_handler;

/* Called from s_packet_handler_connack which is event-loop invoked */
aws_mqtt_client_on_connection_success_fn *connection_success_handler;

void *user_data;
};

/**
* An internal type for managing chains of callbacks attached to an mqtt311 client connection. Supports chains for
* lifecycle events and incoming publish packet handling.
*
* Assumed to be owned and used only by an MQTT311 client connection.
*/
struct aws_mqtt311_callback_set_manager {
struct aws_allocator *allocator;

struct aws_mqtt_client_connection *connection;

struct aws_linked_list callback_set_entries;

uint64_t next_callback_set_entry_id;
};

/**
* Configuration options for MQTT311 listener objects.
*/
struct aws_mqtt311_listener_config {

/**
* MQTT311 client connection to listen to events on
*/
struct aws_mqtt_client_connection *connection;

/**
* Callbacks to invoke when events occur on the MQTT311 client connection
*/
struct aws_mqtt311_callback_set listener_callbacks;

/**
* Listener destruction is asynchronous and thus requires a termination callback and associated user data
* to notify the user that the listener has been fully destroyed and no further events will be received.
*/
aws_mqtt311_listener_termination_completion_fn *termination_callback;
void *termination_callback_user_data;
};

AWS_EXTERN_C_BEGIN

/**
* Creates a new MQTT311 listener object. For as long as the listener lives, incoming publishes and lifecycle events
* will be forwarded to the callbacks configured on the listener.
*
* @param allocator allocator to use
* @param config listener configuration
* @return a new aws_mqtt311_listener object
*/
AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_new(
struct aws_allocator *allocator,
struct aws_mqtt311_listener_config *config);

/**
* Adds a reference to an mqtt311 listener.
*
* @param listener listener to add a reference to
* @return the listener object
*/
AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_acquire(struct aws_mqtt311_listener *listener);

/**
* Removes a reference to an mqtt311 listener. When the reference count drops to zero, the listener's asynchronous
* destruction will be started.
*
* @param listener listener to remove a reference from
* @return NULL
*/
AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_release(struct aws_mqtt311_listener *listener);

/**
* Initializes a callback set manager
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_init(
struct aws_mqtt311_callback_set_manager *manager,
struct aws_allocator *allocator,
struct aws_mqtt_client_connection *connection);

/**
* Cleans up a callback set manager.
*
* aws_mqtt311_callback_set_manager_init must have been previously called or this will crash.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_clean_up(struct aws_mqtt311_callback_set_manager *manager);

/**
* Adds a callback set to the front of the handler chain. Returns an integer id that can be used to selectively
* remove the callback set from the manager.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
uint64_t aws_mqtt311_callback_set_manager_push_front(
struct aws_mqtt311_callback_set_manager *manager,
struct aws_mqtt311_callback_set *callback_set);

/**
* Removes a callback set from the handler chain.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_remove(
struct aws_mqtt311_callback_set_manager *manager,
uint64_t callback_set_id);

/**
* Walks the incoming publish handler chain for an MQTT311 connection, invoking each in sequence.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_publish_received(
struct aws_mqtt311_callback_set_manager *manager,
const struct aws_byte_cursor *topic,
const struct aws_byte_cursor *payload,
bool dup,
enum aws_mqtt_qos qos,
bool retain);

/**
* Invokes a connection success event on each listener in the manager's collection of callback sets.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_connection_success(
struct aws_mqtt311_callback_set_manager *manager,
enum aws_mqtt_connect_return_code return_code,
bool rejoined_session);

AWS_EXTERN_C_END

AWS_POP_SANE_WARNING_LEVEL

#endif /* AWS_MQTT_MQTT311_LISTENER_H */
7 changes: 6 additions & 1 deletion source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ static void s_mqtt_client_shutdown(
(void)channel;

struct aws_mqtt_client_connection_311_impl *connection = user_data;
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection->loop));

AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code);
Expand Down Expand Up @@ -801,6 +802,8 @@ static void s_mqtt_client_connection_destroy_final(struct aws_mqtt_client_connec
termination_handler_user_data = connection->on_termination_ud;
}

aws_mqtt311_callback_set_manager_clean_up(&connection->callback_manager);

/* If the reconnect_task isn't freed, free it */
if (connection->reconnect_task) {
aws_mem_release(connection->reconnect_task->allocator, connection->reconnect_task);
Expand Down Expand Up @@ -3220,7 +3223,7 @@ static void s_aws_mqtt_client_connection_311_release(void *impl) {
aws_ref_count_release(&connection->ref_count);
}

enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_3_get_impl(void *impl) {
enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_3_get_impl(const void *impl) {
(void)impl;

return AWS_MQTT311_IT_311_CONNECTION;
Expand Down Expand Up @@ -3351,6 +3354,8 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt
connection->handler.vtable = aws_mqtt_get_client_channel_vtable();
connection->handler.impl = connection;

aws_mqtt311_callback_set_manager_init(&connection->callback_manager, connection->allocator, &connection->base);

return &connection->base;

failed_init_outstanding_requests_table:
Expand Down
6 changes: 6 additions & 0 deletions source/client_channel_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ static int s_packet_handler_connack(struct aws_byte_cursor message_cursor, void
MQTT_CLIENT_CALL_CALLBACK_ARGS(
connection, on_connection_success, connack.connect_return_code, connack.session_present);

aws_mqtt311_callback_set_manager_on_connection_success(
&connection->callback_manager, connack.connect_return_code, connack.session_present);

AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: connection callback completed", (void *)connection);

s_update_next_ping_time(connection);
Expand Down Expand Up @@ -291,6 +294,9 @@ static int s_packet_handler_publish(struct aws_byte_cursor message_cursor, void

MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_any_publish, &publish.topic_name, &publish.payload, dup, qos, retain);

aws_mqtt311_callback_set_manager_on_publish_received(
&connection->callback_manager, &publish.topic_name, &publish.payload, dup, qos, retain);

AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: publish received with msg id=%" PRIu16 " dup=%d qos=%d retain=%d payload-size=%zu topic=" PRInSTR,
Expand Down
3 changes: 2 additions & 1 deletion source/client_impl_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ int aws_mqtt_client_connection_get_stats(
return (*connection->vtable->get_stats_fn)(connection->impl, stats);
}

enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(struct aws_mqtt_client_connection *connection) {
enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(
const struct aws_mqtt_client_connection *connection) {
return (*connection->vtable->get_impl_type)(connection->impl);
}

Expand Down
Loading

0 comments on commit ca0c14f

Please sign in to comment.