diff --git a/include/aws/mqtt/private/request-response/subscription_manager.h b/include/aws/mqtt/private/request-response/subscription_manager.h index 4d644af2..20e1b57f 100644 --- a/include/aws/mqtt/private/request-response/subscription_manager.h +++ b/include/aws/mqtt/private/request-response/subscription_manager.h @@ -22,12 +22,12 @@ enum aws_rr_subscription_event_type { /* * A request subscription subscribe succeeded */ - ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + ARRSET_REQUEST_SUBSCRIBE_SUCCESS, /* * A request subscription subscribe failed */ - ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE, + ARRSET_REQUEST_SUBSCRIBE_FAILURE, /* * A previously successful request subscription has ended. @@ -36,7 +36,7 @@ enum aws_rr_subscription_event_type { * * (1) failure to rejoin a session */ - ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED, + ARRSET_REQUEST_SUBSCRIPTION_ENDED, /* * A streaming subscription subscribe succeeded @@ -58,6 +58,11 @@ enum aws_rr_subscription_event_type { */ ARRSET_STREAMING_SUBSCRIPTION_HALTED, + /* + * A subscription has lost its last listener and can be purged + */ + ARRSET_SUBSCRIPTION_EMPTY, + /* * A subscription has been unsubscribed from * diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index a4c3819f..66264104 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -782,7 +782,7 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void goto done; } - if (event_task->type == ARRSET_UNSUBSCRIBE_COMPLETE) { + if (event_task->type == ARRSET_UNSUBSCRIBE_COMPLETE || event_task->type == ARRSET_SUBSCRIPTION_EMPTY) { s_mqtt_request_response_client_wake_service(event_task->rr_client); goto done; } @@ -796,9 +796,9 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void struct aws_mqtt_rr_client_operation *operation = element->value; switch (event_task->type) { - case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS: - case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE: - case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED: + case ARRSET_REQUEST_SUBSCRIBE_SUCCESS: + case ARRSET_REQUEST_SUBSCRIBE_FAILURE: + case ARRSET_REQUEST_SUBSCRIPTION_ENDED: /* NYI */ break; diff --git a/source/request-response/subscription_manager.c b/source/request-response/subscription_manager.c index 84165693..ecf3a6db 100644 --- a/source/request-response/subscription_manager.c +++ b/source/request-response/subscription_manager.c @@ -252,13 +252,25 @@ static void s_remove_listener_from_subscription_record( aws_hash_table_remove(&record->listeners, &listener, NULL, NULL); + size_t listener_count = aws_hash_table_get_entry_count(&record->listeners); + AWS_LOGF_DEBUG( AWS_LS_MQTT_REQUEST_RESPONSE, "request-response subscription manager - removed listener %" PRIu64 " from subscription ('" PRInSTR "'), %zu listeners left", operation_id, AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor), - aws_hash_table_get_entry_count(&record->listeners)); + listener_count); + + if (listener_count == 0) { + struct aws_rr_subscription_status_event event = { + .type = ARRSET_SUBSCRIPTION_EMPTY, + .topic_filter = record->topic_filter_cursor, + .operation_id = 0, + }; + + (*manager->config.subscription_status_callback)(&event, manager->config.userdata); + } } static void s_add_listener_to_subscription_record(struct aws_rr_subscription_record *record, uint64_t operation_id) { @@ -314,13 +326,13 @@ static void s_cull_unused_subscriptions(struct aws_rr_subscription_manager *mana static const char *s_rr_subscription_event_type_to_c_str(enum aws_rr_subscription_event_type type) { switch (type) { - case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS: + case ARRSET_REQUEST_SUBSCRIBE_SUCCESS: return "RequestSubscribeSuccess"; - case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE: + case ARRSET_REQUEST_SUBSCRIBE_FAILURE: return "RequestSubscribeFailure"; - case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED: + case ARRSET_REQUEST_SUBSCRIPTION_ENDED: return "RequestSubscriptionEnded"; case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED: @@ -334,6 +346,9 @@ static const char *s_rr_subscription_event_type_to_c_str(enum aws_rr_subscriptio case ARRSET_UNSUBSCRIBE_COMPLETE: return "UnsubscribeComplete"; + + case ARRSET_SUBSCRIPTION_EMPTY: + return "SubscriptionEmpty"; } return "Unknown"; @@ -343,9 +358,9 @@ static bool s_subscription_type_matches_event_type( enum aws_rr_subscription_type subscription_type, enum aws_rr_subscription_event_type event_type) { switch (event_type) { - case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS: - case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE: - case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED: + case ARRSET_REQUEST_SUBSCRIBE_SUCCESS: + case ARRSET_REQUEST_SUBSCRIBE_FAILURE: + case ARRSET_REQUEST_SUBSCRIPTION_ENDED: return subscription_type == ARRST_REQUEST_RESPONSE; case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED: @@ -421,7 +436,7 @@ static int s_rr_activate_idle_subscription( aws_error_debug_str(error_code)); if (record->type == ARRST_REQUEST_RESPONSE) { - s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE); + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_FAILURE); } else { record->poisoned = true; s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_HALTED); @@ -567,9 +582,9 @@ static void s_handle_protocol_adapter_request_subscription_event( if (event->error_code == AWS_ERROR_SUCCESS) { record->status = ARRSST_SUBSCRIBED; - s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS); + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_SUCCESS); } else { - s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE); + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_FAILURE); } } else { AWS_FATAL_ASSERT(event->event_type == AWS_PASET_UNSUBSCRIBE); @@ -677,7 +692,7 @@ static int s_apply_session_lost_wrapper(void *context, struct aws_hash_element * record->status = ARRSST_NOT_SUBSCRIBED; if (record->type == ARRST_REQUEST_RESPONSE) { - s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED); + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_ENDED); if (record->pending_action != ARRSPAT_UNSUBSCRIBING) { s_aws_rr_subscription_record_destroy(record); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 91c6e310..1ccd9bd6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -544,8 +544,8 @@ add_test_case(rrc_streaming_operation_first_subscribe_times_out_resub_succeeds) add_test_case(rrc_streaming_operation_first_subscribe_retryable_failure_resub_succeeds) add_test_case(rrc_streaming_operation_subscribe_unretryable_failure) add_test_case(rrc_streaming_operation_failure_exceeds_subscription_budget) -#add_test_case(rrc_streaming_operation_success_delayed_by_request_operations) -#add_test_case(rrc_streaming_operation_success_sandwiched_by_request_operations) +add_test_case(rrc_streaming_operation_success_delayed_by_request_operations) +add_test_case(rrc_streaming_operation_success_sandwiched_by_request_operations) generate_test_driver(${PROJECT_NAME}-tests) diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c index 1633437c..514dfca4 100644 --- a/tests/request-response/request_response_client_tests.c +++ b/tests/request-response/request_response_client_tests.c @@ -4,6 +4,7 @@ */ #include +#include #include #include #include @@ -46,9 +47,8 @@ struct aws_rr_client_fixture_request_response_record { struct aws_rr_client_test_fixture *fixture; - struct aws_byte_cursor payload_cursor; - - struct aws_byte_buf payload; + struct aws_byte_cursor record_key_cursor; + struct aws_byte_buf record_key; bool completed; int error_code; @@ -65,15 +65,15 @@ struct aws_rr_client_fixture_request_response_record *s_aws_rr_client_fixture_re record->allocator = allocator; record->fixture = fixture; - aws_byte_buf_init_copy_from_cursor(&record->payload, allocator, request_payload); - record->payload_cursor = aws_byte_cursor_from_buf(&record->payload); + aws_byte_buf_init_copy_from_cursor(&record->record_key, allocator, request_payload); + record->record_key_cursor = aws_byte_cursor_from_buf(&record->record_key); return record; } void s_aws_rr_client_fixture_request_response_record_delete( struct aws_rr_client_fixture_request_response_record *record) { - aws_byte_buf_clean_up(&record->payload); + aws_byte_buf_clean_up(&record->record_key); aws_byte_buf_clean_up(&record->response); aws_mem_release(record->allocator, record); @@ -111,11 +111,11 @@ static void s_rrc_fixture_request_completion_callback( static struct aws_rr_client_fixture_request_response_record *s_rrc_fixture_add_request_record( struct aws_rr_client_test_fixture *fixture, - struct aws_byte_cursor request_payload) { + struct aws_byte_cursor record_key) { struct aws_rr_client_fixture_request_response_record *record = - s_aws_rr_client_fixture_request_response_record_new(fixture->allocator, fixture, request_payload); + s_aws_rr_client_fixture_request_response_record_new(fixture->allocator, fixture, record_key); - aws_hash_table_put(&fixture->request_response_records, &record->payload_cursor, record, NULL); + aws_hash_table_put(&fixture->request_response_records, &record->record_key_cursor, record, NULL); return record; } @@ -140,9 +140,9 @@ static bool s_is_request_complete(void *context) { static void s_rrc_wait_on_request_completion( struct aws_rr_client_test_fixture *fixture, - struct aws_byte_cursor request_payload) { + struct aws_byte_cursor record_key) { struct rrc_operation_completion_context context = { - .key = request_payload, + .key = record_key, .fixture = fixture, }; @@ -153,13 +153,13 @@ static void s_rrc_wait_on_request_completion( static int s_rrc_verify_request_completion( struct aws_rr_client_test_fixture *fixture, - struct aws_byte_cursor request_payload, + struct aws_byte_cursor record_key, int expected_error_code, struct aws_byte_cursor *expected_response) { aws_mutex_lock(&fixture->lock); struct aws_hash_element *element = NULL; - aws_hash_table_find(&fixture->request_response_records, &request_payload, &element); + aws_hash_table_find(&fixture->request_response_records, &record_key, &element); AWS_FATAL_ASSERT(element != NULL && element->value != NULL); @@ -1966,3 +1966,219 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn( AWS_TEST_CASE( rrc_streaming_operation_failure_exceeds_subscription_budget, s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn) + +static int s_submit_request_operation_from_prefix( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor record_key, + struct aws_byte_cursor prefix) { + char accepted_path[128]; + char rejected_path[128]; + char subscription_topic_filter[128]; + char publish_topic[128]; + + snprintf(accepted_path, AWS_ARRAY_SIZE(accepted_path), PRInSTR "/accepted", AWS_BYTE_CURSOR_PRI(prefix)); + snprintf(rejected_path, AWS_ARRAY_SIZE(rejected_path), PRInSTR "/rejected", AWS_BYTE_CURSOR_PRI(prefix)); + snprintf( + subscription_topic_filter, + AWS_ARRAY_SIZE(subscription_topic_filter), + PRInSTR "/+", + AWS_BYTE_CURSOR_PRI(prefix)); + snprintf(publish_topic, AWS_ARRAY_SIZE(publish_topic), PRInSTR "/get", AWS_BYTE_CURSOR_PRI(prefix)); + + char correlation_token[128]; + struct aws_byte_buf correlation_token_buf = + aws_byte_buf_from_empty_array(correlation_token, AWS_ARRAY_SIZE(correlation_token)); + + struct aws_uuid uuid; + aws_uuid_init(&uuid); + aws_uuid_to_str(&uuid, &correlation_token_buf); + + struct aws_mqtt_request_operation_response_path response_paths[] = { + { + .topic = aws_byte_cursor_from_c_str(accepted_path), + .correlation_token_json_path = aws_byte_cursor_from_c_str("client_token"), + }, + { + .topic = aws_byte_cursor_from_c_str(rejected_path), + .correlation_token_json_path = aws_byte_cursor_from_c_str("client_token"), + }, + }; + + struct aws_rr_client_fixture_request_response_record *record = + s_rrc_fixture_add_request_record(fixture, record_key); + + struct aws_mqtt_request_operation_options request = { + .subscription_topic_filter = aws_byte_cursor_from_c_str(subscription_topic_filter), + .response_paths = response_paths, + .response_path_count = AWS_ARRAY_SIZE(response_paths), + .publish_topic = aws_byte_cursor_from_c_str(publish_topic), + .serialized_request = aws_byte_cursor_from_c_str("{}"), + .correlation_token = aws_byte_cursor_from_buf(&correlation_token_buf), + .completion_callback = s_rrc_fixture_request_completion_callback, + .user_data = record, + }; + + return aws_mqtt_request_response_client_submit_request(fixture->rr_client, &request); +} + +/* + * Configure server to only respond to subscribes that match a streaming filter. Submit a couple of + * request-response operations ahead of a streaming operation. Verify they both time out and that the streaming + * operation successfully subscribes and receives publishes. + */ +static int s_rrc_streaming_operation_success_delayed_by_request_operations_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success( + &fixture, &client_test_options, allocator, s_rrc_unsubscribe_success_config, NULL)); + + struct aws_byte_cursor request_key1 = aws_byte_cursor_from_c_str("requestkey1"); + struct aws_byte_cursor request_key2 = aws_byte_cursor_from_c_str("requestkey2"); + + ASSERT_SUCCESS(s_submit_request_operation_from_prefix(&fixture, request_key1, request_key1)); + ASSERT_SUCCESS(s_submit_request_operation_from_prefix(&fixture, request_key2, request_key2)); + + struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_mqtt_rr_client_operation *operation = s_create_streaming_operation(&fixture, record_key1, topic_filter1); + + s_rrc_wait_on_request_completion(&fixture, request_key1); + ASSERT_SUCCESS( + s_rrc_verify_request_completion(&fixture, request_key1, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, NULL)); + s_rrc_wait_on_request_completion(&fixture, request_key2); + ASSERT_SUCCESS( + s_rrc_verify_request_completion(&fixture, request_key2, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, NULL)); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_ESTABLISHED, + .error_code = AWS_ERROR_SUCCESS, + }, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_events), expected_events)); + + // two publishes on the mqtt client that get reflected into our subscription topic + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); + + struct aws_byte_cursor expected_publishes[] = { + payload1, + payload2, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); + + aws_mqtt_rr_client_operation_release(operation); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrc_streaming_operation_success_delayed_by_request_operations, + s_rrc_streaming_operation_success_delayed_by_request_operations_fn) + +/* + * Variant of previous test where we sandwich the streaming operation by multiple request response operations and + * verify all request-response operations fail with a timeout. + */ +static int s_rrc_streaming_operation_success_sandwiched_by_request_operations_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success( + &fixture, &client_test_options, allocator, s_rrc_unsubscribe_success_config, NULL)); + + struct aws_byte_cursor request_key1 = aws_byte_cursor_from_c_str("requestkey1"); + struct aws_byte_cursor request_key2 = aws_byte_cursor_from_c_str("requestkey2"); + struct aws_byte_cursor request_key3 = aws_byte_cursor_from_c_str("requestkey3"); + struct aws_byte_cursor request_key4 = aws_byte_cursor_from_c_str("requestkey4"); + + ASSERT_SUCCESS(s_submit_request_operation_from_prefix(&fixture, request_key1, request_key1)); + ASSERT_SUCCESS(s_submit_request_operation_from_prefix(&fixture, request_key2, request_key2)); + + struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_mqtt_rr_client_operation *operation = s_create_streaming_operation(&fixture, record_key1, topic_filter1); + + ASSERT_SUCCESS(s_submit_request_operation_from_prefix(&fixture, request_key3, request_key3)); + ASSERT_SUCCESS(s_submit_request_operation_from_prefix(&fixture, request_key4, request_key4)); + + s_rrc_wait_on_request_completion(&fixture, request_key1); + ASSERT_SUCCESS( + s_rrc_verify_request_completion(&fixture, request_key1, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, NULL)); + s_rrc_wait_on_request_completion(&fixture, request_key2); + ASSERT_SUCCESS( + s_rrc_verify_request_completion(&fixture, request_key2, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, NULL)); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_ESTABLISHED, + .error_code = AWS_ERROR_SUCCESS, + }, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_events), expected_events)); + + // two publishes on the mqtt client that get reflected into our subscription topic + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); + + struct aws_byte_cursor expected_publishes[] = { + payload1, + payload2, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); + + s_rrc_wait_on_request_completion(&fixture, request_key3); + ASSERT_SUCCESS( + s_rrc_verify_request_completion(&fixture, request_key3, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, NULL)); + s_rrc_wait_on_request_completion(&fixture, request_key4); + ASSERT_SUCCESS( + s_rrc_verify_request_completion(&fixture, request_key4, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, NULL)); + + aws_mqtt_rr_client_operation_release(operation); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrc_streaming_operation_success_sandwiched_by_request_operations, + s_rrc_streaming_operation_success_sandwiched_by_request_operations_fn) + +/* +#add_test_case() +#add_test_case(rrc_streaming_operation_success_sandwiched_by_request_operations) + */ \ No newline at end of file diff --git a/tests/request-response/subscription_manager_tests.c b/tests/request-response/subscription_manager_tests.c index 4abd726c..8684d3a4 100644 --- a/tests/request-response/subscription_manager_tests.c +++ b/tests/request-response/subscription_manager_tests.c @@ -575,7 +575,7 @@ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator struct aws_subscription_status_record expected_subscription_events[] = { { - .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = ARRSET_REQUEST_SUBSCRIBE_SUCCESS, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world1"), .operation_id = 1, }, @@ -907,12 +907,12 @@ static int s_rrsm_release_unsubscribes_request_fn(struct aws_allocator *allocato // verify two success callbacks struct aws_subscription_status_record expected_subscription_events[] = { { - .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = ARRSET_REQUEST_SUBSCRIBE_SUCCESS, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }, { - .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = ARRSET_REQUEST_SUBSCRIBE_SUCCESS, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 2, }}; @@ -1283,7 +1283,7 @@ static int s_rrsm_acquire_request_subscribe_failure_event_fn(struct aws_allocato // verify subscribe failure event emission struct aws_subscription_status_record expected_subscription_event = { - .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE, + .type = ARRSET_REQUEST_SUBSCRIBE_FAILURE, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1360,9 +1360,9 @@ static enum aws_rr_subscription_event_type s_compute_expected_subscription_event bool success) { if (subscription_type == ARRST_REQUEST_RESPONSE) { if (success) { - return ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS; + return ARRSET_REQUEST_SUBSCRIBE_SUCCESS; } else { - return ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE; + return ARRSET_REQUEST_SUBSCRIBE_FAILURE; } } else { if (success) { @@ -1602,7 +1602,7 @@ static int s_do_acquire_success_offline_release_acquire2_no_unsubscribe_test( aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); struct aws_subscription_status_record expected_subscription_event = { - .type = subscription_type == ARRST_REQUEST_RESPONSE ? ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS + .type = subscription_type == ARRST_REQUEST_RESPONSE ? ARRSET_REQUEST_SUBSCRIBE_SUCCESS : ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, @@ -1714,7 +1714,7 @@ static int s_do_rrsm_acquire_clean_up_test( aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); struct aws_subscription_status_record expected_subscription_event = { - .type = subscription_type == ARRST_REQUEST_RESPONSE ? ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS + .type = subscription_type == ARRST_REQUEST_RESPONSE ? ARRSET_REQUEST_SUBSCRIBE_SUCCESS : ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, @@ -1871,7 +1871,7 @@ static int s_rrsm_do_no_session_subscription_ended_test( aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); struct aws_subscription_status_record expected_subscription_event = { - .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = ARRSET_REQUEST_SUBSCRIBE_SUCCESS, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1921,7 +1921,7 @@ static int s_rrsm_do_no_session_subscription_ended_test( // verify subscription lost emitted if (!offline_while_unsubscribing) { struct aws_subscription_status_record expected_subscription_ended_event = { - .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED, + .type = ARRSET_REQUEST_SUBSCRIPTION_ENDED, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, };