Skip to content

Commit

Permalink
Relax validation on correlation token and path to support identity se…
Browse files Browse the repository at this point in the history
…rvice; correlation token tracking; fix potential issue where we could remove operations from tracking tables when they hadn't been added
  • Loading branch information
bretambrose committed Apr 2, 2024
1 parent e302be5 commit 4e6c21d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 64 deletions.
109 changes: 78 additions & 31 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,17 @@ Client Tables/Lookups
(Authoritative operation container)
1. &operation.id -> &operation // added on in-thread enqueue, removed on operation completion/destruction
(Response topic -> Correlation token extraction info)
2. &topic -> &{topic, topic_buffer, correlation token json path buffer} // per-message-path add/replace on in-thread
enqueue, removed on client destruction
(Response path topic -> Correlation token extraction info)
2. &topic -> &{topic, topic_buffer, correlation token json path buffer} // ref-counted, per-message-path add on
request dequeue into subscribing/subscribed state, decref/removed on operation completion/destruction
(Correlation token -> request operation)
3. &operation.correlation token -> (request) &operation // added on in-thread request op move to awaiting response
(Request correlation token -> request operation)
3. &operation.correlation token -> (request) &operation // added on request dequeue into subscribing/subscribed
state, removed on operation completion/destruction
(Subscription filter -> all operations using that filter)
4. &topic_filter -> &{topic_filter, linked_list} // added on in-thread pop from queue, removed from list on
operation completion/destruction also checked for empty and removed from table
Note: 4 tracks both streaming and request-response operations but each uses the table in different ways. Both use
the table to react to subscription status events to move the operation forward state-wise. Additionally,
streaming operations use the table to map incoming publishes to listening streaming operations. OTOH, request
operations use table 2 and then 3 to map incoming publishes to operations.
(Streaming subscription filter -> list of all operations using that filter)
4. &topic_filter -> &{topic_filter, linked_list} // added on request dequeue into subscribing/subscribed state,
removed from list on operation completion/destruction also checked for empty and removed from table
*/

Expand Down Expand Up @@ -446,6 +441,8 @@ struct aws_mqtt_rr_client_operation {

enum aws_mqtt_request_response_operation_state state;

bool in_client_tables;

struct aws_task submit_task;
struct aws_task destroy_task;
};
Expand Down Expand Up @@ -539,6 +536,11 @@ struct aws_mqtt_request_response_client {
* when the client is shutting down.
*/
struct aws_hash_table request_response_paths;

/*
* Map from cursor (correlation token) -> request operation
*/
struct aws_hash_table operations_by_correlation_tokens;
};

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire_internal(
Expand Down Expand Up @@ -583,6 +585,7 @@ static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request
aws_priority_queue_clean_up(&client->operations_by_timeout);
aws_hash_table_clean_up(&client->streaming_operation_subscription_lists);
aws_hash_table_clean_up(&client->request_response_paths);
aws_hash_table_clean_up(&client->operations_by_correlation_tokens);

aws_mem_release(client->allocator, client);

Expand Down Expand Up @@ -1241,6 +1244,15 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie
NULL,
s_aws_rr_response_path_table_hash_element_destroy);

aws_hash_table_init(
&rr_client->operations_by_correlation_tokens,
allocator,
MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE,
aws_hash_byte_cursor_ptr,
aws_mqtt_byte_cursor_hash_equality,
NULL,
NULL);

aws_linked_list_init(&rr_client->operation_queue);

aws_task_init(
Expand Down Expand Up @@ -1412,14 +1424,37 @@ static int s_add_request_operation_to_response_path_table(
return AWS_OP_SUCCESS;
}

static int s_add_request_operation_to_correlation_token_table(
struct aws_mqtt_request_response_client *client,
struct aws_mqtt_rr_client_operation *operation) {

return aws_hash_table_put(
&client->operations_by_correlation_tokens,
&operation->storage.request_storage.options.correlation_token,
operation,
NULL);
}

static int s_add_in_progress_operation_to_tracking_tables(
struct aws_mqtt_request_response_client *client,
struct aws_mqtt_rr_client_operation *operation) {
if (operation->type == AWS_MRROT_STREAMING) {
return s_add_streaming_operation_to_subscription_topic_filter_table(client, operation);
if (s_add_streaming_operation_to_subscription_topic_filter_table(client, operation)) {
return AWS_OP_ERR;
}
} else {
return s_add_request_operation_to_response_path_table(client, operation);
if (s_add_request_operation_to_response_path_table(client, operation)) {
return AWS_OP_ERR;
}

if (s_add_request_operation_to_correlation_token_table(client, operation)) {
return AWS_OP_ERR;
}
}

operation->in_client_tables = true;

return AWS_OP_SUCCESS;
}

static void s_handle_operation_subscribe_result(
Expand Down Expand Up @@ -1462,12 +1497,34 @@ static enum aws_rr_subscription_type s_rr_operation_type_to_subscription_type(
return ARRST_EVENT_STREAM;
}

static bool s_can_operation_dequeue(
struct aws_mqtt_request_response_client *client,
struct aws_mqtt_rr_client_operation *operation) {
if (operation->type != AWS_MRROT_REQUEST) {
return true;
}

struct aws_hash_element *token_element = NULL;
if (aws_hash_table_find(
&client->operations_by_correlation_tokens,
&operation->storage.request_storage.options.correlation_token,
&token_element)) {
return false;
}

return token_element == NULL;
}

static void s_process_queued_operations(struct aws_mqtt_request_response_client *client) {
while (!aws_linked_list_empty(&client->operation_queue)) {
struct aws_linked_list_node *head = aws_linked_list_front(&client->operation_queue);
struct aws_mqtt_rr_client_operation *head_operation =
AWS_CONTAINER_OF(head, struct aws_mqtt_rr_client_operation, node);

if (!s_can_operation_dequeue(client, head_operation)) {
break;
}

struct aws_rr_acquire_subscription_options subscribe_options = {
.topic_filter = s_aws_mqtt_rr_operation_get_subscription_topic_filter(head_operation),
.operation_id = head_operation->id,
Expand Down Expand Up @@ -1692,20 +1749,6 @@ static bool s_are_request_operation_options_valid(
AWS_BYTE_CURSOR_PRI(path->topic));
return false;
}

if (path->correlation_token_json_path.len == 0) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_REQUEST_RESPONSE,
"(%p) rr client request options - empty correlation token json path",
(void *)client);
return false;
}
}

if (request_options->correlation_token.len == 0) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - empty correlation token", (void *)client);
return false;
}

