Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Dec 11, 2023
1 parent b7de17b commit f501fa0
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 57 deletions.
30 changes: 30 additions & 0 deletions include/aws/mqtt/private/request-response/weak_ref.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_WEAK_REF_H
#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_WEAK_REF_H

/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/exports.h>

#include <aws/common/common.h>

struct aws_weak_ref;

AWS_EXTERN_C_BEGIN

AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_new(struct aws_allocator *allocator, void *referenced);

AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_acquire(struct aws_weak_ref *weak_ref);

AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_release(struct aws_weak_ref *weak_ref);

AWS_MQTT_API void *aws_weak_ref_get_reference(struct aws_weak_ref *weak_ref);

AWS_MQTT_API void aws_weak_ref_zero_reference(struct aws_weak_ref *weak_ref);

AWS_EXTERN_C_END


#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_WEAK_REF_H */
100 changes: 43 additions & 57 deletions source/request-response/protocol_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,50 +7,11 @@

#include <aws/io/event_loop.h>
#include <aws/mqtt/private/client_impl_shared.h>
#include <aws/mqtt/private/request-response/weak_ref.h>
#include <aws/mqtt/private/v5/mqtt5_client_impl.h>
#include <aws/mqtt/v5/mqtt5_client.h>
#include <aws/mqtt/v5/mqtt5_listener.h>

struct aws_protocol_adapter_weak_ref {
struct aws_allocator *allocator;
struct aws_ref_count refcount;
void *referenced;
};

static void s_destroy_protocol_adapter_weak_ref(void *value) {
struct aws_protocol_adapter_weak_ref *weak_ref = value;

aws_mem_release(weak_ref->allocator, weak_ref);
}

struct aws_protocol_adapter_weak_ref *aws_protocol_adapter_weak_ref_new(struct aws_allocator *allocator, void *referenced) {
struct aws_protocol_adapter_weak_ref *weak_ref = aws_mem_calloc(allocator, 1, sizeof(struct aws_protocol_adapter_weak_ref));

aws_ref_count_init(&weak_ref->refcount, weak_ref, s_destroy_protocol_adapter_weak_ref);
weak_ref->allocator = allocator;
weak_ref->referenced = referenced;

return weak_ref;
}

struct aws_protocol_adapter_weak_ref *aws_protocol_adapter_weak_ref_acquire(struct aws_protocol_adapter_weak_ref *weak_ref) {
if (NULL != weak_ref) {
aws_ref_count_acquire(&weak_ref->refcount);
}

return weak_ref;
}

struct aws_protocol_adapter_weak_ref *aws_protocol_adapter_weak_ref_release(struct aws_protocol_adapter_weak_ref *weak_ref) {
if (NULL != weak_ref) {
aws_ref_count_release(&weak_ref->refcount);
}

return NULL;
}

/******************************************************************************************************************/

