diff --git a/include/aws/mqtt/private/request-response/request_response_subscription_set.h b/include/aws/mqtt/private/request-response/request_response_subscription_set.h new file mode 100644 index 00000000..3f60d359 --- /dev/null +++ b/include/aws/mqtt/private/request-response/request_response_subscription_set.h @@ -0,0 +1,138 @@ +#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H +#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include + +/* + * Handles subscriptions for request-response client. + * Lifetime of this struct is bound to request-response client. + */ +struct aws_request_response_subscriptions { + struct aws_allocator *allocator; + + /* + * Map from cursor (topic filter) -> list of streaming operations using that filter + * + * We don't garbage collect this table over the course of normal client operation. We only clean it up + * when the client is shutting down. + */ + struct aws_hash_table streaming_operation_subscription_lists; + + /* + * Map from cursor (topic filter with wildcards) -> list of streaming operations using that filter + * + * We don't garbage collect this table over the course of normal client operation. We only clean it up + * when the client is shutting down. + */ + struct aws_hash_table streaming_operation_wildcards_subscription_lists; + + /* + * Map from cursor (topic) -> request response path (topic, correlation token json path) + */ + struct aws_hash_table request_response_paths; +}; + +/* + * This is the (key and) value in stream subscriptions tables. + */ +struct aws_rr_operation_list_topic_filter_entry { + struct aws_allocator *allocator; + + struct aws_byte_cursor topic_filter_cursor; + struct aws_byte_buf topic_filter; + + struct aws_linked_list operations; +}; + +/* + * Value in request subscriptions table. + */ +struct aws_rr_response_path_entry { + struct aws_allocator *allocator; + + size_t ref_count; + + struct aws_byte_cursor topic_cursor; + struct aws_byte_buf topic; + + struct aws_byte_buf correlation_token_json_path; +}; + +/* + * Callback type for matched stream subscriptions. + */ +typedef void(aws_mqtt_stream_operation_subscription_match_fn)( + const struct aws_linked_list *operations, + const struct aws_byte_cursor *topic_filter, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + void *user_data); + +/* + * Callback type for matched request subscriptions. + */ +typedef void(aws_mqtt_request_operation_subscription_match_fn)( + struct aws_rr_response_path_entry *entry, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + void *user_data); + +AWS_EXTERN_C_BEGIN + +/* + * Initialize internal state of a provided request-response subscriptions structure. + */ +AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_init( + struct aws_request_response_subscriptions *subscriptions, + struct aws_allocator *allocator); + +/* + * Clean up internals of a provided request-response subscriptions structure. + */ +AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_clean_up( + struct aws_request_response_subscriptions *subscriptions); + +/* + * Add a subscription for stream operations. + * If subscription with the same topic filter is already added, previously created + * aws_rr_operation_list_topic_filter_entry instance is returned. + */ +AWS_MQTT_API struct aws_rr_operation_list_topic_filter_entry * + aws_mqtt_request_response_client_subscriptions_add_stream_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter); + +/* + * Add a subscription for request operation. + */ +AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_add_request_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter, + const struct aws_byte_cursor *correlation_token_json_path); + +/* + * Remove a subscription for a given request operation. + */ +AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_remove_request_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter); + +/* + * Call specified callbacks for all stream and request operations with subscriptions matching a provided publish event. + */ +AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_match( + const struct aws_request_response_subscriptions *subscriptions, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match, + aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match, + void *user_data); + +AWS_EXTERN_C_END + +#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H */ diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 448f7c98..3fb2d691 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -12,13 +12,13 @@ #include #include #include +#include #include #include #include #define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50 -#define MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE 50 struct aws_mqtt_request_operation_storage { struct aws_mqtt_request_operation_options options; @@ -136,89 +136,6 @@ state, removed on operation completion/destruction */ -/* - * This is the (key and) value in hash table (4) above. - */ -struct aws_rr_operation_list_topic_filter_entry { - struct aws_allocator *allocator; - - struct aws_byte_cursor topic_filter_cursor; - struct aws_byte_buf topic_filter; - - struct aws_linked_list operations; -}; - -static struct aws_rr_operation_list_topic_filter_entry *s_aws_rr_operation_list_topic_filter_entry_new( - struct aws_allocator *allocator, - struct aws_byte_cursor topic_filter) { - struct aws_rr_operation_list_topic_filter_entry *entry = - aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_operation_list_topic_filter_entry)); - - entry->allocator = allocator; - aws_byte_buf_init_copy_from_cursor(&entry->topic_filter, allocator, topic_filter); - entry->topic_filter_cursor = aws_byte_cursor_from_buf(&entry->topic_filter); - - aws_linked_list_init(&entry->operations); - - return entry; -} - -static void s_aws_rr_operation_list_topic_filter_entry_destroy(struct aws_rr_operation_list_topic_filter_entry *entry) { - if (entry == NULL) { - return; - } - - aws_byte_buf_clean_up(&entry->topic_filter); - - aws_mem_release(entry->allocator, entry); -} - -static void s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy(void *value) { - s_aws_rr_operation_list_topic_filter_entry_destroy(value); -} - -struct aws_rr_response_path_entry { - struct aws_allocator *allocator; - - size_t ref_count; - - struct aws_byte_cursor topic_cursor; - struct aws_byte_buf topic; - - struct aws_byte_buf correlation_token_json_path; -}; - -static struct aws_rr_response_path_entry *s_aws_rr_response_path_entry_new( - struct aws_allocator *allocator, - struct aws_byte_cursor topic, - struct aws_byte_cursor correlation_token_json_path) { - struct aws_rr_response_path_entry *entry = aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_response_path_entry)); - - entry->allocator = allocator; - entry->ref_count = 1; - aws_byte_buf_init_copy_from_cursor(&entry->topic, allocator, topic); - entry->topic_cursor = aws_byte_cursor_from_buf(&entry->topic); - - aws_byte_buf_init_copy_from_cursor(&entry->correlation_token_json_path, allocator, correlation_token_json_path); - - return entry; -} - -static void s_aws_rr_response_path_entry_destroy(struct aws_rr_response_path_entry *entry) { - if (entry == NULL) { - return; - } - - aws_byte_buf_clean_up(&entry->topic); - aws_byte_buf_clean_up(&entry->correlation_token_json_path); - - aws_mem_release(entry->allocator, entry); -} - -static void s_aws_rr_response_path_table_hash_element_destroy(void *value) { - s_aws_rr_response_path_entry_destroy(value); -} - struct aws_mqtt_rr_client_operation { struct aws_allocator *allocator; @@ -348,17 +265,9 @@ struct aws_mqtt_request_response_client { struct aws_priority_queue operations_by_timeout; /* - * Map from cursor (topic filter) -> list of streaming operations using that filter + * Structure to handle stream and request subscriptions. */ - struct aws_hash_table streaming_operation_subscription_lists; - - /* - * Map from cursor (topic) -> request response path (topic, correlation token json path) - * - * We don't garbage collect this table over the course of normal client operation. We only clean it up - * when the client is shutting down. - */ - struct aws_hash_table request_response_paths; + struct aws_request_response_subscriptions subscriptions; /* * Map from cursor (correlation token) -> request operation @@ -406,8 +315,8 @@ static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request aws_hash_table_clean_up(&client->operations); aws_priority_queue_clean_up(&client->operations_by_timeout); - aws_hash_table_clean_up(&client->streaming_operation_subscription_lists); - aws_hash_table_clean_up(&client->request_response_paths); + + aws_mqtt_request_response_client_subscriptions_clean_up(&client->subscriptions); aws_hash_table_clean_up(&client->operations_by_correlation_tokens); aws_mem_release(client->allocator, client); @@ -872,12 +781,25 @@ static void s_aws_rr_client_protocol_adapter_subscription_event_callback( } static void s_apply_publish_to_streaming_operation_list( - struct aws_rr_operation_list_topic_filter_entry *entry, - const struct aws_protocol_adapter_incoming_publish_event *publish_event) { - AWS_FATAL_ASSERT(entry != NULL); + const struct aws_linked_list *operations, + const struct aws_byte_cursor *topic_filter, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + void *user_data) { + + AWS_FATAL_ASSERT(operations != NULL); + + struct aws_mqtt_request_response_client *rr_client = user_data; - struct aws_linked_list_node *node = aws_linked_list_begin(&entry->operations); - while (node != aws_linked_list_end(&entry->operations)) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on topic '" PRInSTR + "' matches streaming subscription on topic filter '" PRInSTR "'", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic), + AWS_BYTE_CURSOR_PRI(*topic_filter)); + + struct aws_linked_list_node *node = aws_linked_list_begin(operations); + while (node != aws_linked_list_end(operations)) { struct aws_mqtt_rr_client_operation *operation = AWS_CONTAINER_OF(node, struct aws_mqtt_rr_client_operation, node); node = aws_linked_list_next(node); @@ -896,8 +818,8 @@ static void s_apply_publish_to_streaming_operation_list( continue; } - void *user_data = operation->storage.streaming_storage.options.user_data; - (*incoming_publish_callback)(publish_event->payload, publish_event->topic, user_data); + void *operation_user_data = operation->storage.streaming_storage.options.user_data; + (*incoming_publish_callback)(publish_event->payload, publish_event->topic, operation_user_data); AWS_LOGF_DEBUG( AWS_LS_MQTT_REQUEST_RESPONSE, @@ -962,9 +884,17 @@ static void s_complete_operation_with_correlation_token( } static void s_apply_publish_to_response_path_entry( - struct aws_mqtt_request_response_client *rr_client, struct aws_rr_response_path_entry *entry, - const struct aws_protocol_adapter_incoming_publish_event *publish_event) { + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + void *user_data) { + + struct aws_mqtt_request_response_client *rr_client = user_data; + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on topic '" PRInSTR "' matches response path", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic)); struct aws_json_value *json_payload = NULL; @@ -1054,34 +984,12 @@ static void s_aws_rr_client_protocol_adapter_incoming_publish_callback( return; } - /* Streaming operation handling */ - struct aws_hash_element *subscription_filter_element = NULL; - if (aws_hash_table_find( - &rr_client->streaming_operation_subscription_lists, &publish_event->topic, &subscription_filter_element) == - AWS_OP_SUCCESS && - subscription_filter_element != NULL) { - AWS_LOGF_DEBUG( - AWS_LS_MQTT_REQUEST_RESPONSE, - "id=%p: request-response client incoming publish on topic '" PRInSTR "' matches streaming topic", - (void *)rr_client, - AWS_BYTE_CURSOR_PRI(publish_event->topic)); - - s_apply_publish_to_streaming_operation_list(subscription_filter_element->value, publish_event); - } - - /* Request-Response handling */ - struct aws_hash_element *response_path_element = NULL; - if (aws_hash_table_find(&rr_client->request_response_paths, &publish_event->topic, &response_path_element) == - AWS_OP_SUCCESS && - response_path_element != NULL) { - AWS_LOGF_DEBUG( - AWS_LS_MQTT_REQUEST_RESPONSE, - "id=%p: request-response client incoming publish on topic '" PRInSTR "' matches response path", - (void *)rr_client, - AWS_BYTE_CURSOR_PRI(publish_event->topic)); - - s_apply_publish_to_response_path_entry(rr_client, response_path_element->value, publish_event); - } + aws_mqtt_request_response_client_subscriptions_match( + &rr_client->subscriptions, + publish_event, + s_apply_publish_to_streaming_operation_list, + s_apply_publish_to_response_path_entry, + rr_client); } static void s_aws_rr_client_protocol_adapter_terminate_callback(void *user_data) { @@ -1171,23 +1079,7 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie sizeof(struct aws_mqtt_rr_client_operation *), s_compare_rr_operation_timeouts); - aws_hash_table_init( - &rr_client->streaming_operation_subscription_lists, - allocator, - MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE, - aws_hash_byte_cursor_ptr, - aws_mqtt_byte_cursor_hash_equality, - NULL, - s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy); - - aws_hash_table_init( - &rr_client->request_response_paths, - allocator, - MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE, - aws_hash_byte_cursor_ptr, - aws_mqtt_byte_cursor_hash_equality, - NULL, - s_aws_rr_response_path_table_hash_element_destroy); + aws_mqtt_request_response_client_subscriptions_init(&rr_client->subscriptions, allocator); aws_hash_table_init( &rr_client->operations_by_correlation_tokens, @@ -1288,26 +1180,13 @@ static int s_add_streaming_operation_to_subscription_topic_filter_table( struct aws_byte_cursor topic_filter_cursor = operation->storage.streaming_storage.options.topic_filter; - struct aws_hash_element *element = NULL; - if (aws_hash_table_find(&client->streaming_operation_subscription_lists, &topic_filter_cursor, &element)) { - return aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); - } - - struct aws_rr_operation_list_topic_filter_entry *entry = NULL; - if (element == NULL) { - entry = s_aws_rr_operation_list_topic_filter_entry_new(client->allocator, topic_filter_cursor); - aws_hash_table_put(&client->streaming_operation_subscription_lists, &entry->topic_filter_cursor, entry, NULL); - AWS_LOGF_DEBUG( - AWS_LS_MQTT_REQUEST_RESPONSE, - "id=%p: request-response client adding topic filter '" PRInSTR "' to streaming subscriptions table", - (void *)client, - AWS_BYTE_CURSOR_PRI(topic_filter_cursor)); - } else { - entry = element->value; + struct aws_rr_operation_list_topic_filter_entry *entry = + aws_mqtt_request_response_client_subscriptions_add_stream_subscription( + &client->subscriptions, &topic_filter_cursor); + if (entry == NULL) { + return AWS_OP_ERR; } - AWS_FATAL_ASSERT(entry != NULL); - if (aws_linked_list_node_is_in_list(&operation->node)) { aws_linked_list_remove(&operation->node); } @@ -1334,25 +1213,11 @@ static int s_add_request_operation_to_response_path_table( for (size_t i = 0; i < path_count; ++i) { struct aws_mqtt_request_operation_response_path path; aws_array_list_get_at(paths, &path, i); - - struct aws_hash_element *element = NULL; - if (aws_hash_table_find(&client->request_response_paths, &path.topic, &element)) { - return aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); - } - - if (element != NULL) { - struct aws_rr_response_path_entry *entry = element->value; - ++entry->ref_count; - continue; - } - - struct aws_rr_response_path_entry *entry = - s_aws_rr_response_path_entry_new(client->allocator, path.topic, path.correlation_token_json_path); - if (aws_hash_table_put(&client->request_response_paths, &entry->topic_cursor, entry, NULL)) { - return aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); + if (aws_mqtt_request_response_client_subscriptions_add_request_subscription( + &client->subscriptions, &path.topic, &path.correlation_token_json_path)) { + return AWS_OP_ERR; } } - return AWS_OP_SUCCESS; } @@ -1841,38 +1706,18 @@ static void s_remove_operation_from_client_tables(struct aws_mqtt_rr_client_oper NULL); struct aws_array_list *paths = &operation->storage.request_storage.operation_response_paths; + size_t path_count = aws_array_list_length(paths); for (size_t i = 0; i < path_count; ++i) { struct aws_mqtt_request_operation_response_path path; aws_array_list_get_at(paths, &path, i); - - struct aws_hash_element *element = NULL; - if (aws_hash_table_find(&client->request_response_paths, &path.topic, &element) || element == NULL) { + if (aws_mqtt_request_response_client_subscriptions_remove_request_subscription( + &client->subscriptions, &path.topic) == AWS_OP_ERR) { AWS_LOGF_ERROR( AWS_LS_MQTT_REQUEST_RESPONSE, "id=%p: internal state error removing reference to response path for topic " PRInSTR, (void *)client, AWS_BYTE_CURSOR_PRI(path.topic)); - continue; - } - - struct aws_rr_response_path_entry *entry = element->value; - --entry->ref_count; - - if (entry->ref_count == 0) { - AWS_LOGF_DEBUG( - AWS_LS_MQTT_REQUEST_RESPONSE, - "id=%p: removing last reference to response path for topic " PRInSTR, - (void *)client, - AWS_BYTE_CURSOR_PRI(path.topic)); - aws_hash_table_remove(&client->request_response_paths, &path.topic, NULL, NULL); - } else { - AWS_LOGF_DEBUG( - AWS_LS_MQTT_REQUEST_RESPONSE, - "id=%p: removing reference to response path for topic " PRInSTR ", %zu references remain", - (void *)client, - AWS_BYTE_CURSOR_PRI(path.topic), - entry->ref_count); } } } diff --git a/source/request-response/request_response_subscription_set.c b/source/request-response/request_response_subscription_set.c new file mode 100644 index 00000000..48403cfe --- /dev/null +++ b/source/request-response/request_response_subscription_set.c @@ -0,0 +1,326 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include + +#include +#include + +#define MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE 50 +#define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50 + +static void s_aws_rr_operation_list_topic_filter_entry_destroy(struct aws_rr_operation_list_topic_filter_entry *entry) { + if (entry == NULL) { + return; + } + + aws_byte_buf_clean_up(&entry->topic_filter); + + aws_mem_release(entry->allocator, entry); +} + +static void s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy(void *value) { + s_aws_rr_operation_list_topic_filter_entry_destroy(value); +} + +static struct aws_rr_response_path_entry *s_aws_rr_response_path_entry_new( + struct aws_allocator *allocator, + struct aws_byte_cursor topic, + struct aws_byte_cursor correlation_token_json_path) { + struct aws_rr_response_path_entry *entry = aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_response_path_entry)); + + entry->allocator = allocator; + entry->ref_count = 1; + aws_byte_buf_init_copy_from_cursor(&entry->topic, allocator, topic); + entry->topic_cursor = aws_byte_cursor_from_buf(&entry->topic); + + aws_byte_buf_init_copy_from_cursor(&entry->correlation_token_json_path, allocator, correlation_token_json_path); + + return entry; +} + +static void s_aws_rr_response_path_entry_destroy(struct aws_rr_response_path_entry *entry) { + if (entry == NULL) { + return; + } + + aws_byte_buf_clean_up(&entry->topic); + aws_byte_buf_clean_up(&entry->correlation_token_json_path); + + aws_mem_release(entry->allocator, entry); +} + +static void s_aws_rr_response_path_table_hash_element_destroy(void *value) { + s_aws_rr_response_path_entry_destroy(value); +} + +int aws_mqtt_request_response_client_subscriptions_init( + struct aws_request_response_subscriptions *subscriptions, + struct aws_allocator *allocator) { + AWS_FATAL_ASSERT(subscriptions); + + subscriptions->allocator = allocator; + + if (aws_hash_table_init( + &subscriptions->streaming_operation_subscription_lists, + allocator, + MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy)) { + goto clean_up; + } + + if (aws_hash_table_init( + &subscriptions->streaming_operation_wildcards_subscription_lists, + allocator, + MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy)) { + goto clean_up; + } + + if (aws_hash_table_init( + &subscriptions->request_response_paths, + allocator, + MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_response_path_table_hash_element_destroy)) { + goto clean_up; + } + + return AWS_OP_SUCCESS; + +clean_up: + aws_mqtt_request_response_client_subscriptions_clean_up(subscriptions); + return AWS_OP_ERR; +} + +void aws_mqtt_request_response_client_subscriptions_clean_up(struct aws_request_response_subscriptions *subscriptions) { + if (subscriptions == NULL) { + return; + } + + if (aws_hash_table_is_valid(&subscriptions->streaming_operation_subscription_lists)) { + aws_hash_table_clean_up(&subscriptions->streaming_operation_subscription_lists); + } + if (aws_hash_table_is_valid(&subscriptions->streaming_operation_wildcards_subscription_lists)) { + aws_hash_table_clean_up(&subscriptions->streaming_operation_wildcards_subscription_lists); + } + if (aws_hash_table_is_valid(&subscriptions->request_response_paths)) { + aws_hash_table_clean_up(&subscriptions->request_response_paths); + } +} + +static struct aws_rr_operation_list_topic_filter_entry *s_aws_rr_operation_list_topic_filter_entry_new( + struct aws_allocator *allocator, + struct aws_byte_cursor topic_filter) { + struct aws_rr_operation_list_topic_filter_entry *entry = + aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_operation_list_topic_filter_entry)); + + entry->allocator = allocator; + aws_byte_buf_init_copy_from_cursor(&entry->topic_filter, allocator, topic_filter); + entry->topic_filter_cursor = aws_byte_cursor_from_buf(&entry->topic_filter); + + aws_linked_list_init(&entry->operations); + + return entry; +} + +struct aws_rr_operation_list_topic_filter_entry *aws_mqtt_request_response_client_subscriptions_add_stream_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter) { + AWS_FATAL_ASSERT(subscriptions); + + bool is_topic_with_wildcard = + (memchr(topic_filter->ptr, '+', topic_filter->len) || memchr(topic_filter->ptr, '#', topic_filter->len)); + + struct aws_hash_table *subscription_lists = is_topic_with_wildcard + ? &subscriptions->streaming_operation_wildcards_subscription_lists + : &subscriptions->streaming_operation_subscription_lists; + + struct aws_hash_element *element = NULL; + if (aws_hash_table_find(subscription_lists, topic_filter, &element)) { + aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); + return NULL; + } + + struct aws_rr_operation_list_topic_filter_entry *entry = NULL; + if (element == NULL) { + entry = s_aws_rr_operation_list_topic_filter_entry_new(subscriptions->allocator, *topic_filter); + aws_hash_table_put(subscription_lists, &entry->topic_filter_cursor, entry, NULL); + } else { + entry = element->value; + } + + AWS_FATAL_ASSERT(entry != NULL); + + return entry; +} + +int aws_mqtt_request_response_client_subscriptions_add_request_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter, + const struct aws_byte_cursor *correlation_token_json_path) { + struct aws_hash_element *element = NULL; + if (aws_hash_table_find(&subscriptions->request_response_paths, topic_filter, &element)) { + return aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); + } + + if (element != NULL) { + struct aws_rr_response_path_entry *entry = element->value; + ++entry->ref_count; + return AWS_OP_SUCCESS; + } + + struct aws_rr_response_path_entry *entry = + s_aws_rr_response_path_entry_new(subscriptions->allocator, *topic_filter, *correlation_token_json_path); + if (aws_hash_table_put(&subscriptions->request_response_paths, &entry->topic_cursor, entry, NULL)) { + s_aws_rr_response_path_entry_destroy(entry); + return aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); + } + + return AWS_OP_SUCCESS; +} + +int aws_mqtt_request_response_client_subscriptions_remove_request_subscription( + struct aws_request_response_subscriptions *subscriptions, + const struct aws_byte_cursor *topic_filter) { + + AWS_FATAL_ASSERT(subscriptions); + AWS_FATAL_ASSERT(topic_filter); + + struct aws_hash_element *element = NULL; + if (aws_hash_table_find(&subscriptions->request_response_paths, topic_filter, &element) || element == NULL) { + return AWS_OP_ERR; + } + + struct aws_rr_response_path_entry *entry = element->value; + --entry->ref_count; + + if (entry->ref_count == 0) { + aws_hash_table_remove(&subscriptions->request_response_paths, topic_filter, NULL, NULL); + } + + return AWS_OP_SUCCESS; +} + +static void s_match_stream_subscriptions( + const struct aws_hash_table *subscriptions, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match, + void *user_data) { + struct aws_hash_element *subscription_filter_element = NULL; + if (aws_hash_table_find(subscriptions, &publish_event->topic, &subscription_filter_element) == AWS_OP_SUCCESS && + subscription_filter_element != NULL) { + + struct aws_rr_operation_list_topic_filter_entry *entry = subscription_filter_element->value; + on_stream_operation_subscription_match( + &entry->operations, &entry->topic_filter_cursor, publish_event, user_data); + } +} + +static void s_match_wildcard_stream_subscriptions( + const struct aws_hash_table *subscriptions, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match, + void *user_data) { + + /* + * Incoming event's topic is checked against each registered stream with wildcard. While this approach is far from + * optimal, it should be sufficient for request-response client where not many subscriptions with wildcards are + * used. + */ + for (struct aws_hash_iter iter = aws_hash_iter_begin(subscriptions); !aws_hash_iter_done(&iter); + aws_hash_iter_next(&iter)) { + struct aws_rr_operation_list_topic_filter_entry *entry = iter.element.value; + + struct aws_byte_cursor subscription_topic_filter_segment; + AWS_ZERO_STRUCT(subscription_topic_filter_segment); + + struct aws_byte_cursor topic_segment; + AWS_ZERO_STRUCT(topic_segment); + + bool match = true; + bool multi_level_wildcard = false; + + while (aws_byte_cursor_next_split(&entry->topic_filter_cursor, '/', &subscription_topic_filter_segment)) { + if (!aws_byte_cursor_next_split(&publish_event->topic, '/', &topic_segment)) { + match = false; + break; + } + + if (aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "#")) { + multi_level_wildcard = true; + match = true; + break; + } + + if (!aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "+") && + !aws_byte_cursor_eq_ignore_case(&topic_segment, &subscription_topic_filter_segment)) { + match = false; + break; + } + } + + if (!multi_level_wildcard && aws_byte_cursor_next_split(&publish_event->topic, '/', &topic_segment)) { + match = false; + } + + if (match) { + on_stream_operation_subscription_match( + &entry->operations, &entry->topic_filter_cursor, publish_event, user_data); + } + } +} + +void s_match_request_response_subscriptions( + const struct aws_hash_table *request_response_paths, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match, + void *user_data) { + + struct aws_hash_element *response_path_element = NULL; + if (aws_hash_table_find(request_response_paths, &publish_event->topic, &response_path_element) == AWS_OP_SUCCESS && + response_path_element != NULL) { + + on_request_operation_subscription_match(response_path_element->value, publish_event, user_data); + } +} + +void aws_mqtt_request_response_client_subscriptions_match( + const struct aws_request_response_subscriptions *subscriptions, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match, + aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match, + void *user_data) { + + AWS_FATAL_PRECONDITION(subscriptions); + AWS_FATAL_PRECONDITION(publish_event); + AWS_FATAL_PRECONDITION(on_stream_operation_subscription_match); + AWS_FATAL_PRECONDITION(on_request_operation_subscription_match); + + /* Streaming operation handling */ + s_match_stream_subscriptions( + &subscriptions->streaming_operation_subscription_lists, + publish_event, + on_stream_operation_subscription_match, + user_data); + + s_match_wildcard_stream_subscriptions( + &subscriptions->streaming_operation_wildcards_subscription_lists, + publish_event, + on_stream_operation_subscription_match, + user_data); + + /* Request-Response handling */ + s_match_request_response_subscriptions( + &subscriptions->request_response_paths, publish_event, on_request_operation_subscription_match, user_data); +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 67aeb1e7..6dbe3a12 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -577,6 +577,19 @@ add_test_case(rrc_request_response_failure_invalid_correlation_token_type) add_test_case(rrc_request_response_failure_non_matching_correlation_token) add_test_case(rrc_request_response_multi_operation_sequence) +# "rrs" = request-response subscriptions +add_test_case(rrs_init_cleanup) +add_test_case(rrs_stream_subscriptions_match_single_level_wildcards) +add_test_case(rrs_stream_subscriptions_match_multi_level_wildcards) +add_test_case(rrs_stream_subscriptions_add_duplicate) +add_test_case(rrs_stream_subscriptions_too_long_publish_topic) +add_test_case(rrs_stream_subscriptions_too_short_publish_topic) +add_test_case(rrs_request_subscriptions_add_single_subscription) +add_test_case(rrs_request_subscriptions_remove_subscription) +add_test_case(rrs_request_subscriptions_add_duplicate_then_remove) +add_test_case(rrs_request_subscriptions_remove_nonexistent_subscription) +add_test_case(rrs_stream_and_request_subscriptions_add_same_subscription) + generate_test_driver(${PROJECT_NAME}-tests) set(TEST_PAHO_CLIENT_BINARY_NAME ${PROJECT_NAME}-paho-client) diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c index a12e2bf9..5cbb7405 100644 --- a/tests/request-response/request_response_client_tests.c +++ b/tests/request-response/request_response_client_tests.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -1249,8 +1250,9 @@ static int s_rrc_streaming_operation_success_single_fn(struct aws_allocator *all ASSERT_SUCCESS(s_init_fixture_streaming_operation_success(&fixture, &client_test_options, allocator, NULL, NULL)); struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic = aws_byte_cursor_from_c_str("topic/+"); struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); - struct aws_mqtt_rr_client_operation *operation = s_create_streaming_operation(&fixture, record_key1, topic_filter1); + struct aws_mqtt_rr_client_operation *operation = s_create_streaming_operation(&fixture, record_key1, topic); s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); @@ -1994,12 +1996,14 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn( &fixture, &client_test_options, allocator, s_rrc_unsubscribe_success_config, NULL)); struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); - struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1/+"); + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/1/abc"); struct aws_mqtt_rr_client_operation *operation1 = s_create_streaming_operation(&fixture, record_key1, topic_filter1); struct aws_byte_cursor record_key2 = aws_byte_cursor_from_c_str("key2"); - struct aws_byte_cursor topic_filter2 = aws_byte_cursor_from_c_str("topic/2"); + struct aws_byte_cursor topic_filter2 = aws_byte_cursor_from_c_str("/topic/2/+"); + struct aws_byte_cursor topic2 = aws_byte_cursor_from_c_str("/topic/2/def"); struct aws_mqtt_rr_client_operation *operation2 = s_create_streaming_operation(&fixture, record_key2, topic_filter2); @@ -2022,24 +2026,24 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn( }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( - &fixture, record_key2, AWS_ARRAY_SIZE(expected_failure_events), expected_failure_events)); + &fixture, record_key2, AWS_ARRAY_SIZE(expected_success_events), expected_failure_events)); // two publishes on the mqtt client that get reflected into our subscription topic1 struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); - ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); - ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic1, payload1)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic1, payload2)); s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { { payload1, - topic_filter1, + topic1, }, { payload2, - topic_filter1, + topic1, }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes)); @@ -2057,8 +2061,7 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn( // make a third using topic filter 2 struct aws_byte_cursor record_key3 = aws_byte_cursor_from_c_str("key3"); - struct aws_mqtt_rr_client_operation *operation3 = - s_create_streaming_operation(&fixture, record_key3, topic_filter2); + struct aws_mqtt_rr_client_operation *operation3 = s_create_streaming_operation(&fixture, record_key3, topic2); s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key3, 1); ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( @@ -2066,7 +2069,7 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn( // publish again struct aws_byte_cursor payload3 = aws_byte_cursor_from_c_str("payload3"); - ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter2, payload3)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic2, payload3)); // verify third operation got the new publish s_rrc_wait_for_n_streaming_publishes(&fixture, record_key3, 1); @@ -2074,7 +2077,7 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn( struct aws_rr_client_fixture_publish_message_view third_expected_publishes[] = { { payload3, - topic_filter2, + topic2, }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( @@ -3237,3 +3240,760 @@ static int s_rrc_request_response_multi_operation_sequence_fn(struct aws_allocat } AWS_TEST_CASE(rrc_request_response_multi_operation_sequence, s_rrc_request_response_multi_operation_sequence_fn) + +struct aws_rr_client_fixture_matched_stream_subscription { + struct aws_allocator *allocator; + struct aws_byte_buf payload; + struct aws_byte_buf topic; + struct aws_byte_buf topic_filter; +}; + +struct aws_rr_client_fixture_matched_stream_subscription_view { + struct aws_byte_cursor payload; + struct aws_byte_cursor topic; + struct aws_byte_cursor topic_filter; +}; + +struct aws_rr_client_fixture_matched_request_subscription { + struct aws_allocator *allocator; + struct aws_byte_buf payload; + struct aws_byte_buf topic; + struct aws_byte_buf topic_filter; + struct aws_byte_buf token_path; +}; + +struct aws_rr_client_fixture_matched_request_subscription_view { + struct aws_byte_cursor payload; + struct aws_byte_cursor topic; + struct aws_byte_cursor topic_filter; + struct aws_byte_cursor token_path; +}; + +struct aws_rr_client_fixture_subscriptions_matches_record { + struct aws_allocator *allocator; + // table: topic_filter -> aws_rr_client_fixture_matched_stream_subscription + struct aws_hash_table stream_matches; + // table: topic_filter -> aws_rr_client_fixture_matched_request_subscription + struct aws_hash_table request_matches; + size_t stream_matches_count; + size_t request_matches_count; +}; + +static void s_aws_rr_client_fixture_stream_subscription_destroy(void *value) { + struct aws_rr_client_fixture_matched_stream_subscription *matched_subscription = value; + aws_byte_buf_clean_up(&matched_subscription->payload); + aws_byte_buf_clean_up(&matched_subscription->topic); + aws_byte_buf_clean_up(&matched_subscription->topic_filter); + aws_mem_release(matched_subscription->allocator, matched_subscription); +} + +static void s_aws_rr_client_fixture_request_subscription_destroy(void *value) { + struct aws_rr_client_fixture_matched_request_subscription *matched_subscription = value; + aws_byte_buf_clean_up(&matched_subscription->payload); + aws_byte_buf_clean_up(&matched_subscription->topic); + aws_byte_buf_clean_up(&matched_subscription->topic_filter); + aws_byte_buf_clean_up(&matched_subscription->token_path); + aws_mem_release(matched_subscription->allocator, matched_subscription); +} + +struct aws_rr_client_fixture_subscriptions_matches_record *s_aws_rr_client_fixture_subscriptions_matches_record_new( + struct aws_allocator *allocator) { + struct aws_rr_client_fixture_subscriptions_matches_record *record = + aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_client_fixture_subscriptions_matches_record)); + + record->allocator = allocator; + aws_hash_table_init( + &record->stream_matches, + allocator, + 10, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_client_fixture_stream_subscription_destroy); + record->stream_matches_count = 0; + + aws_hash_table_init( + &record->request_matches, + allocator, + 10, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_client_fixture_request_subscription_destroy); + record->request_matches_count = 0; + + return record; +} + +void s_aws_rr_client_fixture_subscriptions_macthes_record_delete( + struct aws_rr_client_fixture_subscriptions_matches_record *record) { + aws_hash_table_clean_up(&record->stream_matches); + aws_hash_table_clean_up(&record->request_matches); + aws_mem_release(record->allocator, record); +} + +static int s_rrc_verify_subscriptions_publishes( + struct aws_rr_client_fixture_subscriptions_matches_record *record, + size_t expected_stream_matches_count, + struct aws_rr_client_fixture_matched_stream_subscription_view *expected_stream_matched_subscriptions, + size_t expected_request_matches_count, + struct aws_rr_client_fixture_matched_request_subscription_view *expected_request_matched_subscriptions) { + + ASSERT_INT_EQUALS(expected_stream_matches_count, record->stream_matches_count); + + for (size_t i = 0; i < expected_stream_matches_count; ++i) { + struct aws_rr_client_fixture_matched_stream_subscription_view *expected_matched_subscription = + &expected_stream_matched_subscriptions[i]; + + struct aws_hash_element *element = NULL; + ASSERT_SUCCESS( + aws_hash_table_find(&record->stream_matches, &expected_matched_subscription->topic_filter, &element)); + + struct aws_rr_client_fixture_matched_stream_subscription *actual_matched_subscription = element->value; + + ASSERT_BIN_ARRAYS_EQUALS( + expected_matched_subscription->payload.ptr, + expected_matched_subscription->payload.len, + actual_matched_subscription->payload.buffer, + actual_matched_subscription->payload.len); + ASSERT_BIN_ARRAYS_EQUALS( + expected_matched_subscription->topic.ptr, + expected_matched_subscription->topic.len, + actual_matched_subscription->topic.buffer, + actual_matched_subscription->topic.len); + ASSERT_BIN_ARRAYS_EQUALS( + expected_matched_subscription->topic_filter.ptr, + expected_matched_subscription->topic_filter.len, + actual_matched_subscription->topic_filter.buffer, + actual_matched_subscription->topic_filter.len); + } + + ASSERT_INT_EQUALS(expected_request_matches_count, record->request_matches_count); + + for (size_t i = 0; i < expected_request_matches_count; ++i) { + struct aws_rr_client_fixture_matched_request_subscription_view *expected_matched_subscription = + &expected_request_matched_subscriptions[i]; + + struct aws_hash_element *element = NULL; + ASSERT_SUCCESS( + aws_hash_table_find(&record->request_matches, &expected_matched_subscription->topic_filter, &element)); + + struct aws_rr_client_fixture_matched_request_subscription *actual_matched_subscription = element->value; + + ASSERT_BIN_ARRAYS_EQUALS( + expected_matched_subscription->payload.ptr, + expected_matched_subscription->payload.len, + actual_matched_subscription->payload.buffer, + actual_matched_subscription->payload.len); + ASSERT_BIN_ARRAYS_EQUALS( + expected_matched_subscription->topic.ptr, + expected_matched_subscription->topic.len, + actual_matched_subscription->topic.buffer, + actual_matched_subscription->topic.len); + ASSERT_BIN_ARRAYS_EQUALS( + expected_matched_subscription->topic_filter.ptr, + expected_matched_subscription->topic_filter.len, + actual_matched_subscription->topic_filter.buffer, + actual_matched_subscription->topic_filter.len); + ASSERT_BIN_ARRAYS_EQUALS( + expected_matched_subscription->token_path.ptr, + expected_matched_subscription->token_path.len, + actual_matched_subscription->token_path.buffer, + actual_matched_subscription->token_path.len); + } + + return AWS_OP_SUCCESS; +} + +static void s_rrs_fixture_on_stream_operation_subscription_match( + const struct aws_linked_list *operations, + const struct aws_byte_cursor *topic_filter, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + void *user_data) { + + (void)operations; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = user_data; + + struct aws_rr_client_fixture_matched_stream_subscription *matched_subscription = + aws_mem_calloc(record->allocator, 1, sizeof(struct aws_rr_client_fixture_matched_stream_subscription)); + matched_subscription->allocator = record->allocator; + aws_byte_buf_init_copy_from_cursor(&matched_subscription->payload, record->allocator, publish_event->payload); + aws_byte_buf_init_copy_from_cursor(&matched_subscription->topic, record->allocator, publish_event->topic); + aws_byte_buf_init_copy_from_cursor(&matched_subscription->topic_filter, record->allocator, *topic_filter); + + aws_hash_table_put(&record->stream_matches, topic_filter, matched_subscription, NULL); + ++record->stream_matches_count; +} + +static void s_rrs_fixture_on_request_operation_subscription_match( + struct aws_rr_response_path_entry *entry, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, + void *user_data) { + (void)entry; + (void)publish_event; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = user_data; + + struct aws_rr_client_fixture_matched_request_subscription *matched_subscription = + aws_mem_calloc(record->allocator, 1, sizeof(struct aws_rr_client_fixture_matched_request_subscription)); + matched_subscription->allocator = record->allocator; + aws_byte_buf_init_copy_from_cursor(&matched_subscription->payload, record->allocator, publish_event->payload); + aws_byte_buf_init_copy_from_cursor(&matched_subscription->topic, record->allocator, publish_event->topic); + aws_byte_buf_init_copy_from_cursor(&matched_subscription->topic_filter, record->allocator, publish_event->topic); + aws_byte_buf_init_copy(&matched_subscription->token_path, record->allocator, &entry->correlation_token_json_path); + + aws_hash_table_put(&record->request_matches, &publish_event->topic, matched_subscription, NULL); + ++record->request_matches_count; +} + +static int s_rrs_init_cleanup_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + ASSERT_SUCCESS(aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator)); + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrs_init_cleanup, s_rrs_init_cleanup_fn) + +static int s_rrs_stream_subscriptions_match_single_level_wildcards_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + + aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator); + + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/123/abc"); + struct aws_byte_cursor topic_filter2 = aws_byte_cursor_from_c_str("topic/123/+"); + struct aws_byte_cursor topic_filter3 = aws_byte_cursor_from_c_str("topic/+/abc"); + + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/123/abc"); + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter1); + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter2); + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter3); + + struct aws_protocol_adapter_incoming_publish_event publish_event = { + .topic = topic1, + .payload = payload1, + }; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record); + + struct aws_rr_client_fixture_matched_stream_subscription_view matched_subscriptions[] = { + {payload1, topic1, topic_filter1}, + {payload1, topic1, topic_filter2}, + {payload1, topic1, topic_filter3}, + }; + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record, + AWS_ARRAY_SIZE(matched_subscriptions), + matched_subscriptions, + 0, /* expected_request_matches_count */ + NULL)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record); + + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrs_stream_subscriptions_match_single_level_wildcards, + s_rrs_stream_subscriptions_match_single_level_wildcards_fn) + +static int s_rrs_stream_subscriptions_match_multi_level_wildcards_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + + aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator); + + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/123/abc"); + struct aws_byte_cursor topic_filter2 = aws_byte_cursor_from_c_str("topic/123/#"); + struct aws_byte_cursor topic_filter3 = aws_byte_cursor_from_c_str("topic/#"); + + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/123/abc"); + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter1); + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter2); + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter3); + + struct aws_protocol_adapter_incoming_publish_event publish_event = { + .topic = topic1, + .payload = payload1, + }; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record); + + struct aws_rr_client_fixture_matched_stream_subscription_view matched_subscriptions[] = { + {payload1, topic1, topic_filter1}, + {payload1, topic1, topic_filter2}, + {payload1, topic1, topic_filter3}, + }; + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record, + AWS_ARRAY_SIZE(matched_subscriptions), + matched_subscriptions, + 0, /* expected_request_matches_count */ + NULL)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record); + + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrs_stream_subscriptions_match_multi_level_wildcards, + s_rrs_stream_subscriptions_match_multi_level_wildcards_fn) + +/* Adding multiple identical stream subscriptions should add only one record. */ +static int s_rrs_stream_subscriptions_add_duplicate_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + + aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator); + + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/123/+"); + + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/123/abc"); + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter1); + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter1); + + struct aws_protocol_adapter_incoming_publish_event publish_event = { + .topic = topic1, + .payload = payload1, + }; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record); + + struct aws_rr_client_fixture_matched_stream_subscription_view matched_subscriptions[] = { + {payload1, topic1, topic_filter1}, + }; + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record, + AWS_ARRAY_SIZE(matched_subscriptions), + matched_subscriptions, + 0, /* expected_request_matches_count */ + NULL)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record); + + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrs_stream_subscriptions_add_duplicate, s_rrs_stream_subscriptions_add_duplicate_fn) + +static int s_rrs_stream_subscriptions_too_long_publish_topic_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + + aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator); + + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/123/+"); + + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/123/abc/def"); + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter1); + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter1); + + struct aws_protocol_adapter_incoming_publish_event publish_event = { + .topic = topic1, + .payload = payload1, + }; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record); + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record, + 0, /* expected_stream_matches_count */ + NULL, + 0, /* expected_request_matches_count */ + NULL)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record); + + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrs_stream_subscriptions_too_long_publish_topic, s_rrs_stream_subscriptions_too_long_publish_topic_fn) + +static int s_rrs_stream_subscriptions_too_short_publish_topic_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + + aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator); + + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/123/+"); + + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/123"); + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter1); + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic_filter1); + + struct aws_protocol_adapter_incoming_publish_event publish_event = { + .topic = topic1, + .payload = payload1, + }; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record); + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record, + 0, /* expected_stream_matches_count */ + NULL, + 0, /* expected_request_matches_count */ + NULL)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record); + + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrs_stream_subscriptions_too_short_publish_topic, s_rrs_stream_subscriptions_too_short_publish_topic_fn) + +static int s_rrs_request_subscriptions_add_single_subscription_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + + aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator); + + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/123/abc"); + struct aws_byte_cursor token_path1 = aws_byte_cursor_from_c_str("token1"); + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + + aws_mqtt_request_response_client_subscriptions_add_request_subscription(&subscriptions, &topic1, &token_path1); + + struct aws_protocol_adapter_incoming_publish_event publish_event = { + .topic = topic1, + .payload = payload1, + }; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record); + + struct aws_rr_client_fixture_matched_request_subscription_view matched_subscriptions[] = { + {payload1, topic1, topic1, token_path1}, + }; + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record, + 0 /* expected_stream_matches_count */, + NULL, + AWS_ARRAY_SIZE(matched_subscriptions), + matched_subscriptions)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record); + + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrs_request_subscriptions_add_single_subscription, s_rrs_request_subscriptions_add_single_subscription_fn) + +static int s_rrs_request_subscriptions_remove_subscription_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + + aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator); + + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/123/abc"); + struct aws_byte_cursor token_path1 = aws_byte_cursor_from_c_str("token1"); + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + + aws_mqtt_request_response_client_subscriptions_add_request_subscription(&subscriptions, &topic1, &token_path1); + + struct aws_protocol_adapter_incoming_publish_event publish_event = { + .topic = topic1, + .payload = payload1, + }; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record); + + struct aws_rr_client_fixture_matched_request_subscription_view matched_subscriptions[] = { + {payload1, topic1, topic1, token_path1}, + }; + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record, + 0 /* expected_stream_matches_count */, + NULL, + AWS_ARRAY_SIZE(matched_subscriptions), + matched_subscriptions)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record); + + aws_mqtt_request_response_client_subscriptions_remove_request_subscription(&subscriptions, &topic1); + + struct aws_rr_client_fixture_subscriptions_matches_record *record2 = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record2); + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record2, 0 /* expected_stream_matches_count */, NULL, 0 /* expected_request_matches_count */, NULL)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record2); + + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrs_request_subscriptions_remove_subscription, s_rrs_request_subscriptions_remove_subscription_fn) + +/* After adding multiple identical request subscriptions, the same number of removals is required to actually remove + * the subscription. */ +static int s_rrs_request_subscriptions_add_duplicate_then_remove_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + + aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator); + + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/123/abc"); + struct aws_byte_cursor token_path1 = aws_byte_cursor_from_c_str("token1"); + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + + aws_mqtt_request_response_client_subscriptions_add_request_subscription(&subscriptions, &topic1, &token_path1); + aws_mqtt_request_response_client_subscriptions_add_request_subscription(&subscriptions, &topic1, &token_path1); + + struct aws_protocol_adapter_incoming_publish_event publish_event = { + .topic = topic1, + .payload = payload1, + }; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record); + + struct aws_rr_client_fixture_matched_request_subscription_view matched_subscriptions[] = { + {payload1, topic1, topic1, token_path1}, + }; + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record, + 0 /* expected_stream_matches_count */, + NULL, + AWS_ARRAY_SIZE(matched_subscriptions), + matched_subscriptions)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record); + + // First remove, decrement subscription's internal counter to 1. + aws_mqtt_request_response_client_subscriptions_remove_request_subscription(&subscriptions, &topic1); + + struct aws_rr_client_fixture_subscriptions_matches_record *record2 = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record2); + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record2, + 0 /* expected_stream_matches_count */, + NULL, + AWS_ARRAY_SIZE(matched_subscriptions), + matched_subscriptions)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record2); + + // Second remove, decrement subscription's internal counter to 0 and remove it. + aws_mqtt_request_response_client_subscriptions_remove_request_subscription(&subscriptions, &topic1); + + struct aws_rr_client_fixture_subscriptions_matches_record *record3 = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record3); + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record3, 0 /* expected_stream_matches_count */, NULL, 0 /* expected_request_matches_count */, NULL)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record3); + + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrs_request_subscriptions_add_duplicate_then_remove, + s_rrs_request_subscriptions_add_duplicate_then_remove_fn) + +static int s_rrs_request_subscriptions_remove_nonexistent_subscription_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + + aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator); + + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/123/abc"); + aws_mqtt_request_response_client_subscriptions_remove_request_subscription(&subscriptions, &topic1); + + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrs_request_subscriptions_remove_nonexistent_subscription, + s_rrs_request_subscriptions_remove_nonexistent_subscription_fn) + +static int s_rrs_stream_and_request_subscriptions_add_same_subscription_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_request_response_subscriptions subscriptions; + + aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator); + + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("topic/123/abc"); + struct aws_byte_cursor topic2 = aws_byte_cursor_from_c_str("topic/123/+"); + struct aws_byte_cursor token_path1 = aws_byte_cursor_from_c_str("token1"); + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic1); + aws_mqtt_request_response_client_subscriptions_add_stream_subscription(&subscriptions, &topic2); + aws_mqtt_request_response_client_subscriptions_add_request_subscription(&subscriptions, &topic1, &token_path1); + + struct aws_protocol_adapter_incoming_publish_event publish_event = { + .topic = topic1, + .payload = payload1, + }; + + struct aws_rr_client_fixture_subscriptions_matches_record *record = + s_aws_rr_client_fixture_subscriptions_matches_record_new(allocator); + + aws_mqtt_request_response_client_subscriptions_match( + &subscriptions, + &publish_event, + s_rrs_fixture_on_stream_operation_subscription_match, + s_rrs_fixture_on_request_operation_subscription_match, + record); + + struct aws_rr_client_fixture_matched_stream_subscription_view matched_stream_subscriptions[] = { + {payload1, topic1, topic1}, + {payload1, topic1, topic2}, + }; + + struct aws_rr_client_fixture_matched_request_subscription_view matched_request_subscriptions[] = { + {payload1, topic1, topic1, token_path1}, + }; + + ASSERT_SUCCESS(s_rrc_verify_subscriptions_publishes( + record, + AWS_ARRAY_SIZE(matched_stream_subscriptions), + matched_stream_subscriptions, + AWS_ARRAY_SIZE(matched_request_subscriptions), + matched_request_subscriptions)); + + s_aws_rr_client_fixture_subscriptions_macthes_record_delete(record); + + aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrs_stream_and_request_subscriptions_add_same_subscription, + s_rrs_stream_and_request_subscriptions_add_same_subscription_fn)