diff --git a/include/aws/mqtt/private/request-response/request_response_client.h b/include/aws/mqtt/private/request-response/request_response_client.h new file mode 100644 index 00000000..d0109f24 --- /dev/null +++ b/include/aws/mqtt/private/request-response/request_response_client.h @@ -0,0 +1,23 @@ +#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H +#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include + +struct aws_mqtt_request_response_client; + +AWS_EXTERN_C_BEGIN + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire_internal( + struct aws_mqtt_request_response_client *client); + +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release_internal( + struct aws_mqtt_request_response_client *client); + +AWS_EXTERN_C_END + +#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H */ \ No newline at end of file diff --git a/include/aws/mqtt/private/request-response/weak_ref.h b/include/aws/mqtt/private/request-response/weak_ref.h deleted file mode 100644 index cff0a2d8..00000000 --- a/include/aws/mqtt/private/request-response/weak_ref.h +++ /dev/null @@ -1,66 +0,0 @@ -#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_WEAK_REF_H -#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_WEAK_REF_H - -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include - -#include - -/* - * This is a simplification of the notion of a weak reference particular to the needs of the request-response - * MQTT service clients. This is not suitable for general use but could be extended - * for general use in the future. Until then, it stays private, here. - * - * This weak reference is a ref-counted object with an opaque value. The opaque value may be cleared or - * queried. These two operations *do not* provide any thread safety. - * - * The primary use is to allow one object to safely use asynchronous callback-driven APIs on a second object, despite - * the fact that the first object may get destroyed unpredictably. The two objects must be exclusive to a single - * event loop (because there's no thread safety or mutual exclusion on the opaque value held by the weak ref). - * - * The initial use is the request-response protocol adapter submitting operations to an MQTT client or an - * eventstream RPC connection. We use a single weak ref to the protocol adapter and zero its opaque value when - * the protocol adapter is destroyed. Operation callbacks that subsequently resolve can then short circuit and do - * nothing rather than call into garbage and crash. - * - * We use this rather than explicitly tracking and zeroing all pending operations (like the 3-to-5 adapter does) - * because this approach is simpler and our usage does not require any of these callbacks to be invoked once the - * request-response client is destroyed. - */ -struct aws_weak_ref; - -AWS_EXTERN_C_BEGIN - -/* - * Creates a new weak reference to an opaque value. - */ -AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_new(struct aws_allocator *allocator, void *referenced); - -/* - * Acquires a reference to the weak ref object. - */ -AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_acquire(struct aws_weak_ref *weak_ref); - -/* - * Removes a reference to the weak ref object. When the last reference is removed, the weak ref object will be - * destroyed. This has no effect on the opaque value held by the weak ref. - */ -AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_release(struct aws_weak_ref *weak_ref); - -/* - * Gets the current value of the opaque data held by the weak ref. - */ -AWS_MQTT_API void *aws_weak_ref_get_reference(struct aws_weak_ref *weak_ref); - -/* - * Clears the opaque data held by the weak ref. - */ -AWS_MQTT_API void aws_weak_ref_zero_reference(struct aws_weak_ref *weak_ref); - -AWS_EXTERN_C_END - -#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_WEAK_REF_H */ diff --git a/source/request-response/protocol_adapter.c b/source/request-response/protocol_adapter.c index fefa7000..8aa244cb 100644 --- a/source/request-response/protocol_adapter.c +++ b/source/request-response/protocol_adapter.c @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include @@ -35,64 +34,84 @@ * Retries, when appropriate, are the responsibility of the caller. */ -// used by both subscribe and unsubscribe -struct aws_mqtt_protocol_adapter_subscription_op_data { - struct aws_allocator *allocator; +enum aws_mqtt_protocol_adapter_operation_type { + AMPAOT_SUBSCRIBE_UNSUBSCRIBE, + AMPAOT_PUBLISH, +}; +struct aws_mqtt_protocol_adapter_sub_unsub_data { struct aws_byte_buf topic_filter; - struct aws_weak_ref *callback_ref; }; -static struct aws_mqtt_protocol_adapter_subscription_op_data *s_aws_mqtt_protocol_adapter_subscription_op_data_new( +struct aws_mqtt_protocol_adapter_publish_data { + void (*completion_callback_fn)(int, void *); + void *user_data; +}; + +struct aws_mqtt_protocol_adapter_operation_userdata { + struct aws_allocator *allocator; + + struct aws_linked_list_node node; + void *adapter; + + enum aws_mqtt_protocol_adapter_operation_type operation_type; + + union { + struct aws_mqtt_protocol_adapter_sub_unsub_data sub_unsub_data; + struct aws_mqtt_protocol_adapter_publish_data publish_data; + } operation_data; +}; + +static struct aws_mqtt_protocol_adapter_operation_userdata *s_aws_mqtt_protocol_adapter_sub_unsub_data_new( struct aws_allocator *allocator, struct aws_byte_cursor topic_filter, - struct aws_weak_ref *callback_ref) { - struct aws_mqtt_protocol_adapter_subscription_op_data *subscribe_data = - aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_subscription_op_data)); + void *adapter) { + + struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_operation_userdata)); subscribe_data->allocator = allocator; - subscribe_data->callback_ref = aws_weak_ref_acquire(callback_ref); - aws_byte_buf_init_copy_from_cursor(&subscribe_data->topic_filter, allocator, topic_filter); + subscribe_data->operation_type = AMPAOT_SUBSCRIBE_UNSUBSCRIBE; + subscribe_data->adapter = adapter; + aws_byte_buf_init_copy_from_cursor( + &subscribe_data->operation_data.sub_unsub_data.topic_filter, allocator, topic_filter); return subscribe_data; } -static void s_aws_mqtt_protocol_adapter_subscription_op_data_destroy( - struct aws_mqtt_protocol_adapter_subscription_op_data *subscribe_data) { - aws_weak_ref_release(subscribe_data->callback_ref); - aws_byte_buf_clean_up(&subscribe_data->topic_filter); - - aws_mem_release(subscribe_data->allocator, subscribe_data); -} - -struct aws_mqtt_protocol_adapter_publish_op_data { - struct aws_allocator *allocator; - struct aws_weak_ref *callback_ref; - - void (*completion_callback_fn)(int, void *); - void *user_data; -}; - -static struct aws_mqtt_protocol_adapter_publish_op_data *s_aws_mqtt_protocol_adapter_publish_op_data_new( +static struct aws_mqtt_protocol_adapter_operation_userdata *s_aws_mqtt_protocol_adapter_publish_data_new( struct aws_allocator *allocator, const struct aws_protocol_adapter_publish_options *publish_options, - struct aws_weak_ref *callback_ref) { - struct aws_mqtt_protocol_adapter_publish_op_data *publish_data = - aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_publish_op_data)); + void *adapter) { + + struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_operation_userdata)); publish_data->allocator = allocator; - publish_data->callback_ref = aws_weak_ref_acquire(callback_ref); - publish_data->completion_callback_fn = publish_options->completion_callback_fn; - publish_data->user_data = publish_options->user_data; + publish_data->operation_type = AMPAOT_PUBLISH; + publish_data->adapter = adapter; + + publish_data->operation_data.publish_data.completion_callback_fn = publish_options->completion_callback_fn; + publish_data->operation_data.publish_data.user_data = publish_options->user_data; return publish_data; } -static void s_aws_mqtt_protocol_adapter_publish_op_data_destroy( - struct aws_mqtt_protocol_adapter_publish_op_data *publish_data) { - aws_weak_ref_release(publish_data->callback_ref); +static void s_aws_mqtt_protocol_adapter_operation_user_data_destroy( + struct aws_mqtt_protocol_adapter_operation_userdata *userdata) { + if (userdata == NULL) { + return; + } + + if (aws_linked_list_node_next_is_valid(&userdata->node) && aws_linked_list_node_prev_is_valid(&userdata->node)) { + aws_linked_list_remove(&userdata->node); + } - aws_mem_release(publish_data->allocator, publish_data); + if (userdata->operation_type == AMPAOT_SUBSCRIBE_UNSUBSCRIBE) { + aws_byte_buf_clean_up(&userdata->operation_data.sub_unsub_data.topic_filter); + } + + aws_mem_release(userdata->allocator, userdata); } /*****************************************************************************************************************/ @@ -100,7 +119,8 @@ static void s_aws_mqtt_protocol_adapter_publish_op_data_destroy( struct aws_mqtt_protocol_adapter_311_impl { struct aws_allocator *allocator; struct aws_mqtt_protocol_adapter base; - struct aws_weak_ref *callback_ref; + + struct aws_linked_list incomplete_operations; struct aws_mqtt_protocol_adapter_options config; struct aws_event_loop *loop; @@ -128,8 +148,8 @@ static void s_protocol_adapter_311_subscribe_completion( (void)topic; (void)packet_id; - struct aws_mqtt_protocol_adapter_subscription_op_data *subscribe_data = userdata; - struct aws_mqtt_protocol_adapter_311_impl *adapter = aws_weak_ref_get_reference(subscribe_data->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = userdata; + struct aws_mqtt_protocol_adapter_311_impl *adapter = subscribe_data->adapter; if (adapter == NULL) { goto done; @@ -142,7 +162,7 @@ static void s_protocol_adapter_311_subscribe_completion( } struct aws_protocol_adapter_subscription_event subscribe_event = { - .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->topic_filter), + .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->operation_data.sub_unsub_data.topic_filter), .event_type = AWS_PASET_SUBSCRIBE, .error_code = error_code, .retryable = true, @@ -152,16 +172,17 @@ static void s_protocol_adapter_311_subscribe_completion( done: - s_aws_mqtt_protocol_adapter_subscription_op_data_destroy(subscribe_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data); } int s_aws_mqtt_protocol_adapter_311_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) { struct aws_mqtt_protocol_adapter_311_impl *adapter = impl; struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl; - struct aws_mqtt_protocol_adapter_subscription_op_data *subscribe_data = - s_aws_mqtt_protocol_adapter_subscription_op_data_new( - adapter->allocator, options->topic_filter, adapter->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = + s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &subscribe_data->node); uint64_t timeout_nanos = aws_timestamp_convert(options->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); @@ -182,7 +203,7 @@ int s_aws_mqtt_protocol_adapter_311_subscribe(void *impl, struct aws_protocol_ad error: - s_aws_mqtt_protocol_adapter_subscription_op_data_destroy(subscribe_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data); return AWS_OP_ERR; } @@ -201,15 +222,15 @@ static void s_protocol_adapter_311_unsubscribe_completion( (void)connection; (void)packet_id; - struct aws_mqtt_protocol_adapter_subscription_op_data *unsubscribe_data = userdata; - struct aws_mqtt_protocol_adapter_311_impl *adapter = aws_weak_ref_get_reference(unsubscribe_data->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data = userdata; + struct aws_mqtt_protocol_adapter_311_impl *adapter = unsubscribe_data->adapter; if (adapter == NULL) { goto done; } struct aws_protocol_adapter_subscription_event unsubscribe_event = { - .topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->topic_filter), + .topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->operation_data.sub_unsub_data.topic_filter), .event_type = AWS_PASET_UNSUBSCRIBE, .error_code = error_code, .retryable = s_is_retryable_unsubscribe311(error_code), @@ -219,16 +240,17 @@ static void s_protocol_adapter_311_unsubscribe_completion( done: - s_aws_mqtt_protocol_adapter_subscription_op_data_destroy(unsubscribe_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data); } int s_aws_mqtt_protocol_adapter_311_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) { struct aws_mqtt_protocol_adapter_311_impl *adapter = impl; struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl; - struct aws_mqtt_protocol_adapter_subscription_op_data *unsubscribe_data = - s_aws_mqtt_protocol_adapter_subscription_op_data_new( - adapter->allocator, options->topic_filter, adapter->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data = + s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &unsubscribe_data->node); uint64_t timeout_nanos = aws_timestamp_convert(options->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); @@ -245,7 +267,7 @@ int s_aws_mqtt_protocol_adapter_311_unsubscribe(void *impl, struct aws_protocol_ error: - s_aws_mqtt_protocol_adapter_subscription_op_data_destroy(unsubscribe_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data); return AWS_OP_ERR; } @@ -261,26 +283,29 @@ static void s_protocol_adapter_311_publish_completion( (void)connection; (void)packet_id; - struct aws_mqtt_protocol_adapter_publish_op_data *publish_data = userdata; - struct aws_mqtt_protocol_adapter_311_impl *adapter = aws_weak_ref_get_reference(publish_data->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = userdata; + struct aws_mqtt_protocol_adapter_311_impl *adapter = publish_data->adapter; if (adapter == NULL) { goto done; } - (*publish_data->completion_callback_fn)(error_code, publish_data->user_data); + (*publish_data->operation_data.publish_data.completion_callback_fn)( + error_code, publish_data->operation_data.publish_data.user_data); done: - s_aws_mqtt_protocol_adapter_publish_op_data_destroy(publish_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data); } int s_aws_mqtt_protocol_adapter_311_publish(void *impl, struct aws_protocol_adapter_publish_options *options) { struct aws_mqtt_protocol_adapter_311_impl *adapter = impl; struct aws_mqtt_client_connection_311_impl *connection_impl = adapter->connection->impl; - struct aws_mqtt_protocol_adapter_publish_op_data *publish_data = - s_aws_mqtt_protocol_adapter_publish_op_data_new(adapter->allocator, options, adapter->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = + s_aws_mqtt_protocol_adapter_publish_data_new(adapter->allocator, options, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &publish_data->node); uint64_t timeout_nanos = aws_timestamp_convert(options->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); @@ -300,7 +325,7 @@ int s_aws_mqtt_protocol_adapter_311_publish(void *impl, struct aws_protocol_adap error: - s_aws_mqtt_protocol_adapter_publish_op_data_destroy(publish_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data); return AWS_OP_ERR; } @@ -390,14 +415,35 @@ static bool s_aws_mqtt_protocol_adapter_311_is_connected(void *impl) { return current_state == AWS_MQTT_CLIENT_STATE_CONNECTED; } +static void s_release_incomplete_operations(struct aws_linked_list *incomplete_operations) { + struct aws_linked_list dummy_list; + aws_linked_list_init(&dummy_list); + aws_linked_list_swap_contents(incomplete_operations, &dummy_list); + + while (!aws_linked_list_empty(&dummy_list)) { + struct aws_linked_list_node *head = aws_linked_list_pop_front(&dummy_list); + struct aws_mqtt_protocol_adapter_operation_userdata *userdata = + AWS_CONTAINER_OF(head, struct aws_mqtt_protocol_adapter_operation_userdata, node); + + userdata->adapter = NULL; + + if (userdata->operation_type == AMPAOT_PUBLISH) { + struct aws_mqtt_protocol_adapter_publish_data *publish_data = &userdata->operation_data.publish_data; + if (publish_data->completion_callback_fn != NULL) { + (*userdata->operation_data.publish_data.completion_callback_fn)( + AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, publish_data->user_data); + } + } + } +} + static void s_protocol_adapter_mqtt311_listener_termination_callback(void *user_data) { struct aws_mqtt_protocol_adapter_311_impl *adapter = user_data; struct aws_mqtt_client_connection_311_impl *impl = adapter->connection->impl; AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(impl->loop)); - aws_weak_ref_zero_reference(adapter->callback_ref); - aws_weak_ref_release(adapter->callback_ref); + s_release_incomplete_operations(&adapter->incomplete_operations); aws_mqtt_client_connection_release(adapter->connection); @@ -442,7 +488,7 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311( adapter->allocator = allocator; adapter->base.impl = adapter; adapter->base.vtable = &s_protocol_adapter_mqtt311_vtable; - adapter->callback_ref = aws_weak_ref_new(allocator, adapter); + aws_linked_list_init(&adapter->incomplete_operations); adapter->config = *options; adapter->loop = impl->loop; adapter->connection = aws_mqtt_client_connection_acquire(connection); @@ -471,7 +517,7 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311( struct aws_mqtt_protocol_adapter_5_impl { struct aws_allocator *allocator; struct aws_mqtt_protocol_adapter base; - struct aws_weak_ref *callback_ref; + struct aws_linked_list incomplete_operations; struct aws_mqtt_protocol_adapter_options config; struct aws_event_loop *loop; @@ -514,8 +560,8 @@ static void s_protocol_adapter_5_subscribe_completion( const struct aws_mqtt5_packet_suback_view *suback, int error_code, void *complete_ctx) { - struct aws_mqtt_protocol_adapter_subscription_op_data *subscribe_data = complete_ctx; - struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_weak_ref_get_reference(subscribe_data->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = complete_ctx; + struct aws_mqtt_protocol_adapter_5_impl *adapter = subscribe_data->adapter; if (adapter == NULL) { goto done; @@ -534,7 +580,7 @@ static void s_protocol_adapter_5_subscribe_completion( } struct aws_protocol_adapter_subscription_event subscribe_event = { - .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->topic_filter), + .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->operation_data.sub_unsub_data.topic_filter), .event_type = AWS_PASET_SUBSCRIBE, .error_code = error_code, .retryable = is_retryable, @@ -544,15 +590,16 @@ static void s_protocol_adapter_5_subscribe_completion( done: - s_aws_mqtt_protocol_adapter_subscription_op_data_destroy(subscribe_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data); } int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) { struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; - struct aws_mqtt_protocol_adapter_subscription_op_data *subscribe_data = - s_aws_mqtt_protocol_adapter_subscription_op_data_new( - adapter->allocator, options->topic_filter, adapter->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *subscribe_data = + s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &subscribe_data->node); struct aws_mqtt5_subscription_view subscription_view = { .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, @@ -578,7 +625,7 @@ int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adap error: - s_aws_mqtt_protocol_adapter_subscription_op_data_destroy(subscribe_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(subscribe_data); return AWS_OP_ERR; } @@ -607,8 +654,8 @@ static void s_protocol_adapter_5_unsubscribe_completion( const struct aws_mqtt5_packet_unsuback_view *unsuback, int error_code, void *complete_ctx) { - struct aws_mqtt_protocol_adapter_subscription_op_data *unsubscribe_data = complete_ctx; - struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_weak_ref_get_reference(unsubscribe_data->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data = complete_ctx; + struct aws_mqtt_protocol_adapter_5_impl *adapter = unsubscribe_data->adapter; if (adapter == NULL) { goto done; @@ -628,7 +675,7 @@ static void s_protocol_adapter_5_unsubscribe_completion( } struct aws_protocol_adapter_subscription_event unsubscribe_event = { - .topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->topic_filter), + .topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->operation_data.sub_unsub_data.topic_filter), .event_type = AWS_PASET_UNSUBSCRIBE, .error_code = error_code, .retryable = is_retryable, @@ -638,15 +685,16 @@ static void s_protocol_adapter_5_unsubscribe_completion( done: - s_aws_mqtt_protocol_adapter_subscription_op_data_destroy(unsubscribe_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data); } int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) { struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; - struct aws_mqtt_protocol_adapter_subscription_op_data *unsubscribe_data = - s_aws_mqtt_protocol_adapter_subscription_op_data_new( - adapter->allocator, options->topic_filter, adapter->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *unsubscribe_data = + s_aws_mqtt_protocol_adapter_sub_unsub_data_new(adapter->allocator, options->topic_filter, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &unsubscribe_data->node); struct aws_mqtt5_packet_unsubscribe_view unsubscribe_view = { .topic_filters = &options->topic_filter, @@ -667,7 +715,7 @@ int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_ad error: - s_aws_mqtt_protocol_adapter_subscription_op_data_destroy(unsubscribe_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(unsubscribe_data); return AWS_OP_ERR; } @@ -679,8 +727,8 @@ static void s_protocol_adapter_5_publish_completion( const void *packet, int error_code, void *complete_ctx) { - struct aws_mqtt_protocol_adapter_publish_op_data *publish_data = complete_ctx; - struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_weak_ref_get_reference(publish_data->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = complete_ctx; + struct aws_mqtt_protocol_adapter_5_impl *adapter = publish_data->adapter; if (adapter == NULL) { goto done; @@ -693,17 +741,20 @@ static void s_protocol_adapter_5_publish_completion( } } - (*publish_data->completion_callback_fn)(error_code, publish_data->user_data); + (*publish_data->operation_data.publish_data.completion_callback_fn)( + error_code, publish_data->operation_data.publish_data.user_data); done: - s_aws_mqtt_protocol_adapter_publish_op_data_destroy(publish_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data); } int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapter_publish_options *options) { struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; - struct aws_mqtt_protocol_adapter_publish_op_data *publish_data = - s_aws_mqtt_protocol_adapter_publish_op_data_new(adapter->allocator, options, adapter->callback_ref); + struct aws_mqtt_protocol_adapter_operation_userdata *publish_data = + s_aws_mqtt_protocol_adapter_publish_data_new(adapter->allocator, options, adapter); + + aws_linked_list_push_back(&adapter->incomplete_operations, &publish_data->node); struct aws_mqtt5_packet_publish_view publish_view = { .topic = options->topic, .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, .payload = options->payload}; @@ -722,7 +773,7 @@ int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapte error: - s_aws_mqtt_protocol_adapter_publish_op_data_destroy(publish_data); + s_aws_mqtt_protocol_adapter_operation_user_data_destroy(publish_data); return AWS_OP_ERR; } @@ -783,8 +834,7 @@ static void s_protocol_adapter_mqtt5_listener_termination_callback(void *user_da AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(adapter->client->loop)); - aws_weak_ref_zero_reference(adapter->callback_ref); - aws_weak_ref_release(adapter->callback_ref); + s_release_incomplete_operations(&adapter->incomplete_operations); aws_mqtt5_client_release(adapter->client); @@ -822,7 +872,7 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5( adapter->allocator = allocator; adapter->base.impl = adapter; adapter->base.vtable = &s_protocol_adapter_mqtt5_vtable; - adapter->callback_ref = aws_weak_ref_new(allocator, adapter); + aws_linked_list_init(&adapter->incomplete_operations); adapter->config = *options; adapter->loop = client->loop; adapter->client = aws_mqtt5_client_acquire(client); diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 66264104..c8d49658 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -487,7 +487,7 @@ struct aws_mqtt_request_response_client { struct aws_hash_table operation_lists_by_subscription_filter; }; -struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_acquire_internal( +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire_internal( struct aws_mqtt_request_response_client *client) { if (client != NULL) { aws_ref_count_acquire(&client->internal_ref_count); @@ -496,7 +496,7 @@ struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_acqu return client; } -struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_release_internal( +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release_internal( struct aws_mqtt_request_response_client *client) { if (client != NULL) { aws_ref_count_release(&client->internal_ref_count); @@ -738,7 +738,7 @@ static void s_aws_rr_subscription_status_event_task_delete(struct aws_rr_subscri } aws_byte_buf_clean_up(&task->topic_filter); - s_aws_mqtt_request_response_client_release_internal(task->rr_client); + aws_mqtt_request_response_client_release_internal(task->rr_client); aws_mem_release(task->allocator, task); } @@ -828,7 +828,7 @@ static struct aws_rr_subscription_status_event_task *s_aws_rr_subscription_statu task->allocator = allocator; task->type = event->type; task->operation_id = event->operation_id; - task->rr_client = s_aws_mqtt_request_response_client_acquire_internal(rr_client); + task->rr_client = aws_mqtt_request_response_client_acquire_internal(rr_client); aws_byte_buf_init_copy_from_cursor(&task->topic_filter, allocator, event->topic_filter); @@ -1621,7 +1621,7 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg, */ - s_aws_mqtt_request_response_client_release_internal(operation->client_internal_ref); + aws_mqtt_request_response_client_release_internal(operation->client_internal_ref); if (operation->type == AWS_MRROT_STREAMING) { s_aws_mqtt_streaming_operation_storage_clean_up(&operation->storage.streaming_storage); @@ -1661,7 +1661,7 @@ static void s_aws_mqtt_rr_client_operation_init_shared( */ aws_mqtt_rr_client_operation_acquire(operation); - operation->client_internal_ref = s_aws_mqtt_request_response_client_acquire_internal(client); + operation->client_internal_ref = aws_mqtt_request_response_client_acquire_internal(client); operation->id = s_aws_mqtt_request_response_client_allocate_operation_id(client); s_change_operation_state(operation, AWS_MRROS_NONE); diff --git a/source/request-response/weak_ref.c b/source/request-response/weak_ref.c deleted file mode 100644 index 949014e7..00000000 --- a/source/request-response/weak_ref.c +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include - -#include - -struct aws_weak_ref { - struct aws_allocator *allocator; - struct aws_ref_count refcount; - void *referenced; -}; - -static void s_destroy_weak_ref(void *value) { - struct aws_weak_ref *weak_ref = value; - - aws_mem_release(weak_ref->allocator, weak_ref); -} - -struct aws_weak_ref *aws_weak_ref_new(struct aws_allocator *allocator, void *referenced) { - struct aws_weak_ref *weak_ref = aws_mem_calloc(allocator, 1, sizeof(struct aws_weak_ref)); - - aws_ref_count_init(&weak_ref->refcount, weak_ref, s_destroy_weak_ref); - weak_ref->allocator = allocator; - weak_ref->referenced = referenced; - - return weak_ref; -} - -struct aws_weak_ref *aws_weak_ref_acquire(struct aws_weak_ref *weak_ref) { - if (NULL != weak_ref) { - aws_ref_count_acquire(&weak_ref->refcount); - } - - return weak_ref; -} - -struct aws_weak_ref *aws_weak_ref_release(struct aws_weak_ref *weak_ref) { - if (NULL != weak_ref) { - aws_ref_count_release(&weak_ref->refcount); - } - - return NULL; -} - -void *aws_weak_ref_get_reference(struct aws_weak_ref *weak_ref) { - return weak_ref->referenced; -} - -void aws_weak_ref_zero_reference(struct aws_weak_ref *weak_ref) { - weak_ref->referenced = NULL; -}