diff --git a/source/client.c b/source/client.c index d8cbe85e..fb436249 100644 --- a/source/client.c +++ b/source/client.c @@ -148,7 +148,7 @@ static struct request_timeout_task_arg *s_schedule_timeout_task( aws_mem_release(connection->allocator, timeout_task_arg); return NULL; } - timestamp = aws_add_u64_saturating(timestamp, connection->operation_timeout_ns); + timestamp = aws_add_u64_saturating(timestamp, timeout_duration_in_ns); aws_channel_schedule_task_future(connection->slot->channel, request_timeout_task, timestamp); return timeout_task_arg; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 658da749..018ad5d0 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -78,8 +78,11 @@ add_test_case(mqtt_clean_session_not_retry) add_test_case(mqtt_clean_session_discard_previous) add_test_case(mqtt_clean_session_keep_next_session) add_test_case(mqtt_connection_publish_QoS1_timeout) +add_test_case(mqtt_connection_publish_QoS1_timeout_override) add_test_case(mqtt_connection_unsubscribe_timeout) +add_test_case(mqtt_connection_unsubscribe_timeout_override) add_test_case(mqtt_connection_subscribe_single_timeout) +add_test_case(mqtt_connection_subscribe_single_timeout_override) add_test_case(mqtt_connection_subscribe_multi_timeout) add_test_case(mqtt_connection_resubscribe_timeout) add_test_case(mqtt_connection_publish_QoS1_timeout_connection_lost_reset_time) diff --git a/tests/v3/connection_state_test.c b/tests/v3/connection_state_test.c index 149d8c25..6447bb7b 100644 --- a/tests/v3/connection_state_test.c +++ b/tests/v3/connection_state_test.c @@ -9,6 +9,7 @@ #include "mqtt_mock_server_handler.h" #include +#include #include static struct mqtt_connection_state_test test_data = {0}; @@ -2376,6 +2377,66 @@ AWS_TEST_CASE_FIXTURE( s_clean_up_mqtt_server_fn, &test_data) +/** + * Test that connection is healthy, user set the timeout for request, and timeout happens and the publish failed. + */ +static int s_test_mqtt_connection_publish_QoS1_timeout_override_fn(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = false, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = aws_test311_on_connection_complete_fn, + .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS, + .keep_alive_time_secs = 16960, /* basically stop automatically sending PINGREQ */ + }; + + struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic"); + struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1"); + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + aws_test311_wait_for_connection_to_complete(state_test_data); + + /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */ + mqtt_mock_server_disable_auto_ack(state_test_data->mock_server); + + /* make a publish with QoS 1 immediate. */ + aws_mutex_lock(&state_test_data->lock); + state_test_data->expected_ops_completed = 1; + aws_mutex_unlock(&state_test_data->lock); + uint16_t packet_id_1 = aws_mqtt_client_connection_311_publish( + state_test_data->mqtt_connection->impl, + &pub_topic, + AWS_MQTT_QOS_AT_LEAST_ONCE, + false, + &payload_1, + aws_test311_on_op_complete, + state_test_data, + aws_timestamp_convert(3, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL)); + ASSERT_TRUE(packet_id_1 > 0); + + /* publish should complete after the shutdown */ + aws_test311_wait_for_ops_completed(state_test_data); + /* Check the publish has been completed with timeout error */ + ASSERT_UINT_EQUALS(state_test_data->op_complete_error, AWS_ERROR_MQTT_TIMEOUT); + ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect( + state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data)); + aws_test311_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_publish_QoS1_timeout_override, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_publish_QoS1_timeout_override_fn, + s_clean_up_mqtt_server_fn, + &test_data) + /** * Test that connection is healthy, user set the timeout for request, and timeout happens and the unsubscribe failed. */ @@ -2430,6 +2491,63 @@ AWS_TEST_CASE_FIXTURE( s_clean_up_mqtt_server_fn, &test_data) +/** + * Test that connection is healthy, user set the timeout for request, and timeout happens and the unsubscribe failed. + */ +static int s_test_mqtt_connection_unsubscribe_timeout_override_fn(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = false, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = aws_test311_on_connection_complete_fn, + .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS, + .keep_alive_time_secs = 16960, /* basically stop automatically sending PINGREQ */ + }; + + struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic"); + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + aws_test311_wait_for_connection_to_complete(state_test_data); + + /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */ + mqtt_mock_server_disable_auto_ack(state_test_data->mock_server); + + aws_mutex_lock(&state_test_data->lock); + state_test_data->expected_ops_completed = 1; + aws_mutex_unlock(&state_test_data->lock); + + /* unsubscribe to the first topic */ + uint16_t unsub_packet_id = aws_mqtt_client_connection_311_unsubscribe( + state_test_data->mqtt_connection->impl, + &pub_topic, + aws_test311_on_op_complete, + state_test_data, + aws_timestamp_convert(3, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL)); + ASSERT_TRUE(unsub_packet_id > 0); + + /* unsubscribe should complete after the timeout */ + aws_test311_wait_for_ops_completed(state_test_data); + /* Check that the unsubscribe has been completed with a timeout error */ + ASSERT_UINT_EQUALS(state_test_data->op_complete_error, AWS_ERROR_MQTT_TIMEOUT); + ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect( + state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data)); + aws_test311_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_unsubscribe_timeout_override, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_unsubscribe_timeout_override_fn, + s_clean_up_mqtt_server_fn, + &test_data) + /** * Test that connection is healthy, user set the timeout for request, and timeout happens and the subscribe failed. */ @@ -2487,6 +2605,63 @@ AWS_TEST_CASE_FIXTURE( s_clean_up_mqtt_server_fn, &test_data) +/** + * Test that connection is healthy, user set the timeout for request, and timeout happens and the subscribe failed. + */ +static int s_test_mqtt_connection_subscribe_single_timeout_override_fn(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = false, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = aws_test311_on_connection_complete_fn, + .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS, + .keep_alive_time_secs = 16960, /* basically stop automatically sending PINGREQ */ + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + aws_test311_wait_for_connection_to_complete(state_test_data); + + /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */ + mqtt_mock_server_disable_auto_ack(state_test_data->mock_server); + + /* subscribe */ + struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic"); + uint16_t sub_packet_id = aws_mqtt_client_connection_311_subscribe( + state_test_data->mqtt_connection->impl, + &pub_topic, + AWS_MQTT_QOS_AT_LEAST_ONCE, + aws_test311_on_publish_received, + state_test_data, + NULL, + aws_test311_on_suback, + state_test_data, + aws_timestamp_convert(3, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL)); + ASSERT_TRUE(sub_packet_id > 0); + + /* subscribe should complete after the timeout */ + aws_test311_wait_for_subscribe_to_complete(state_test_data); + /* Check that the subscribe has been completed with a timeout error */ + ASSERT_UINT_EQUALS(state_test_data->subscribe_complete_error, AWS_ERROR_MQTT_TIMEOUT); + + ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect( + state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data)); + aws_test311_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_subscribe_single_timeout_override, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_subscribe_single_timeout_override_fn, + s_clean_up_mqtt_server_fn, + &test_data) + /** * Test that connection is healthy, user set the timeout for request, and timeout happens and the multi-subscribe * failed. diff --git a/tests/v3/mqtt311_testing_utils.c b/tests/v3/mqtt311_testing_utils.c index 738443d9..4f3781d1 100644 --- a/tests/v3/mqtt311_testing_utils.c +++ b/tests/v3/mqtt311_testing_utils.c @@ -575,7 +575,7 @@ static bool s_is_ops_completed(void *arg) { void aws_test311_wait_for_ops_completed(struct mqtt_connection_state_test *state_test_data) { aws_mutex_lock(&state_test_data->lock); - aws_condition_variable_wait_for_pred( - &state_test_data->cvar, &state_test_data->lock, 10000000000, s_is_ops_completed, state_test_data); + aws_condition_variable_wait_pred( + &state_test_data->cvar, &state_test_data->lock, s_is_ops_completed, state_test_data); aws_mutex_unlock(&state_test_data->lock); } \ No newline at end of file