Skip to content

Commit

Permalink
Sync point
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jan 17, 2024
1 parent 08842aa commit 7a1db26
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 26 deletions.
4 changes: 2 additions & 2 deletions include/aws/mqtt/private/request-response/protocol_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ struct aws_mqtt_protocol_adapter_options {

struct aws_mqtt_protocol_adapter_vtable {

void (*aws_mqtt_protocol_adapter_release_fn)(void *);
void (*aws_mqtt_protocol_adapter_delete_fn)(void *);

int (*aws_mqtt_protocol_adapter_subscribe_fn)(void *, struct aws_protocol_adapter_subscribe_options *);

Expand All @@ -83,7 +83,7 @@ AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_fro

AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter_options *options, struct aws_mqtt5_client *client);

AWS_MQTT_API void aws_mqtt_protocol_adapter_release(struct aws_mqtt_protocol_adapter *adapter);
AWS_MQTT_API void aws_mqtt_protocol_adapter_delete(struct aws_mqtt_protocol_adapter *adapter);

AWS_MQTT_API int aws_mqtt_protocol_adapter_subscribe(struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_subscribe_options *options);

Expand Down
101 changes: 77 additions & 24 deletions source/request-response/protocol_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ static void s_aws_mqtt_protocol_adapter_subscription_destroy(struct aws_mqtt_pro
aws_mem_release(subscription->allocator, subscription);
}

/******************************************************************************************************************/

struct aws_mqtt_protocol_adapter_subscription_set {
struct aws_allocator *allocator;
struct aws_mqtt_protocol_adapter *owner; // not an acquired reference due to the parent-child relationship
Expand Down Expand Up @@ -90,6 +92,42 @@ static void s_aws_mqtt_protocol_adapter_subscription_set_clean_up(struct aws_mqt
aws_hash_table_clean_up(&subscriptions);
}

static void s_aws_mqtt_protocol_adapter_subscription_set_update_subscription(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_byte_cursor topic_filter, enum aws_protocol_adapter_subscription_status status) {
(void)subscription_set;
(void)topic_filter;
(void)status;

// TODO
}

static void s_aws_mqtt_protocol_adapter_subscription_set_create_or_update_subscription(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_byte_cursor topic_filter, enum aws_protocol_adapter_subscription_status status) {
(void)subscription_set;
(void)topic_filter;
(void)status;

// TODO
}

/*
* New API contract
*
* Invariant 1: Subscribe is only called from the RR subscription manager when going from 0 to 1 pending operations
* Invariant 2: Unsubscribe is only called from the RR subscription manager when there are 0 pending operations, not
* necessarily on the exact transition to zero though.
*
* Entries are not tracked with the exception of eventstream impl which needs the stream handles to close.
* A subscribe failure should not trigger an unsubscribe, only notify the status callback.
* Subscription event callback should be {subscribe_success, subscribe_failure, unsubscribe_success, unsubscribe_failure}.
* The sub manager is responsible for calling Unsubscribe on all its entries when shutting down (before releasing
* hold of the adapter).
*
* How do we know not to retry unsubscribe failures because a subscribe came in? Well, we don't retry failures; let
* the manager make that decision. Only retry (maybe) if the manager is gone (ie failure against a zeroed weak ref).
*
* On subscribe failures with zeroed weak ref, trust that an Unsubscribe was sent that will resolve later and let it
* decide what to do.
*/

/*
* On subscribe success: if there's not an entry, is this possible? No because we're called only by a function that checks for adapter weak->strong first, so the adapter exists and we don't allow subscription removal without an unsubscribe complete and we don't allow the subscribe until the unsubscribe has completed. But what
* if
Expand All @@ -104,47 +142,48 @@ static void s_aws_mqtt_protocol_adapter_subscription_set_clean_up(struct aws_mqt
* is if we're in/post destruction but then there's no adapter and we early out
*/
static void s_aws_mqtt_protocol_adapter_subscription_set_on_subscribe_completion(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_byte_cursor topic_filter, bool success) {
if (!success) {
if (success) {
s_aws_mqtt_protocol_adapter_subscription_set_update_subscription(subscription_set, topic_filter, PASS_SUBSCRIBED);
} else {
struct aws_protocol_adapter_unsubscribe_options options = {
.topic_filter = topic_filter,
};

aws_mqtt_protocol_adapter_unsubscribe(subscription_set->owner, &options);
}



struct aws_hash_element *hash_element = NULL;
if (!aws_hash_table_find(&subscription_set->subscriptions, &topic_filter, &hash_element) || hash_element == NULL) {
return;
}

struct aws_mqtt_protocol_adapter_subscription *subscription = hash_element->value;
AWS_FATAL_ASSERT(subscription != NULL);

}

static void s_aws_mqtt_protocol_adapter_subscription_set_update_subscription(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_byte_cursor topic_filter, enum aws_protocol_adapter_subscription_status status) {
(void)subscription_set;
(void)topic_filter;
(void)status;
switch (subscription->status) {
case PASS_SUBSCRIBING: {
if (success) {
subscription->status = PASS_SUBSCRIBED;
}
}

??;
}

static void s_aws_mqtt_protocol_adapter_subscription_set_create_or_update_subscription(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_byte_cursor topic_filter, enum aws_protocol_adapter_subscription_status status) {
(void)subscription_set;
(void)topic_filter;
(void)status;
default:
break;
}

??;
// TODO
}


/******************************************************************************************************************/

struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter_options *options, struct aws_mqtt_client_connection *connection) {
(void)allocator;
(void)options;
(void)connection;

// TODO
return NULL;
}

Expand All @@ -163,16 +202,18 @@ struct aws_mqtt_protocol_adapter_5_impl {
struct aws_mqtt_protocol_adapter_subscription_set subscriptions;
};

static void s_aws_mqtt_protocol_adapter_5_release(void *impl) {
static void s_aws_mqtt_protocol_adapter_5_delete(void *impl) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;

aws_mqtt5_listener_release(adapter->listener);
}

/* Subscribe */

struct aws_mqtt_protocol_adapter_5_subscribe_data {
struct aws_allocator *allocator;

struct aws_byte_buf *topic_filter;
struct aws_byte_buf topic_filter;
struct aws_protocol_adapter_weak_ref *callback_ref;
};

Expand All @@ -188,6 +229,7 @@ static struct aws_mqtt_protocol_adapter_5_subscribe_data *aws_mqtt_protocol_adap

static void aws_mqtt_protocol_adapter_5_subscribe_data_delete(struct aws_mqtt_protocol_adapter_5_subscribe_data *subscribe_data) {
aws_weak_ref_release(subscribe_data->callback_ref);
aws_byte_buf_clean_up(&subscribe_data->topic_filter);

aws_mem_release(subscribe_data->allocator, subscribe_data);
}
Expand All @@ -202,7 +244,7 @@ static void s_protocol_adapter_5_subscribe_completion(const struct aws_mqtt5_pac
goto done;
}

bool success = error_code == AWS_ERROR_SUCCESS && suback != NULL && suback->reason_code_count == 1 && suback->reason_codes[0] <= AWS_MQTT5_SARC_GRANTED_QOS_1;
bool success = error_code == AWS_ERROR_SUCCESS && suback != NULL && suback->reason_code_count == 1 && suback->reason_codes[0] <= AWS_MQTT5_SARC_GRANTED_QOS_2;
s_aws_mqtt_protocol_adapter_subscription_set_on_subscribe_completion(&adapter->subscriptions, aws_byte_cursor_from_buf(&subscribe_data->topic_filter), success);

done:
Expand Down Expand Up @@ -244,6 +286,8 @@ int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adap
return AWS_OP_ERR;
}

/* Unsubscribe */

int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;
(void)adapter;
Expand All @@ -252,6 +296,8 @@ int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_ad
return aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
}