enum aws_protocol_adapter_subscription_status {
PASS_NONE,
PASS_SUBSCRIBING,
Expand Down Expand Up @@ -84,25 +45,27 @@ static void s_aws_mqtt_protocol_adapter_subscription_destroy(struct aws_mqtt_pro

struct aws_mqtt_protocol_adapter_subscription_set {
struct aws_allocator *allocator;
struct aws_mqtt_protocol_adapter *owner; // not an acquired reference due to the parent-child relationship
struct aws_hash_table subscriptions; // aws_byte_cursor * -> aws_mqtt_protocol_adapter_subscription *

aws_protocol_adapter_subscription_status_fn *subscription_status_update_callback;
void *callback_user_data;
};

static int s_aws_mqtt_protocol_adapter_subscription_set_init(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter_options *options) {
static int s_aws_mqtt_protocol_adapter_subscription_set_init(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter *owner, struct aws_mqtt_protocol_adapter_options *options) {
subscription_set->allocator = allocator;
subscription_set->owner = owner;
subscription_set->subscription_status_update_callback = options->subscription_status_update_callback;
subscription_set->callback_user_data = options->user_data;

return aws_hash_table_init(&subscription_set->subscriptions, allocator, 0, aws_hash_byte_cursor_ptr, aws_mqtt_byte_cursor_hash_equality, NULL, NULL);
}

static int s_aws_mqtt_protocol_adapter_subscription_set_subscription_clean_up(void *context, struct aws_hash_element *elem) {
struct aws_mqtt_protocol_adapter *adapter = context;
static int s_aws_mqtt_protocol_adapter_subscription_set_subscription_destroy(void *context, struct aws_hash_element *elem) {
struct aws_mqtt_protocol_adapter_subscription_set *subscription_set = context;
struct aws_mqtt_protocol_adapter *adapter = subscription_set->owner;

struct aws_mqtt_protocol_adapter_subscription *subscription = elem->value;

if (subscription->status != PASS_UNSUBSCRIBING) {
struct aws_protocol_adapter_unsubscribe_options options = {
.topic_filter = subscription->topic_filter,
Expand All @@ -116,23 +79,46 @@ static int s_aws_mqtt_protocol_adapter_subscription_set_subscription_clean_up(vo
return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE;
}

static void s_aws_mqtt_protocol_adapter_subscription_set_clean_up(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_mqtt_protocol_adapter *owner) {
static void s_aws_mqtt_protocol_adapter_subscription_set_clean_up(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set) {
struct aws_hash_table subscriptions;
AWS_ZERO_STRUCT(subscriptions);

aws_hash_table_swap(&subscription_set->subscriptions, &subscriptions);

aws_hash_table_foreach(&subscriptions, s_aws_mqtt_protocol_adapter_subscription_set_subscription_clean_up, owner);
aws_hash_table_foreach(&subscriptions, s_aws_mqtt_protocol_adapter_subscription_set_subscription_destroy, subscription_set);

aws_hash_table_clean_up(&subscriptions);
}

/*
* On subscribe success: if there's not an entry, is this possible? No because we're called only by a function that checks for adapter weak->strong first, so the adapter exists and we don't allow subscription removal without an unsubscribe complete and we don't allow the subscribe until the unsubscribe has completed. But what
* if
* On subscribe success: if there's an entry, transition | subscribing -> subscribed, send an update
* On subscribe failure: if there's not an entry, is this possible?
* On subscribe failure: if there's an entry, transition -> unsubscribing, send an update
*
* Should we just blindly add if the adapter exists? Yes: simplest. No: represents undefined behavior if it shouldn't be happening
*
* In the design we said that the subscription set is just a dumb reflection of the ordered sequence of operations
* from the rr client which implies we should just create_or_update. The only time we don't want to create_or_update
* is if we're in/post destruction but then there's no adapter and we early out
*/
static void s_aws_mqtt_protocol_adapter_subscription_set_on_subscribe_completion(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_byte_cursor topic_filter, bool success) {
(void)subscription_set;
(void)topic_filter;
(void)success;
if (!success) {
struct aws_protocol_adapter_unsubscribe_options options = {
.topic_filter = topic_filter,
};

aws_mqtt_protocol_adapter_unsubscribe(subscription_set->owner, &options);
}

struct aws_hash_element *hash_element = NULL;
if (!aws_hash_table_find(&subscription_set->subscriptions, &topic_filter, &hash_element) || hash_element == NULL) {
return;
}

struct aws_mqtt_protocol_adapter_subscription *subscription = hash_element->value;

??;
}

static void s_aws_mqtt_protocol_adapter_subscription_set_update_subscription(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_byte_cursor topic_filter, enum aws_protocol_adapter_subscription_status status) {
Expand Down Expand Up @@ -194,14 +180,14 @@ static struct aws_mqtt_protocol_adapter_5_subscribe_data *aws_mqtt_protocol_adap
struct aws_mqtt_protocol_adapter_5_subscribe_data *subscribe_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_subscribe_data));

subscribe_data->allocator = allocator;
subscribe_data->callback_ref = aws_protocol_adapter_weak_ref_acquire(callback_ref);
subscribe_data->callback_ref = aws_weak_ref_acquire(callback_ref);
aws_byte_buf_init_copy_from_cursor(&subscribe_data->topic_filter, allocator, topic_filter);

return subscribe_data;
}

static void aws_mqtt_protocol_adapter_5_subscribe_data_delete(struct aws_mqtt_protocol_adapter_5_subscribe_data *subscribe_data) {
aws_protocol_adapter_weak_ref_release(subscribe_data->callback_ref);
aws_weak_ref_release(subscribe_data->callback_ref);

aws_mem_release(subscribe_data->allocator, subscribe_data);
}
Expand All @@ -210,14 +196,14 @@ static void s_protocol_adapter_5_subscribe_completion(const struct aws_mqtt5_pac
int error_code,
void *complete_ctx) {
struct aws_mqtt_protocol_adapter_5_subscribe_data *subscribe_data = complete_ctx;
struct aws_mqtt_protocol_adapter_5_impl *adapter = subscribe_data->callback_ref->referenced;
struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_weak_ref_get_reference(subscribe_data->callback_ref);

if (adapter == NULL) {
goto done;
}

bool success = error_code == AWS_ERROR_SUCCESS && suback != NULL && suback->reason_code_count == 1 && suback->reason_codes[0] <= AWS_MQTT5_SARC_GRANTED_QOS_1;
s_aws_mqtt_protocol_adapter_subscription_set_on_subscribe_completion(&adapter->config, &adapter->subscriptions, aws_byte_cursor_from_buf(&subscribe_data->topic_filter), success);
s_aws_mqtt_protocol_adapter_subscription_set_on_subscribe_completion(&adapter->subscriptions, aws_byte_cursor_from_buf(&subscribe_data->topic_filter), success);

done:

Expand Down Expand Up @@ -290,8 +276,8 @@ static void s_protocol_adapter_mqtt5_listener_termination_callback(void *user_da

aws_mqtt5_client_release(adapter->client);

adapter->callback_ref->referenced = NULL;
aws_protocol_adapter_weak_ref_release(adapter->callback_ref);
aws_weak_ref_zero_reference(adapter->callback_ref);
aws_weak_ref_release(adapter->callback_ref);

aws_mem_release(adapter->allocator, adapter);
}
Expand All @@ -311,7 +297,7 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(struct aw
adapter->allocator = allocator;
adapter->base.impl = adapter;
adapter->base.vtable = &s_protocol_adapter_mqtt5_vtable;
adapter->callback_ref = aws_protocol_adapter_weak_ref_new(allocator, adapter);
adapter->callback_ref = aws_weak_ref_new(allocator, adapter);
adapter->config = *options;
adapter->loop = client->loop;
adapter->client = aws_mqtt5_client_acquire(client);
Expand Down
54 changes: 54 additions & 0 deletions source/request-response/weak_ref.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/private/request-response/weak_ref.h>

#include <aws/common/ref_count.h>

struct aws_weak_ref {
struct aws_allocator *allocator;
struct aws_ref_count refcount;
void *referenced;
};

static void s_destroy_weak_ref(void *value) {
struct aws_weak_ref *weak_ref = value;

aws_mem_release(weak_ref->allocator, weak_ref);
}

struct aws_weak_ref *aws_weak_ref_new(struct aws_allocator *allocator, void *referenced) {
struct aws_weak_ref *weak_ref = aws_mem_calloc(allocator, 1, sizeof(struct aws_weak_ref));

aws_ref_count_init(&weak_ref->refcount, weak_ref, s_destroy_weak_ref);
weak_ref->allocator = allocator;
weak_ref->referenced = referenced;

return weak_ref;
}

struct aws_weak_ref *aws_weak_ref_acquire(struct aws_weak_ref *weak_ref) {
if (NULL != weak_ref) {
aws_ref_count_acquire(&weak_ref->refcount);
}

return weak_ref;
}

struct aws_weak_ref *aws_weak_ref_release(struct aws_weak_ref *weak_ref) {
if (NULL != weak_ref) {
aws_ref_count_release(&weak_ref->refcount);
}

return NULL;
}

void *aws_weak_ref_get_reference(struct aws_weak_ref *weak_ref) {
return weak_ref->referenced;
}

void aws_weak_ref_zero_reference(struct aws_weak_ref *weak_ref) {
weak_ref->referenced = NULL;
}

0 comments on commit f501fa0

Please sign in to comment.