diff --git a/include/aws/mqtt/request-response/request_response_client.h b/include/aws/mqtt/request-response/request_response_client.h index 450675d1..5cad26b7 100644 --- a/include/aws/mqtt/request-response/request_response_client.h +++ b/include/aws/mqtt/request-response/request_response_client.h @@ -8,8 +8,6 @@ #include -#include - struct aws_mqtt_request_response_client; struct aws_mqtt_client_connection; struct aws_mqtt5_client; @@ -46,8 +44,30 @@ struct aws_mqtt_request_operation_storage { struct aws_byte_buf operation_data; }; +/* + * Describes a change to the state of a request operation subscription + */ +enum aws_rr_streaming_subscription_event_type { + + /* + * The streaming operation is successfully subscribed to its topic (filter) + */ + ARRSSET_SUBSCRIPTION_ESTABLISHED, + + /* + * The streaming operation has temporarily lost its subscription to its topic (filter) + */ + ARRSSET_SUBSCRIPTION_LOST, + + /* + * The streaming operation has entered a terminal state where it has given up trying to subscribe + * to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy). + */ + ARRSSET_SUBSCRIPTION_HALTED, +}; + typedef void(aws_mqtt_streaming_operation_subscription_status_fn)( - enum aws_rr_subscription_event_type status, + enum aws_rr_streaming_subscription_event_type status, int error_code, void *user_data); typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(struct aws_byte_cursor payload, void *user_data); diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c index 5dc0fa5c..7ababf77 100644 --- a/tests/request-response/request_response_client_tests.c +++ b/tests/request-response/request_response_client_tests.c @@ -330,6 +330,104 @@ static void s_rrc_wait_on_streaming_termination( aws_mutex_unlock(&fixture->lock); } +struct rrc_streaming_event_wait_context { + struct aws_byte_cursor operation_key; + struct aws_rr_client_test_fixture *fixture; + size_t event_count; +}; + +static bool s_streaming_operation_has_n_publishes(void *context) { + struct rrc_streaming_event_wait_context *streaming_publish_context = context; + + struct aws_hash_element *element = NULL; + aws_hash_table_find( + &streaming_publish_context->fixture->streaming_records, &streaming_publish_context->operation_key, &element); + + AWS_FATAL_ASSERT(element != NULL && element->value != NULL); + + struct aws_rr_client_fixture_streaming_record *record = element->value; + + return aws_array_list_length(&record->publishes) >= streaming_publish_context->event_count; +} + +static void s_rrc_wait_for_n_streaming_publishes( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor key, + size_t count) { + struct rrc_streaming_event_wait_context context = { + .operation_key = key, + .fixture = fixture, + .event_count = count, + }; + + aws_mutex_lock(&fixture->lock); + aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_streaming_operation_has_n_publishes, &context); + aws_mutex_unlock(&fixture->lock); +} + +static int s_rrc_verify_streaming_publishes( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor key, + size_t expected_publish_count, + struct aws_byte_cursor *expected_publishes) { + + aws_mutex_lock(&fixture->lock); + + struct aws_hash_element *element = NULL; + aws_hash_table_find(&fixture->streaming_records, &key, &element); + + AWS_FATAL_ASSERT(element != NULL && element->value != NULL); + + struct aws_rr_client_fixture_streaming_record *record = element->value; + + size_t actual_publish_count = aws_array_list_length(&record->publishes); + ASSERT_INT_EQUALS(expected_publish_count, actual_publish_count); + + for (size_t i = 0; i < actual_publish_count; ++i) { + struct aws_byte_buf actual_payload; + aws_array_list_get_at(&record->publishes, &actual_payload, i); + + struct aws_byte_cursor *expected_payload = &expected_publishes[i]; + + ASSERT_BIN_ARRAYS_EQUALS( + expected_payload->ptr, expected_payload->len, actual_payload.buffer, actual_payload.len); + } + + aws_mutex_unlock(&fixture->lock); + + return AWS_OP_SUCCESS; +} + +static bool s_streaming_operation_has_n_subscription_events(void *context) { + struct rrc_streaming_event_wait_context *streaming_publish_context = context; + + struct aws_hash_element *element = NULL; + aws_hash_table_find( + &streaming_publish_context->fixture->streaming_records, &streaming_publish_context->operation_key, &element); + + AWS_FATAL_ASSERT(element != NULL && element->value != NULL); + + struct aws_rr_client_fixture_streaming_record *record = element->value; + + return aws_array_list_length(&record->subscription_events) >= streaming_publish_context->event_count; +} + +static void s_rrc_wait_for_n_streaming_subscription_events( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor key, + size_t count) { + struct rrc_streaming_event_wait_context context = { + .operation_key = key, + .fixture = fixture, + .event_count = count, + }; + + aws_mutex_lock(&fixture->lock); + aws_condition_variable_wait_pred( + &fixture->signal, &fixture->lock, s_streaming_operation_has_n_subscription_events, &context); + aws_mutex_unlock(&fixture->lock); +} + static int s_rrc_verify_streaming_record_subscription_events( struct aws_rr_client_test_fixture *fixture, struct aws_byte_cursor key,