if (!aws_mqtt_is_valid_topic(&request_options->publish_topic)) {
Expand Down Expand Up @@ -1815,11 +1858,15 @@ static void s_aws_mqtt_request_operation_storage_clean_up(struct aws_mqtt_reques
aws_byte_buf_clean_up(&storage->operation_data);
}

static void s_remove_request_operation_from_response_path_table(struct aws_mqtt_rr_client_operation *operation) {
static void s_remove_operation_from_client_tables(struct aws_mqtt_rr_client_operation *operation) {
if (operation->type != AWS_MRROT_REQUEST) {
return;
}

if (!operation->in_client_tables) {
return;
}

struct aws_mqtt_request_response_client *client = operation->client_internal_ref;
struct aws_array_list *paths = &operation->storage.request_storage.operation_response_paths;
size_t path_count = aws_array_list_length(paths);
Expand Down Expand Up @@ -1880,7 +1927,7 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg,
aws_rr_subscription_manager_release_subscription(&client->subscription_manager, &release_options);
}

s_remove_request_operation_from_response_path_table(operation);
s_remove_operation_from_client_tables(operation);

aws_mqtt_request_response_client_release_internal(operation->client_internal_ref);

Expand Down
2 changes: 0 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,6 @@ add_test_case(rrc_mqtt311_create_destroy)

add_test_case(rrc_submit_request_operation_failure_no_response_paths)
add_test_case(rrc_submit_request_operation_failure_invalid_response_topic)
add_test_case(rrc_submit_request_operation_failure_invalid_response_correlation_token_path)
add_test_case(rrc_submit_request_operation_failure_no_correlation_token)
add_test_case(rrc_submit_request_operation_failure_invalid_publish_topic)
add_test_case(rrc_submit_request_operation_failure_invalid_subscription_topic_filter)
add_test_case(rrc_submit_request_operation_failure_empty_request)
Expand Down
31 changes: 0 additions & 31 deletions tests/request-response/request_response_client_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -777,37 +777,6 @@ AWS_TEST_CASE(
rrc_submit_request_operation_failure_invalid_response_topic,
s_rrc_submit_request_operation_failure_invalid_response_topic_fn)

static void s_invalid_response_correlation_token_path_mutator(
struct aws_mqtt_request_operation_options *request_options) {
request_options->response_paths[0].correlation_token_json_path = aws_byte_cursor_from_c_str("");
}

static int s_rrc_submit_request_operation_failure_invalid_response_correlation_token_path_fn(
struct aws_allocator *allocator,
void *ctx) {
(void)ctx;

return s_rrc_do_submit_request_operation_failure_test(allocator, s_invalid_response_correlation_token_path_mutator);
}

AWS_TEST_CASE(
rrc_submit_request_operation_failure_invalid_response_correlation_token_path,
s_rrc_submit_request_operation_failure_invalid_response_correlation_token_path_fn)

static void s_no_correlation_token_mutator(struct aws_mqtt_request_operation_options *request_options) {
request_options->correlation_token = aws_byte_cursor_from_c_str("");
}

static int s_rrc_submit_request_operation_failure_no_correlation_token_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

return s_rrc_do_submit_request_operation_failure_test(allocator, s_no_correlation_token_mutator);
}

AWS_TEST_CASE(
rrc_submit_request_operation_failure_no_correlation_token,
s_rrc_submit_request_operation_failure_no_correlation_token_fn)

static void s_invalid_publish_topic_mutator(struct aws_mqtt_request_operation_options *request_options) {
request_options->publish_topic = aws_byte_cursor_from_c_str("a/b/#");
}
Expand Down

0 comments on commit 4e6c21d

Please sign in to comment.