Skip to content

Commit

Permalink
fix mqtt3 suback return qos
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Jan 9, 2024
1 parent 17ee24a commit 10de2ea
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 15 deletions.
21 changes: 19 additions & 2 deletions source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1945,17 +1945,28 @@ static void s_subscribe_complete(
for (size_t i = 0; i < list_len; i++) {
err |= aws_array_list_get_at(&task_arg->topics, &topic, i);
struct aws_mqtt_topic_subscription *subscription = &topic->request;
// If subscribe complete with error, set the qos value to AWS_MQTT_QOS_FAILURE
if(error_code != AWS_OP_SUCCESS)
{
subscription->qos = AWS_MQTT_QOS_FAILURE;
}
err |= aws_array_list_push_back(&cb_list, &subscription);
}
AWS_ASSUME(!err);
task_arg->on_suback.multi(&connection->base, packet_id, &cb_list, error_code, task_arg->on_suback_ud);
aws_array_list_clean_up(&cb_list);
} else if (task_arg->on_suback.single) {
// The topic->request.qos should be already updated to returned qos
enum aws_mqtt_qos returned_qos = topic->request.qos;
if (error_code != AWS_OP_SUCCESS)
{
returned_qos = AWS_MQTT_QOS_FAILURE;
}
task_arg->on_suback.single(
&connection->base,
packet_id,
&topic->request.topic,
topic->request.qos,
returned_qos,
error_code,
task_arg->on_suback_ud);
}
Expand Down Expand Up @@ -2121,11 +2132,17 @@ static void s_subscribe_single_complete(
if (task_arg->on_suback.single) {
AWS_ASSUME(aws_string_is_valid(topic->filter));
aws_mqtt_suback_fn *suback = task_arg->on_suback.single;
// The topic->request.qos should be already updated to returned qos
enum aws_mqtt_qos returned_qos = topic->request.qos;
if (error_code != AWS_OP_SUCCESS)
{
returned_qos = AWS_MQTT_QOS_FAILURE;
}
suback(
&connection->base,
packet_id,
&topic->request.topic,
topic->request.qos,
returned_qos,
error_code,
task_arg->on_suback_ud);
}
Expand Down
27 changes: 14 additions & 13 deletions tests/v3/connection_state_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -399,14 +399,16 @@ static int s_clean_up_mqtt_server_fn(struct aws_allocator *allocator, int setup_
if (!setup_result) {
struct mqtt_connection_state_test *state_test_data = ctx;

s_received_publish_packet_list_clean_up(&state_test_data->published_messages);
s_received_publish_packet_list_clean_up(&state_test_data->any_published_messages);
aws_array_list_clean_up(&state_test_data->qos_returned);
aws_mqtt_client_connection_release(state_test_data->mqtt_connection);

s_wait_for_termination_to_complete(state_test_data);
ASSERT_UINT_EQUALS(1, state_test_data->connection_termination_calls);

// Clean up the state_test_data after the client is terminated.
s_received_publish_packet_list_clean_up(&state_test_data->published_messages);
s_received_publish_packet_list_clean_up(&state_test_data->any_published_messages);
aws_array_list_clean_up(&state_test_data->qos_returned);

aws_mqtt_client_release(state_test_data->mqtt_client);
aws_client_bootstrap_release(state_test_data->client_bootstrap);
aws_host_resolver_release(state_test_data->host_resolver);
Expand Down Expand Up @@ -582,9 +584,8 @@ static void s_on_suback(
struct mqtt_connection_state_test *state_test_data = userdata;

aws_mutex_lock(&state_test_data->lock);
if (!error_code) {
aws_array_list_push_back(&state_test_data->qos_returned, &qos);
}
aws_array_list_push_back(&state_test_data->qos_returned, &qos);

state_test_data->subscribe_completed = true;
state_test_data->subscribe_complete_error = error_code;
aws_mutex_unlock(&state_test_data->lock);
Expand Down Expand Up @@ -619,14 +620,14 @@ static void s_on_multi_suback(

aws_mutex_lock(&state_test_data->lock);
state_test_data->subscribe_completed = true;
if (!error_code) {
size_t length = aws_array_list_length(topic_subacks);
for (size_t i = 0; i < length; ++i) {
struct aws_mqtt_topic_subscription *subscription = NULL;
aws_array_list_get_at(topic_subacks, &subscription, i);
aws_array_list_push_back(&state_test_data->qos_returned, &subscription->qos);
}

size_t length = aws_array_list_length(topic_subacks);
for (size_t i = 0; i < length; ++i) {
struct aws_mqtt_topic_subscription *subscription = NULL;
aws_array_list_get_at(topic_subacks, &subscription, i);
aws_array_list_push_back(&state_test_data->qos_returned, &subscription->qos);
}

aws_mutex_unlock(&state_test_data->lock);
aws_condition_variable_notify_one(&state_test_data->cvar);
}
Expand Down

0 comments on commit 10de2ea

Please sign in to comment.