From 468e73e7e3ecf12462e44ef5ce8ad9d3a18c661d Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 11 Apr 2024 10:33:33 -0700 Subject: [PATCH] Updates --- bin/elastishadow/main.c | 133 ++++++++++++++++++++++++++++++++++------ 1 file changed, 113 insertions(+), 20 deletions(-) diff --git a/bin/elastishadow/main.c b/bin/elastishadow/main.c index b87f0306..08639bf5 100644 --- a/bin/elastishadow/main.c +++ b/bin/elastishadow/main.c @@ -189,8 +189,8 @@ static void s_handle_get( size_t argument_count = aws_array_list_length(arguments) - 1; if (argument_count != 2) { - printf("invalid get options:\n"); - printf(" get \n"); + printf("invalid get-named-shadow options:\n"); + printf(" get-named-shadow \n"); return; } @@ -211,7 +211,7 @@ static void s_handle_get( char rejected_path[128]; snprintf(rejected_path, AWS_ARRAY_SIZE(rejected_path), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/get/rejected", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); - struct aws_byte_cursor correlation_token_path = aws_byte_cursor_from_c_str(""); + struct aws_byte_cursor correlation_token_path = aws_byte_cursor_from_c_str("clientToken"); struct aws_mqtt_request_operation_response_path response_paths[] = { { .topic = aws_byte_cursor_from_c_str(accepted_path), @@ -247,7 +247,7 @@ static void s_handle_get( .user_data = aws_string_new_from_c_str(allocator, correlation_token), }; - printf("Submitting GetNamedShadow '" PRInSTR "' for thing '" PRInSTR "' using correlation token %s...\n", AWS_BYTE_CURSOR_PRI(shadow_name_cursor), AWS_BYTE_CURSOR_PRI(thing_name_cursor), correlation_token); + printf("Submitting GetNamedShadow request for shadow '" PRInSTR "' of thing '" PRInSTR "' using correlation token %s...\n", AWS_BYTE_CURSOR_PRI(shadow_name_cursor), AWS_BYTE_CURSOR_PRI(thing_name_cursor), correlation_token); if (aws_mqtt_request_response_client_submit_request(context->rr_client, &get_options) == AWS_OP_ERR) { int error_code = aws_last_error(); @@ -255,7 +255,7 @@ static void s_handle_get( } } -static void s_handle_update( +static void s_handle_update_reported( struct app_ctx *context, struct aws_allocator *allocator, struct aws_array_list *arguments, @@ -266,6 +266,102 @@ static void s_handle_update( (void)line_cursor; } +static void s_on_update_shadow_desired_complete( + const struct aws_byte_cursor *response_topic, + const struct aws_byte_cursor *payload, + int error_code, + void *user_data) { + + struct aws_string *correlation_token = user_data; + + if (payload != NULL) { + printf("UpdateNameShadowDesired request '%s' response received on topic '" PRInSTR "' with body:\n " PRInSTR "\n", correlation_token->bytes, AWS_BYTE_CURSOR_PRI(*response_topic), AWS_BYTE_CURSOR_PRI(*payload)); + } else { + printf("UpdateNameShadowDesired request '%s' failed with error code %d(%s)\n", correlation_token->bytes, error_code, aws_error_debug_str(error_code)); + } + + aws_string_destroy(correlation_token); +} + + +static void s_handle_update_desired( + struct app_ctx *context, + struct aws_allocator *allocator, + struct aws_array_list *arguments, + struct aws_byte_cursor line_cursor) { + + size_t argument_count = aws_array_list_length(arguments) - 1; + if (argument_count < 3) { + printf("invalid update-named-shadow-desired options:\n"); + printf(" delete-named-shadow \n"); + return; + } + + struct aws_byte_cursor thing_name_cursor; + AWS_ZERO_STRUCT(thing_name_cursor); + aws_array_list_get_at(arguments, &thing_name_cursor, 1); + + struct aws_byte_cursor shadow_name_cursor; + AWS_ZERO_STRUCT(shadow_name_cursor); + aws_array_list_get_at(arguments, &shadow_name_cursor, 2); + + struct aws_byte_cursor desired_state_cursor; + aws_array_list_get_at(arguments, &desired_state_cursor, 3); + desired_state_cursor.len = (size_t)(line_cursor.ptr + line_cursor.len - desired_state_cursor.ptr); + + char subscription_topic_filter[128]; + snprintf(subscription_topic_filter, AWS_ARRAY_SIZE(subscription_topic_filter), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/+", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + + char accepted_path[128]; + snprintf(accepted_path, AWS_ARRAY_SIZE(accepted_path), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/accepted", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + + char rejected_path[128]; + snprintf(rejected_path, AWS_ARRAY_SIZE(rejected_path), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/rejected", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + + struct aws_byte_cursor correlation_token_path = aws_byte_cursor_from_c_str("clientToken"); + struct aws_mqtt_request_operation_response_path response_paths[] = { + { + .topic = aws_byte_cursor_from_c_str(accepted_path), + .correlation_token_json_path = correlation_token_path, + }, + { + .topic = aws_byte_cursor_from_c_str(rejected_path), + .correlation_token_json_path = correlation_token_path, + }, + }; + + char publish_topic[128]; + snprintf(publish_topic, AWS_ARRAY_SIZE(publish_topic), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + + char correlation_token[128]; + struct aws_byte_buf correlation_token_buf = aws_byte_buf_from_empty_array(correlation_token, AWS_ARRAY_SIZE(correlation_token)); + + struct aws_uuid uuid; + aws_uuid_init(&uuid); + aws_uuid_to_str(&uuid, &correlation_token_buf); + + char request[256]; + snprintf(request, AWS_ARRAY_SIZE(request), "{\"clientToken\":\"%s\",\"state\":{\"desired\":" PRInSTR "}}", correlation_token, AWS_BYTE_CURSOR_PRI(desired_state_cursor)); + + struct aws_mqtt_request_operation_options get_options = { + .subscription_topic_filter = aws_byte_cursor_from_c_str(subscription_topic_filter), + .response_paths = response_paths, + .response_path_count = 2, + .publish_topic = aws_byte_cursor_from_c_str(publish_topic), + .serialized_request = aws_byte_cursor_from_c_str(request), + .correlation_token = aws_byte_cursor_from_c_str(correlation_token), + .completion_callback = s_on_update_shadow_desired_complete, + .user_data = aws_string_new_from_c_str(allocator, correlation_token), + }; + + printf("Submitting UpdateNameShadowDesired request for shadow '" PRInSTR "' of thing '" PRInSTR "' using correlation token %s...\n", AWS_BYTE_CURSOR_PRI(shadow_name_cursor), AWS_BYTE_CURSOR_PRI(thing_name_cursor), correlation_token); + + if (aws_mqtt_request_response_client_submit_request(context->rr_client, &get_options) == AWS_OP_ERR) { + int error_code = aws_last_error(); + printf("UpdateNameShadowDesired synchronous failure: %d(%s)", error_code, aws_error_debug_str(error_code)); + } +} + static void s_on_delete_shadow_complete( const struct aws_byte_cursor *response_topic, const struct aws_byte_cursor *payload, @@ -290,8 +386,8 @@ static void s_handle_delete( size_t argument_count = aws_array_list_length(arguments) - 1; if (argument_count != 2) { - printf("invalid delete options:\n"); - printf(" delete \n"); + printf("invalid delete-named-shadow options:\n"); + printf(" delete-named-shadow \n"); return; } @@ -312,7 +408,7 @@ static void s_handle_delete( char rejected_path[128]; snprintf(rejected_path, AWS_ARRAY_SIZE(rejected_path), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/delete/rejected", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); - struct aws_byte_cursor correlation_token_path = aws_byte_cursor_from_c_str(""); + struct aws_byte_cursor correlation_token_path = aws_byte_cursor_from_c_str("clientToken"); struct aws_mqtt_request_operation_response_path response_paths[] = { { .topic = aws_byte_cursor_from_c_str(accepted_path), @@ -348,7 +444,7 @@ static void s_handle_delete( .user_data = aws_string_new_from_c_str(allocator, correlation_token), }; - printf("Submitting DeleteNamedShadow '" PRInSTR "' for thing '" PRInSTR "' using correlation token %s...\n", AWS_BYTE_CURSOR_PRI(shadow_name_cursor), AWS_BYTE_CURSOR_PRI(thing_name_cursor), correlation_token); + printf("Submitting DeleteNamedShadow request for shadow '" PRInSTR "' of thing '" PRInSTR "' using correlation token %s...\n", AWS_BYTE_CURSOR_PRI(shadow_name_cursor), AWS_BYTE_CURSOR_PRI(thing_name_cursor), correlation_token); if (aws_mqtt_request_response_client_submit_request(context->rr_client, &get_options) == AWS_OP_ERR) { int error_code = aws_last_error(); @@ -363,9 +459,10 @@ static bool s_handle_input(struct app_ctx *context, struct aws_allocator *alloca struct aws_byte_cursor quit_cursor = aws_byte_cursor_from_c_str("quit"); struct aws_byte_cursor start_cursor = aws_byte_cursor_from_c_str("start"); struct aws_byte_cursor stop_cursor = aws_byte_cursor_from_c_str("stop"); - struct aws_byte_cursor get_cursor = aws_byte_cursor_from_c_str("get"); - struct aws_byte_cursor update_cursor = aws_byte_cursor_from_c_str("update"); - struct aws_byte_cursor delete_cursor = aws_byte_cursor_from_c_str("delete"); + struct aws_byte_cursor get_cursor = aws_byte_cursor_from_c_str("get-named-shadow"); + struct aws_byte_cursor update_reported_cursor = aws_byte_cursor_from_c_str("update-named-shadow-reported"); + struct aws_byte_cursor update_desired_cursor = aws_byte_cursor_from_c_str("update-named-shadow-desired"); + struct aws_byte_cursor delete_cursor = aws_byte_cursor_from_c_str("delete-named-shadow"); struct aws_array_list words; aws_array_list_init_dynamic(&words, allocator, 10, sizeof(struct aws_byte_cursor)); @@ -395,8 +492,10 @@ static bool s_handle_input(struct app_ctx *context, struct aws_allocator *alloca aws_mqtt5_client_stop(client, NULL, NULL); } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &get_cursor)) { s_handle_get(context, allocator, &words); - } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &update_cursor)) { - s_handle_update(context, allocator, &words, line_cursor); + } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &update_reported_cursor)) { + s_handle_update_reported(context, allocator, &words, line_cursor); + } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &update_desired_cursor)) { + s_handle_update_desired(context, allocator, &words, line_cursor); } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &delete_cursor)) { s_handle_delete(context, allocator, &words); } else { @@ -412,12 +511,6 @@ static bool s_handle_input(struct app_ctx *context, struct aws_allocator *alloca static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *publish, void *user_data) { (void)publish; (void)user_data; - - printf("PUBLISH received!\n"); - printf( - "Publish received to topic:'" PRInSTR "' payload '" PRInSTR "'\n", - AWS_BYTE_CURSOR_PRI(publish->topic), - AWS_BYTE_CURSOR_PRI(publish->payload)); } static void s_lifecycle_event_callback(const struct aws_mqtt5_client_lifecycle_event *event) {