Skip to content

Commit

Permalink
Finish streaming tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Mar 30, 2024
1 parent f7fd38f commit 5b57a82
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 43 deletions.
11 changes: 8 additions & 3 deletions include/aws/mqtt/private/request-response/subscription_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ enum aws_rr_subscription_event_type {
/*
* A request subscription subscribe succeeded
*/
ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS,
ARRSET_REQUEST_SUBSCRIBE_SUCCESS,

/*
* A request subscription subscribe failed
*/
ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE,
ARRSET_REQUEST_SUBSCRIBE_FAILURE,

/*
* A previously successful request subscription has ended.
Expand All @@ -36,7 +36,7 @@ enum aws_rr_subscription_event_type {
*
* (1) failure to rejoin a session
*/
ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED,
ARRSET_REQUEST_SUBSCRIPTION_ENDED,

/*
* A streaming subscription subscribe succeeded
Expand All @@ -58,6 +58,11 @@ enum aws_rr_subscription_event_type {
*/
ARRSET_STREAMING_SUBSCRIPTION_HALTED,

/*
* A subscription has lost its last listener and can be purged
*/
ARRSET_SUBSCRIPTION_EMPTY,

/*
* A subscription has been unsubscribed from
*
Expand Down
8 changes: 4 additions & 4 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void
goto done;
}

if (event_task->type == ARRSET_UNSUBSCRIBE_COMPLETE) {
if (event_task->type == ARRSET_UNSUBSCRIBE_COMPLETE || event_task->type == ARRSET_SUBSCRIPTION_EMPTY) {
s_mqtt_request_response_client_wake_service(event_task->rr_client);
goto done;
}
Expand All @@ -796,9 +796,9 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void
struct aws_mqtt_rr_client_operation *operation = element->value;

switch (event_task->type) {
case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS:
case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE:
case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED:
case ARRSET_REQUEST_SUBSCRIBE_SUCCESS:
case ARRSET_REQUEST_SUBSCRIBE_FAILURE:
case ARRSET_REQUEST_SUBSCRIPTION_ENDED:
/* NYI */
break;

Expand Down
37 changes: 26 additions & 11 deletions source/request-response/subscription_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,25 @@ static void s_remove_listener_from_subscription_record(

aws_hash_table_remove(&record->listeners, &listener, NULL, NULL);

size_t listener_count = aws_hash_table_get_entry_count(&record->listeners);

AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"request-response subscription manager - removed listener %" PRIu64 " from subscription ('" PRInSTR
"'), %zu listeners left",
operation_id,
AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor),
aws_hash_table_get_entry_count(&record->listeners));
listener_count);

if (listener_count == 0) {
struct aws_rr_subscription_status_event event = {
.type = ARRSET_SUBSCRIPTION_EMPTY,
.topic_filter = record->topic_filter_cursor,
.operation_id = 0,
};

(*manager->config.subscription_status_callback)(&event, manager->config.userdata);
}
}

static void s_add_listener_to_subscription_record(struct aws_rr_subscription_record *record, uint64_t operation_id) {
Expand Down Expand Up @@ -314,13 +326,13 @@ static void s_cull_unused_subscriptions(struct aws_rr_subscription_manager *mana

static const char *s_rr_subscription_event_type_to_c_str(enum aws_rr_subscription_event_type type) {
switch (type) {
case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS:
case ARRSET_REQUEST_SUBSCRIBE_SUCCESS:
return "RequestSubscribeSuccess";

case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE:
case ARRSET_REQUEST_SUBSCRIBE_FAILURE:
return "RequestSubscribeFailure";

case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED:
case ARRSET_REQUEST_SUBSCRIPTION_ENDED:
return "RequestSubscriptionEnded";

case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED:
Expand All @@ -334,6 +346,9 @@ static const char *s_rr_subscription_event_type_to_c_str(enum aws_rr_subscriptio

case ARRSET_UNSUBSCRIBE_COMPLETE:
return "UnsubscribeComplete";

case ARRSET_SUBSCRIPTION_EMPTY:
return "SubscriptionEmpty";
}

return "Unknown";
Expand All @@ -343,9 +358,9 @@ static bool s_subscription_type_matches_event_type(
enum aws_rr_subscription_type subscription_type,
enum aws_rr_subscription_event_type event_type) {
switch (event_type) {
case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS:
case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE:
case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED:
case ARRSET_REQUEST_SUBSCRIBE_SUCCESS:
case ARRSET_REQUEST_SUBSCRIBE_FAILURE:
case ARRSET_REQUEST_SUBSCRIPTION_ENDED:
return subscription_type == ARRST_REQUEST_RESPONSE;

case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED:
Expand Down Expand Up @@ -421,7 +436,7 @@ static int s_rr_activate_idle_subscription(
aws_error_debug_str(error_code));

if (record->type == ARRST_REQUEST_RESPONSE) {
s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE);
s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_FAILURE);
} else {
record->poisoned = true;
s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_HALTED);
Expand Down Expand Up @@ -567,9 +582,9 @@ static void s_handle_protocol_adapter_request_subscription_event(

if (event->error_code == AWS_ERROR_SUCCESS) {
record->status = ARRSST_SUBSCRIBED;
s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS);
s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_SUCCESS);
} else {
s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE);
s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIBE_FAILURE);
}
} else {
AWS_FATAL_ASSERT(event->event_type == AWS_PASET_UNSUBSCRIBE);
Expand Down Expand Up @@ -677,7 +692,7 @@ static int s_apply_session_lost_wrapper(void *context, struct aws_hash_element *
record->status = ARRSST_NOT_SUBSCRIBED;

if (record->type == ARRST_REQUEST_RESPONSE) {
s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED);
s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_ENDED);

if (record->pending_action != ARRSPAT_UNSUBSCRIBING) {
s_aws_rr_subscription_record_destroy(record);
Expand Down
4 changes: 2 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,8 @@ add_test_case(rrc_streaming_operation_first_subscribe_times_out_resub_succeeds)
add_test_case(rrc_streaming_operation_first_subscribe_retryable_failure_resub_succeeds)
add_test_case(rrc_streaming_operation_subscribe_unretryable_failure)
add_test_case(rrc_streaming_operation_failure_exceeds_subscription_budget)
#add_test_case(rrc_streaming_operation_success_delayed_by_request_operations)
#add_test_case(rrc_streaming_operation_success_sandwiched_by_request_operations)
add_test_case(rrc_streaming_operation_success_delayed_by_request_operations)
add_test_case(rrc_streaming_operation_success_sandwiched_by_request_operations)


generate_test_driver(${PROJECT_NAME}-tests)
Expand Down
Loading

0 comments on commit 5b57a82

Please sign in to comment.