Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Apr 15, 2024
1 parent f8248f8 commit 9dfbb86
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 2 deletions.
105 changes: 103 additions & 2 deletions include/aws/mqtt/request-response/request_response_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,41 @@ typedef void(aws_mqtt_request_operation_completion_fn)(
* Configuration options for a request-response operation.
*/
struct aws_mqtt_request_operation_options {

/*
* Set of topic filters that should be subscribed to in order to cover all possible response paths. Sometimes
* we can use wildcards to cut down on the subscriptions needed; sometimes we can't.
*/
struct aws_byte_cursor *subscription_topic_filters;
size_t subscription_topic_filter_count;

/*
* Set of all possible response paths associated with this request type
*/
struct aws_mqtt_request_operation_response_path *response_paths;
size_t response_path_count;

/*
* Topic to publish the request to once response subscriptions have been established.
*/
struct aws_byte_cursor publish_topic;

/*
* Payload to publish in order to initiate the request
*/
struct aws_byte_cursor serialized_request;

/*
* Correlation token embedded in the request that must be found in a response message. This can be the
* empty cursor to support certain services which don't use correlation tokens. In this case, the client
* only allows one request at a time to use the associated subscriptions; no concurrency is possible. There
* are some optimizations we could make here but for now, they're not worth the complexity.
*/
struct aws_byte_cursor correlation_token;

/*
* Callback (and associated user data) to invoke when the request is completed.
*/
aws_mqtt_request_operation_completion_fn *completion_callback;
void *user_data;
};
Expand All @@ -84,36 +109,94 @@ enum aws_rr_streaming_subscription_event_type {
ARRSSET_SUBSCRIPTION_HALTED,
};

/*
* Callback signature for when the subscription status of a streaming operation changes.
*/
typedef void(aws_mqtt_streaming_operation_subscription_status_fn)(
enum aws_rr_streaming_subscription_event_type status,
int error_code,
void *user_data);

/*
* Callback signature for when a publish arrives that matches a streaming operation's subscription
*/
typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(struct aws_byte_cursor payload, void *user_data);

/*
* Callback signature for when a streaming operation is fully destroyed and no more events will be emitted.
*/
typedef void(aws_mqtt_streaming_operation_terminated_fn)(void *user_data);

/*
* Configuration options for a streaming operation.
*/
struct aws_mqtt_streaming_operation_options {

/*
* Topic filter that the streaming operation should listen on
*/
struct aws_byte_cursor topic_filter;

/*
* Callback for subscription status events
*/
aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback;

/*
* Callback for publish messages that match the operation's topic filter
*/
aws_mqtt_streaming_operation_incoming_publish_fn *incoming_publish_callback;

/*
* Callback for streaming operation final shutdown
*/
aws_mqtt_streaming_operation_terminated_fn *terminated_callback;

/*
* Data passed to all streaming operation callbacks
*/
void *user_data;
};

typedef void(aws_mqtt_request_response_client_initialized_callback_fn)(void *user_data);
typedef void(aws_mqtt_request_response_client_terminated_callback_fn)(void *user_data);

/*
* Request-response client configuration options
*/
struct aws_mqtt_request_response_client_options {

/*
* Maximum number of subscriptions that the client will concurrently use for request-response operations
*/
size_t max_request_response_subscriptions;

/*
* Maximum number of subscriptions that the client will concurrently use for streaming operations
*/
size_t max_streaming_subscriptions;

/*
* Duration, in seconds, that a request-response operation will wait for completion before giving up
*/
uint32_t operation_timeout_seconds;

/* Do not bind the initialized callback; it exists mostly for tests and should not be exposed */
/*
* Request-response client initialization is asynchronous. This callback is invoked when the client is fully
* initialized.
*
* Do not bind the initialized callback; it exists mostly for tests and should not be exposed
*/
aws_mqtt_request_response_client_initialized_callback_fn *initialized_callback;

/*
* Callback invoked when the client's asynchronous destruction process has fully completed.
*/
aws_mqtt_request_response_client_terminated_callback_fn *terminated_callback;

/*
* Arbitrary data to pass to the client callbacks
*/
void *user_data;
};

Expand Down Expand Up @@ -142,22 +225,40 @@ AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_
struct aws_mqtt_request_response_client *client);

/*
* Remove a reference to a request-response client
* Remove a reference from a request-response client
*/
AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(
struct aws_mqtt_request_response_client *client);

/*
* Submits a request operation to the client
*/
AWS_MQTT_API int aws_mqtt_request_response_client_submit_request(
struct aws_mqtt_request_response_client *client,
const struct aws_mqtt_request_operation_options *request_options);

/*
* Creates a new streaming operation. Streaming operations start in an inactive state and must be
* activated before its subscription can be established.
*/
AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_streaming_operation(
struct aws_mqtt_request_response_client *client,
const struct aws_mqtt_streaming_operation_options *streaming_options);

/*
* Initiates a streaming operation's subscription process.
*/
AWS_MQTT_API int aws_mqtt_rr_client_operation_activate(struct aws_mqtt_rr_client_operation *operation);

/*
* Add a reference to a streaming operation.
*/
AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire(
struct aws_mqtt_rr_client_operation *operation);

/*
* Remove a reference from a streaming operation
*/
AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_release(
struct aws_mqtt_rr_client_operation *operation);

Expand Down
16 changes: 16 additions & 0 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct aws_mqtt_streaming_operation_storage {
struct aws_mqtt_streaming_operation_options options;

struct aws_byte_buf operation_data;

struct aws_atomic_var activated;
};

enum aws_mqtt_request_response_operation_type {
Expand Down Expand Up @@ -2166,6 +2168,8 @@ void s_aws_mqtt_streaming_operation_storage_init_from_options(

AWS_FATAL_ASSERT(
aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.topic_filter) == AWS_OP_SUCCESS);

aws_atomic_init_int(&storage->activated, 0);
}

static void s_log_streaming_operation(
Expand Down Expand Up @@ -2228,6 +2232,18 @@ struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_str
return operation;
}

int aws_mqtt_rr_client_operation_activate(struct aws_mqtt_rr_client_operation *operation) {
struct aws_atomic_var *activated = &operation->storage.streaming_storage.activated;
size_t unactivated = 0;
if (!aws_atomic_compare_exchange_int(activated, &unactivated, 1)) {
return aws_raise_error(??);
}

aws_event_loop_schedule_task_now(operation->client_internal_ref->loop, &operation->submit_task);

return AWS_OP_SUCCESS;
}

struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire(
struct aws_mqtt_rr_client_operation *operation) {
if (operation != NULL) {
Expand Down

0 comments on commit 9dfbb86

Please sign in to comment.