Skip to content

Commit

Permalink
Commenting
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jan 19, 2024
1 parent 30ed0c3 commit 8c23d4b
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 1 deletion.
72 changes: 71 additions & 1 deletion include/aws/mqtt/private/request-response/protocol_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,47 +15,91 @@ struct aws_allocator;
struct aws_mqtt_client_connection;
struct aws_mqtt5_client;

/*
* The request-response protocol adapter is a translation layer that sits between the request-response native client
* implementation and a protocol client capable of subscribing, unsubscribing, and publishing MQTT messages.
* Valid protocol clients include the CRT MQTT5 client, the CRT MQTT311 client, and an eventstream RPC connection
* that belongs to a Greengrass IPC client. Each of these protocol clients has a different (or even implicit)
* contract for carrying out pub-sub operations. The protocol adapter abstracts these details with a simple,
* minimal interface based on the requirements identigied in the request-response design documents.
*/

/*
* Minimal MQTT subscribe options
*/
struct aws_protocol_adapter_subscribe_options {
struct aws_byte_cursor topic_filter;
uint32_t ack_timeout_seconds;
};

/*
* Minimal MQTT unsubscribe options
*/
struct aws_protocol_adapter_unsubscribe_options {
struct aws_byte_cursor topic_filter;
uint32_t ack_timeout_seconds;
};

/*
* Minimal MQTT publish options
*/
struct aws_protocol_adapter_publish_options {
struct aws_byte_cursor topic;
struct aws_byte_cursor payload;
uint32_t ack_timeout_seconds;

/*
* Invoked on success/failure of the publish itself. Our implementations use QoS1 which means that success
* will be on puback receipt.
*/
void (*completion_callback_fn)(bool, void *);

/*
* User data to pass in when invoking the completion callback
*/
void *user_data;
uint32_t ack_timeout_seconds;
};

/*
* Describes the type of subscription event (relative to a topic filter)
*/
enum aws_protocol_adapter_subscription_event_type {
AWS_PASET_SUBSCRIBE_SUCCESS,
AWS_PASET_SUBSCRIBE_FAILURE,
AWS_PASET_UNSUBSCRIBE_SUCCESS,
AWS_PASET_UNSUBSCRIBE_FAILURE,
};

/*
* An event emitted by the protocol adapter when a subscribe or unsubscribe is completed by the adapted protocol
* client.
*/
struct aws_protocol_adapter_subscription_event {
struct aws_byte_cursor topic_filter;
enum aws_protocol_adapter_subscription_event_type event_type;
};

/*
* An event emitted by the protocol adapter whenever a publish is received by the protocol client. This will
* potentially include messages that are completely unrelated to MQTT request-response. The topic is the first
* thing that should be checked for relevance.
*/
struct aws_protocol_adapter_incoming_publish_event {
struct aws_byte_cursor topic;
struct aws_byte_cursor payload;
};

/*
* Describes the type of connection event emitted by the protocol adapter
*/
enum aws_protocol_adapter_connection_event_type {
AWS_PACET_OFFLINE,
AWS_PACET_ONLINE,
};

/*
* An event emitted by the protocol adapter whenever the protocol client encounters a change in connectivity state.
*/
struct aws_protocol_adapter_connection_event {
enum aws_protocol_adapter_connection_event_type event_type;
bool rejoined_session;
Expand All @@ -70,12 +114,19 @@ typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data);
typedef void(
aws_protocol_adapter_connection_event_fn)(struct aws_protocol_adapter_connection_event *event, void *user_data);

/*
* Set of callbacks invoked by the protocol adapter. These must all be set.
*/
struct aws_mqtt_protocol_adapter_options {
aws_protocol_adapter_subscription_event_fn *subscription_event_callback;
aws_protocol_adapter_incoming_publish_fn *incoming_publish_callback;
aws_protocol_adapter_terminate_callback_fn *terminate_callback;
aws_protocol_adapter_connection_event_fn *connection_event_callback;

/*
* User data to pass into all singleton protocol adapter callbacks. Likely either the request-response client
* or the subscription manager component of the request-response client.
*/
void *user_data;
};

Expand All @@ -97,26 +148,45 @@ struct aws_mqtt_protocol_adapter {

AWS_EXTERN_C_BEGIN

/*
* Creates a new request-response protocol adapter from an MQTT311 client
*/
AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter_options *options,
struct aws_mqtt_client_connection *connection);

/*
* Creates a new request-response protocol adapter from an MQTT5 client
*/
AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter_options *options,
struct aws_mqtt5_client *client);

/*
* Destroys a request-response protocol adapter. Destruction is an asynchronous process and the caller must
* wait for the termination callback to be invoked before assuming that no further callbacks will be invoked.
*/
AWS_MQTT_API void aws_mqtt_protocol_adapter_delete(struct aws_mqtt_protocol_adapter *adapter);

/*
* Asks the adapted protocol client to perform an MQTT subscribe operation
*/
AWS_MQTT_API int aws_mqtt_protocol_adapter_subscribe(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_subscribe_options *options);

/*
* Asks the adapted protocol client to perform an MQTT unsubscribe operation
*/
AWS_MQTT_API int aws_mqtt_protocol_adapter_unsubscribe(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_unsubscribe_options *options);

/*
* Asks the adapted protocol client to perform an MQTT publish operation
*/
AWS_MQTT_API int aws_mqtt_protocol_adapter_publish(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_publish_options *options);
Expand Down
33 changes: 33 additions & 0 deletions include/aws/mqtt/private/request-response/weak_ref.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,51 @@

#include <aws/common/common.h>

/*
* This is a simplification of the notion of a weak reference particular to the needs of the request-response
* MQTT service clients. This is not suitable for general use but could be extended
* for general use in the future. Until then, it stays private, here.
*
* This weak reference is a ref-counted object with an opaque value. The opaque value may be cleared or
* queried. These two operations *do not* provide any thread safety.
*
* The primary use is to allow one object to safely use asynchronous callback-driven APIs on a second object, despite
* the fact that the first object may get destroyed unpredictably. The two objects must be exclusive to a single
* event loop (because there's no thread safety or mutual exclusion on the opaque value held by the weak ref).
*
* The initial use is the request-response protocol adapter submitting operations to an MQTT client or an
* eventstream RPC connection. We use a single weak ref to the protocol adapter and zero its opaque value when
* the protocol adapter is destroyed. Operation callbacks that subsequently resolve can then short circuit and do
* nothing rather than call into garbage and crash.
*/
struct aws_weak_ref;

AWS_EXTERN_C_BEGIN

/*
* Creates a new weak reference to an opaque value.
*/
AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_new(struct aws_allocator *allocator, void *referenced);

/*
* Acquires a reference to the weak ref object.
*/
AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_acquire(struct aws_weak_ref *weak_ref);

/*
* Removes a reference to the weak ref object. When the last reference is removed, the weak ref object will be
* destroyed. This has no effect on the opaque value held by the weak ref.
*/
AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_release(struct aws_weak_ref *weak_ref);

/*
* Gets the current value of the opaque data held by the weak ref.
*/
AWS_MQTT_API void *aws_weak_ref_get_reference(struct aws_weak_ref *weak_ref);

/*
* Clears the opaque data held by the weak ref.
*/
AWS_MQTT_API void aws_weak_ref_zero_reference(struct aws_weak_ref *weak_ref);

AWS_EXTERN_C_END
Expand Down

0 comments on commit 8c23d4b

Please sign in to comment.