/* Publish */

int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapter_publish_options *options) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;
(void)adapter;
Expand All @@ -274,16 +320,23 @@ static void s_protocol_adapter_mqtt5_listener_termination_callback(void *user_da

AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(adapter->client->loop));

aws_mqtt5_client_release(adapter->client);

aws_weak_ref_zero_reference(adapter->callback_ref);
aws_weak_ref_release(adapter->callback_ref);

aws_mqtt5_client_release(adapter->client);

aws_protocol_adapter_terminate_callback_fn *terminate_callback = adapter->config.terminate_callback;
void *terminate_user_data = adapter->config.user_data;

aws_mem_release(adapter->allocator, adapter);

if (terminate_callback) {
(*terminate_callback)(terminate_user_data);
}
}

static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt5_vtable = {
.aws_mqtt_protocol_adapter_release_fn = s_aws_mqtt_protocol_adapter_5_release,
.aws_mqtt_protocol_adapter_delete_fn = s_aws_mqtt_protocol_adapter_5_delete,
.aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_5_subscribe,
.aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_5_unsubscribe,
.aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_5_publish,
Expand Down Expand Up @@ -319,8 +372,8 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(struct aw
return adapter;
}

void aws_mqtt_protocol_adapter_release(struct aws_mqtt_protocol_adapter *adapter) {
(*adapter->vtable->aws_mqtt_protocol_adapter_release_fn)(adapter->impl);
void aws_mqtt_protocol_adapter_delete(struct aws_mqtt_protocol_adapter *adapter) {
(*adapter->vtable->aws_mqtt_protocol_adapter_delete_fn)(adapter->impl);
}

int aws_mqtt_protocol_adapter_subscribe(struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_subscribe_options *options) {
Expand Down

0 comments on commit 7a1db26

Please sign in to comment.