diff --git a/include/aws/mqtt/private/mqtt311_listener.h b/include/aws/mqtt/private/mqtt311_listener.h index 1d5febee..e1e5162e 100644 --- a/include/aws/mqtt/private/mqtt311_listener.h +++ b/include/aws/mqtt/private/mqtt311_listener.h @@ -33,7 +33,7 @@ struct aws_mqtt311_callback_set { aws_mqtt_client_publish_received_fn *publish_received_handler; /* Called from s_packet_handler_connack which is event-loop invoked */ - aws_mqtt_client_on_connection_resumed_fn *connection_resumed_handler; + aws_mqtt_client_on_connection_success_fn *connection_success_handler; void *user_data; }; @@ -162,7 +162,7 @@ void aws_mqtt311_callback_set_manager_on_publish_received( bool retain); AWS_MQTT_API -void aws_mqtt311_callback_set_manager_on_connection_resumed( +void aws_mqtt311_callback_set_manager_on_connection_success( struct aws_mqtt311_callback_set_manager *manager, enum aws_mqtt_connect_return_code return_code, bool rejoined_session); diff --git a/source/client_channel_handler.c b/source/client_channel_handler.c index 198d3fe3..b95a43fc 100644 --- a/source/client_channel_handler.c +++ b/source/client_channel_handler.c @@ -240,9 +240,6 @@ static int s_packet_handler_connack(struct aws_byte_cursor message_cursor, void (void *)connection); MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_resumed, connack.connect_return_code, connack.session_present); - - aws_mqtt311_callback_set_manager_on_connection_resumed( - &connection->callback_manager, connack.connect_return_code, connack.session_present); } else { aws_create_reconnect_task(connection); @@ -266,6 +263,9 @@ static int s_packet_handler_connack(struct aws_byte_cursor message_cursor, void MQTT_CLIENT_CALL_CALLBACK_ARGS( connection, on_connection_success, connack.connect_return_code, connack.session_present); + aws_mqtt311_callback_set_manager_on_connection_success( + &connection->callback_manager, connack.connect_return_code, connack.session_present); + AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: connection callback completed", (void *)connection); s_update_next_ping_time(connection); diff --git a/source/mqtt311_listener.c b/source/mqtt311_listener.c index 93eb5d17..4e827152 100644 --- a/source/mqtt311_listener.c +++ b/source/mqtt311_listener.c @@ -266,7 +266,7 @@ void aws_mqtt311_callback_set_manager_on_publish_received( } } -void aws_mqtt311_callback_set_manager_on_connection_resumed( +void aws_mqtt311_callback_set_manager_on_connection_success( struct aws_mqtt311_callback_set_manager *manager, enum aws_mqtt_connect_return_code return_code, bool rejoined_session) { @@ -281,8 +281,8 @@ void aws_mqtt311_callback_set_manager_on_connection_resumed( node = aws_linked_list_next(node); struct aws_mqtt311_callback_set *callback_set = &entry->callbacks; - if (callback_set->connection_resumed_handler != NULL) { - (*callback_set->connection_resumed_handler)( + if (callback_set->connection_success_handler != NULL) { + (*callback_set->connection_success_handler)( manager->connection, return_code, rejoined_session, callback_set->user_data); } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6e656d92..e52138a6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -459,8 +459,9 @@ add_test_case(request_response_mqtt5_protocol_adapter_session_event_rejoin) add_test_case(request_response_mqtt5_protocol_adapter_incoming_publish) add_test_case(request_response_mqtt5_protocol_adapter_shutdown_while_pending) -add_test_case(mqtt311_listener_session_events) -#add_test_case(mqtt311_listener_publish_events) +add_test_case(mqtt311_listener_connection_success_event_no_session) +add_test_case(mqtt311_listener_connection_success_event_with_session) +add_test_case(mqtt311_listener_publish_event) generate_test_driver(${PROJECT_NAME}-tests) diff --git a/tests/v3/mqtt311_listener_test.c b/tests/v3/mqtt311_listener_test.c index 3491de5a..57b54c25 100644 --- a/tests/v3/mqtt311_listener_test.c +++ b/tests/v3/mqtt311_listener_test.c @@ -1,7 +1,7 @@ /** -* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -* SPDX-License-Identifier: Apache-2.0. -*/ + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ #include @@ -12,8 +12,8 @@ #include -struct mqtt311_listener_resumption_record { - bool rejoined_session; +struct mqtt311_listener_connection_success_record { + bool joined_session; }; struct mqtt311_listener_publish_record { @@ -29,7 +29,7 @@ struct mqtt311_listener_test_context { struct mqtt_connection_state_test *mqtt311_test_context; int mqtt311_test_context_setup_result; - struct aws_array_list resumption_events; + struct aws_array_list connection_success_events; struct aws_array_list publish_events; bool terminated; @@ -45,43 +45,78 @@ static void s_311_listener_test_on_publish_received( enum aws_mqtt_qos qos, bool retain, void *userdata) { + (void)dup; + (void)qos; + (void)retain; + struct mqtt311_listener_test_context *context = userdata; + + struct mqtt311_listener_publish_record publish_record; + AWS_ZERO_STRUCT(publish_record); + + aws_byte_buf_init_copy_from_cursor(&publish_record.topic, context->allocator, *topic); + aws_byte_buf_init_copy_from_cursor(&publish_record.payload, context->allocator, *payload); + + aws_mutex_lock(&context->lock); + aws_array_list_push_back(&context->publish_events, &publish_record); + aws_mutex_unlock(&context->lock); + aws_condition_variable_notify_all(&context->signal); } -static void s_311_listener_test_on_connection_resumed( +static void s_311_listener_test_on_connection_success( struct aws_mqtt_client_connection *connection, enum aws_mqtt_connect_return_code return_code, bool session_present, void *userdata) { + (void)return_code; + + struct mqtt311_listener_test_context *context = userdata; + + struct mqtt311_listener_connection_success_record connection_success_record = { + .joined_session = session_present, + }; + aws_mutex_lock(&context->lock); + aws_array_list_push_back(&context->connection_success_events, &connection_success_record); + aws_mutex_unlock(&context->lock); + aws_condition_variable_notify_all(&context->signal); } static void s_311_listener_test_on_termination(void *complete_ctx) { + struct mqtt311_listener_test_context *context = complete_ctx; + aws_mutex_lock(&context->lock); + context->terminated = true; + aws_mutex_unlock(&context->lock); + aws_condition_variable_notify_all(&context->signal); } -static int mqtt311_listener_test_context_init(struct mqtt311_listener_test_context *context, struct aws_allocator *allocator, struct mqtt_connection_state_test *mqtt311_test_context) { +static int mqtt311_listener_test_context_init( + struct mqtt311_listener_test_context *context, + struct aws_allocator *allocator, + struct mqtt_connection_state_test *mqtt311_test_context) { AWS_ZERO_STRUCT(*context); context->allocator = allocator; context->mqtt311_test_context = mqtt311_test_context; - aws_array_list_init_dynamic(&context->resumption_events, allocator, 10, sizeof(struct mqtt311_listener_resumption_record)); - aws_array_list_init_dynamic(&context->publish_events, allocator, 10, sizeof(struct mqtt311_listener_publish_record)); + aws_array_list_init_dynamic( + &context->connection_success_events, allocator, 10, sizeof(struct mqtt311_listener_connection_success_record)); + aws_array_list_init_dynamic( + &context->publish_events, allocator, 10, sizeof(struct mqtt311_listener_publish_record)); aws_mutex_init(&context->lock); aws_condition_variable_init(&context->signal); - context->mqtt311_test_context_setup_result = aws_test311_setup_mqtt_server_fn(allocator, &mqtt311_test_context); + context->mqtt311_test_context_setup_result = aws_test311_setup_mqtt_server_fn(allocator, mqtt311_test_context); ASSERT_SUCCESS(context->mqtt311_test_context_setup_result); struct aws_mqtt311_listener_config listener_config = { .connection = mqtt311_test_context->mqtt_connection, - .listener_callbacks = { - .publish_received_handler = s_311_listener_test_on_publish_received, - .connection_resumed_handler = s_311_listener_test_on_connection_resumed, - .user_data = context - }, + .listener_callbacks = + {.publish_received_handler = s_311_listener_test_on_publish_received, + .connection_success_handler = s_311_listener_test_on_connection_success, + .user_data = context}, .termination_callback = s_311_listener_test_on_termination, .termination_callback_user_data = context, }; @@ -91,21 +126,29 @@ static int mqtt311_listener_test_context_init(struct mqtt311_listener_test_conte return AWS_OP_SUCCESS; } -static void s_wait_for_listener_termination_callback(struct mqtt311_listener_test_context *context) { +static bool s_is_listener_terminated(void *userdata) { + struct mqtt311_listener_test_context *context = userdata; + return context->terminated; } -static void mqtt311_listener_test_context_clean_up(struct mqtt311_listener_test_context *context) { +static void s_wait_for_listener_termination_callback(struct mqtt311_listener_test_context *context) { + aws_mutex_lock(&context->lock); + aws_condition_variable_wait_pred(&context->signal, &context->lock, s_is_listener_terminated, context); + aws_mutex_unlock(&context->lock); +} +static void mqtt311_listener_test_context_clean_up(struct mqtt311_listener_test_context *context) { aws_mqtt311_listener_release(context->listener); s_wait_for_listener_termination_callback(context); - aws_test311_clean_up_mqtt_server_fn(context->allocator, context->mqtt311_test_context_setup_result, context->mqtt311_test_context); + aws_test311_clean_up_mqtt_server_fn( + context->allocator, context->mqtt311_test_context_setup_result, context->mqtt311_test_context); aws_mutex_clean_up(&context->lock); aws_condition_variable_clean_up(&context->signal); - aws_array_list_clean_up(&context->resumption_events); + aws_array_list_clean_up(&context->connection_success_events); for (size_t i = 0; i < aws_array_list_length(&context->publish_events); ++i) { struct mqtt311_listener_publish_record publish_record; @@ -120,7 +163,160 @@ static void mqtt311_listener_test_context_clean_up(struct mqtt311_listener_test_ aws_array_list_clean_up(&context->publish_events); } -static int s_mqtt311_listener_session_events_fn(struct aws_allocator *allocator, void *ctx) { +struct connection_success_event_test_context { + struct mqtt311_listener_test_context *context; + bool joined_session; + size_t expected_count; +}; + +static bool s_contains_connection_success_events(void *userdata) { + struct connection_success_event_test_context *wait_context = userdata; + struct mqtt311_listener_test_context *context = wait_context->context; + + size_t found = 0; + for (size_t i = 0; i < aws_array_list_length(&context->connection_success_events); ++i) { + struct mqtt311_listener_connection_success_record record; + aws_array_list_get_at(&context->connection_success_events, &record, i); + + if (record.joined_session == wait_context->joined_session) { + ++found; + } + } + + return found >= wait_context->expected_count; +} + +static void s_wait_for_connection_success_events( + struct mqtt311_listener_test_context *context, + bool joined_session, + size_t expected_count) { + struct connection_success_event_test_context wait_context = { + .context = context, + .joined_session = joined_session, + .expected_count = expected_count, + }; + + aws_mutex_lock(&context->lock); + aws_condition_variable_wait_pred( + &context->signal, &context->lock, s_contains_connection_success_events, &wait_context); + aws_mutex_unlock(&context->lock); +} + +static int s_do_mqtt311_listener_connection_success_event_test(struct aws_allocator *allocator, bool session_present) { + aws_mqtt_library_init(allocator); + + struct mqtt_connection_state_test mqtt311_context; + AWS_ZERO_STRUCT(mqtt311_context); + + struct mqtt311_listener_test_context test_context; + ASSERT_SUCCESS(mqtt311_listener_test_context_init(&test_context, allocator, &mqtt311_context)); + + mqtt_mock_server_set_session_present(mqtt311_context.mock_server, false); + + struct aws_mqtt_connection_options connection_options = { + .user_data = &mqtt311_context, + .clean_session = true, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(mqtt311_context.endpoint.address), + .socket_options = &mqtt311_context.socket_options, + .on_connection_complete = aws_test311_on_connection_complete_fn, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(mqtt311_context.mqtt_connection, &connection_options)); + aws_test311_wait_for_connection_to_complete(&mqtt311_context); + + s_wait_for_connection_success_events(&test_context, false, 1); + + ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect( + mqtt311_context.mqtt_connection, aws_test311_on_disconnect_fn, &mqtt311_context)); + aws_test311_wait_for_disconnect_to_complete(&mqtt311_context); + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(mqtt311_context.mqtt_connection, &connection_options)); + aws_test311_wait_for_connection_to_complete(&mqtt311_context); + + s_wait_for_connection_success_events(&test_context, false, 2); + + ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect( + mqtt311_context.mqtt_connection, aws_test311_on_disconnect_fn, &mqtt311_context)); + aws_test311_wait_for_disconnect_to_complete(&mqtt311_context); + + mqtt311_listener_test_context_clean_up(&test_context); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +static int s_mqtt311_listener_connection_success_event_no_session_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + return s_do_mqtt311_listener_connection_success_event_test(allocator, false); +} + +AWS_TEST_CASE( + mqtt311_listener_connection_success_event_no_session, + s_mqtt311_listener_connection_success_event_no_session_fn) + +static int s_mqtt311_listener_connection_success_event_with_session_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + return s_do_mqtt311_listener_connection_success_event_test(allocator, true); +} + +AWS_TEST_CASE( + mqtt311_listener_connection_success_event_with_session, + s_mqtt311_listener_connection_success_event_with_session_fn) + +struct publish_event_test_context { + struct mqtt311_listener_test_context *context; + struct aws_byte_cursor expected_topic; + struct aws_byte_cursor expected_payload; + size_t expected_count; +}; + +static bool s_contains_publish_events(void *userdata) { + struct publish_event_test_context *wait_context = userdata; + struct mqtt311_listener_test_context *context = wait_context->context; + + size_t found = 0; + for (size_t i = 0; i < aws_array_list_length(&context->publish_events); ++i) { + struct mqtt311_listener_publish_record record; + aws_array_list_get_at(&context->publish_events, &record, i); + + struct aws_byte_cursor actual_topic = aws_byte_cursor_from_buf(&record.topic); + if (!aws_byte_cursor_eq(&wait_context->expected_topic, &actual_topic)) { + continue; + } + + struct aws_byte_cursor actual_payload = aws_byte_cursor_from_buf(&record.payload); + if (!aws_byte_cursor_eq(&wait_context->expected_payload, &actual_payload)) { + continue; + } + + ++found; + } + + return found >= wait_context->expected_count; +} + +static void s_wait_for_publish_events( + struct mqtt311_listener_test_context *context, + struct aws_byte_cursor topic, + struct aws_byte_cursor payload, + size_t expected_count) { + struct publish_event_test_context wait_context = { + .context = context, + .expected_topic = topic, + .expected_payload = payload, + .expected_count = expected_count, + }; + + aws_mutex_lock(&context->lock); + aws_condition_variable_wait_pred(&context->signal, &context->lock, s_contains_publish_events, &wait_context); + aws_mutex_unlock(&context->lock); +} + +static int s_mqtt311_listener_publish_event_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; aws_mqtt_library_init(allocator); @@ -131,9 +327,11 @@ static int s_mqtt311_listener_session_events_fn(struct aws_allocator *allocator, struct mqtt311_listener_test_context test_context; ASSERT_SUCCESS(mqtt311_listener_test_context_init(&test_context, allocator, &mqtt311_context)); + mqtt_mock_server_set_publish_reflection(mqtt311_context.mock_server, true); + struct aws_mqtt_connection_options connection_options = { .user_data = &mqtt311_context, - .clean_session = false, + .clean_session = true, .client_id = aws_byte_cursor_from_c_str("client1234"), .host_name = aws_byte_cursor_from_c_str(mqtt311_context.endpoint.address), .socket_options = &mqtt311_context.socket_options, @@ -143,6 +341,22 @@ static int s_mqtt311_listener_session_events_fn(struct aws_allocator *allocator, ASSERT_SUCCESS(aws_mqtt_client_connection_connect(mqtt311_context.mqtt_connection, &connection_options)); aws_test311_wait_for_connection_to_complete(&mqtt311_context); + struct aws_byte_cursor topic1 = aws_byte_cursor_from_c_str("hello/world/1"); + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("payload1"); + aws_mqtt_client_connection_publish( + mqtt311_context.mqtt_connection, &topic1, AWS_MQTT_QOS_AT_LEAST_ONCE, false, &payload1, NULL, NULL); + + s_wait_for_publish_events(&test_context, topic1, payload1, 1); + + struct aws_byte_cursor topic2 = aws_byte_cursor_from_c_str("nothing/important"); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("somethingneeddoing?"); + aws_mqtt_client_connection_publish( + mqtt311_context.mqtt_connection, &topic2, AWS_MQTT_QOS_AT_LEAST_ONCE, false, &payload2, NULL, NULL); + aws_mqtt_client_connection_publish( + mqtt311_context.mqtt_connection, &topic2, AWS_MQTT_QOS_AT_LEAST_ONCE, false, &payload2, NULL, NULL); + + s_wait_for_publish_events(&test_context, topic2, payload2, 2); + ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect( mqtt311_context.mqtt_connection, aws_test311_on_disconnect_fn, &mqtt311_context)); aws_test311_wait_for_disconnect_to_complete(&mqtt311_context); @@ -154,4 +368,4 @@ static int s_mqtt311_listener_session_events_fn(struct aws_allocator *allocator, return AWS_OP_SUCCESS; } -AWS_TEST_CASE(mqtt311_listener_session_events, s_mqtt311_listener_session_events_fn) \ No newline at end of file +AWS_TEST_CASE(mqtt311_listener_publish_event, s_mqtt311_listener_publish_event_fn) \ No newline at end of file diff --git a/tests/v3/mqtt_mock_server_handler.c b/tests/v3/mqtt_mock_server_handler.c index 73576a7d..0e770690 100644 --- a/tests/v3/mqtt_mock_server_handler.c +++ b/tests/v3/mqtt_mock_server_handler.c @@ -30,6 +30,8 @@ struct mqtt_mock_server_handler { size_t pubacks_received; size_t ping_received; size_t connacks_avail; + bool session_present; + bool reflect_publishes; bool auto_ack; /* last ID used when sending PUBLISH (QoS1+) to client */ @@ -77,12 +79,14 @@ static int s_mqtt_mock_server_handler_process_packet( switch (packet_type) { case AWS_MQTT_PACKET_CONNECT: { size_t connacks_available = 0; + bool session_present = false; aws_mutex_lock(&server->synced.lock); AWS_LOGF_DEBUG( MOCK_LOG_SUBJECT, "server, CONNECT received, %llu available connacks.", (long long unsigned)server->synced.connacks_avail); connacks_available = server->synced.connacks_avail > 0 ? server->synced.connacks_avail-- : 0; + session_present = server->synced.session_present; aws_mutex_unlock(&server->synced.lock); if (connacks_available) { @@ -90,7 +94,7 @@ static int s_mqtt_mock_server_handler_process_packet( aws_channel_acquire_message_from_pool(server->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, 256); struct aws_mqtt_packet_connack conn_ack; - err |= aws_mqtt_packet_connack_init(&conn_ack, false, AWS_MQTT_CONNECT_ACCEPTED); + err |= aws_mqtt_packet_connack_init(&conn_ack, session_present, AWS_MQTT_CONNECT_ACCEPTED); err |= aws_mqtt_packet_connack_encode(&connack_msg->message_data, &conn_ack); if (aws_channel_slot_send_message(server->slot, connack_msg, AWS_CHANNEL_DIR_WRITE)) { err |= 1; @@ -185,6 +189,7 @@ static int s_mqtt_mock_server_handler_process_packet( aws_mutex_lock(&server->synced.lock); bool auto_ack = server->synced.auto_ack; + bool reflect_publishes = server->synced.reflect_publishes; aws_mutex_unlock(&server->synced.lock); uint8_t qos = (publish_packet.fixed_header.flags >> 1) & 0x3; @@ -197,6 +202,23 @@ static int s_mqtt_mock_server_handler_process_packet( err |= aws_mqtt_packet_ack_encode(&puback_msg->message_data, &puback); err |= aws_channel_slot_send_message(server->slot, puback_msg, AWS_CHANNEL_DIR_WRITE); } + + if (reflect_publishes) { + struct aws_io_message *publish_msg = + aws_channel_acquire_message_from_pool(server->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, 1024); + struct aws_mqtt_packet_publish publish; + // reusing the packet identifier here is weird but reasonably safe, they're separate id spaces + err |= aws_mqtt_packet_publish_init( + &publish, + false, + qos, + false, + publish_packet.topic_name, + publish_packet.packet_identifier, + publish_packet.payload); + err |= aws_mqtt_packet_publish_encode(&publish_msg->message_data, &publish); + err |= aws_channel_slot_send_message(server->slot, publish_msg, AWS_CHANNEL_DIR_WRITE); + } break; } @@ -466,6 +488,22 @@ void destroy_mqtt_mock_server(struct aws_channel_handler *handler) { aws_mem_release(handler->alloc, server); } +void mqtt_mock_server_set_session_present(struct aws_channel_handler *handler, bool session_present) { + struct mqtt_mock_server_handler *server = handler->impl; + + aws_mutex_lock(&server->synced.lock); + server->synced.session_present = session_present; + aws_mutex_unlock(&server->synced.lock); +} + +void mqtt_mock_server_set_publish_reflection(struct aws_channel_handler *handler, bool reflect_publishes) { + struct mqtt_mock_server_handler *server = handler->impl; + + aws_mutex_lock(&server->synced.lock); + server->synced.reflect_publishes = reflect_publishes; + aws_mutex_unlock(&server->synced.lock); +} + void mqtt_mock_server_set_max_ping_resp(struct aws_channel_handler *handler, size_t max_ping) { struct mqtt_mock_server_handler *server = handler->impl; diff --git a/tests/v3/mqtt_mock_server_handler.h b/tests/v3/mqtt_mock_server_handler.h index 170201e6..8187ae11 100644 --- a/tests/v3/mqtt_mock_server_handler.h +++ b/tests/v3/mqtt_mock_server_handler.h @@ -68,10 +68,21 @@ int mqtt_mock_server_send_publish_by_id( enum aws_mqtt_qos qos, bool retain); +/** + * Sets whether or not connacks return session present + */ +void mqtt_mock_server_set_session_present(struct aws_channel_handler *handler, bool session_present); + +/** + * Sets whether or not connacks return session present + */ +void mqtt_mock_server_set_publish_reflection(struct aws_channel_handler *handler, bool reflect_publishes); + /** * Set max number of PINGRESP that mock server will send back to client */ void mqtt_mock_server_set_max_ping_resp(struct aws_channel_handler *handler, size_t max_ping); + /** * Set max number of CONACK that mock server will send back to client */