Skip to content

Commit

Permalink
Tests complete
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jan 19, 2024
1 parent 384a380 commit 86de219
Show file tree
Hide file tree
Showing 8 changed files with 734 additions and 105 deletions.
34 changes: 25 additions & 9 deletions include/aws/mqtt/private/request-response/protocol_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/mqtt.h>
#include <aws/mqtt/exports.h>
#include <aws/mqtt/mqtt.h>

#include <aws/common/byte_buf.h>

Expand Down Expand Up @@ -61,10 +61,14 @@ struct aws_protocol_adapter_connection_event {
bool rejoined_session;
};

typedef void(aws_protocol_adapter_subscription_event_fn)(struct aws_protocol_adapter_subscription_event *event, void *user_data);
typedef void(aws_protocol_adapter_incoming_publish_fn)(struct aws_protocol_adapter_incoming_publish_event *publish, void *user_data);
typedef void(
aws_protocol_adapter_subscription_event_fn)(struct aws_protocol_adapter_subscription_event *event, void *user_data);
typedef void(aws_protocol_adapter_incoming_publish_fn)(
struct aws_protocol_adapter_incoming_publish_event *publish,
void *user_data);
typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data);
typedef void(aws_protocol_adapter_connection_event_fn)(struct aws_protocol_adapter_connection_event *event, void *user_data);
typedef void(
aws_protocol_adapter_connection_event_fn)(struct aws_protocol_adapter_connection_event *event, void *user_data);

struct aws_mqtt_protocol_adapter_options {
aws_protocol_adapter_subscription_event_fn *subscription_event_callback;
Expand Down Expand Up @@ -93,17 +97,29 @@ struct aws_mqtt_protocol_adapter {

AWS_EXTERN_C_BEGIN

AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter_options *options, struct aws_mqtt_client_connection *connection);
AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter_options *options,
struct aws_mqtt_client_connection *connection);

AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter_options *options, struct aws_mqtt5_client *client);
AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter_options *options,
struct aws_mqtt5_client *client);

AWS_MQTT_API void aws_mqtt_protocol_adapter_delete(struct aws_mqtt_protocol_adapter *adapter);

AWS_MQTT_API int aws_mqtt_protocol_adapter_subscribe(struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_subscribe_options *options);
AWS_MQTT_API int aws_mqtt_protocol_adapter_subscribe(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_subscribe_options *options);

AWS_MQTT_API int aws_mqtt_protocol_adapter_unsubscribe(struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_unsubscribe_options *options);
AWS_MQTT_API int aws_mqtt_protocol_adapter_unsubscribe(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_unsubscribe_options *options);

AWS_MQTT_API int aws_mqtt_protocol_adapter_publish(struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_publish_options *options);
AWS_MQTT_API int aws_mqtt_protocol_adapter_publish(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_publish_options *options);

AWS_EXTERN_C_END

Expand Down
1 change: 0 additions & 1 deletion include/aws/mqtt/private/request-response/weak_ref.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@ AWS_MQTT_API void aws_weak_ref_zero_reference(struct aws_weak_ref *weak_ref);

AWS_EXTERN_C_END


#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_WEAK_REF_H */
109 changes: 68 additions & 41 deletions source/request-response/protocol_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
*
* Entries are not tracked with the exception of eventstream impl which needs the stream handles to close.
* A subscribe failure should not trigger an unsubscribe, only notify the status callback.
* Subscription event callback should be {subscribe_success, subscribe_failure, unsubscribe_success, unsubscribe_failure}.
* The sub manager is responsible for calling Unsubscribe on all its entries when shutting down (before releasing
* hold of the adapter).
* Subscription event callback should be {subscribe_success, subscribe_failure, unsubscribe_success,
* unsubscribe_failure}. The sub manager is responsible for calling Unsubscribe on all its entries when shutting down
* (before releasing hold of the adapter).
*
* How do we know not to retry unsubscribe failures because a subscribe came in? Well, we don't retry failures; let
* the manager make that decision. No retry when the weak ref is zeroed either. The potential for things to go wrong
Expand All @@ -33,8 +33,10 @@
* decide what to do.
*/


struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter_options *options, struct aws_mqtt_client_connection *connection) {
struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter_options *options,
struct aws_mqtt_client_connection *connection) {
(void)allocator;
(void)options;
(void)connection;
Expand Down Expand Up @@ -71,8 +73,12 @@ struct aws_mqtt_protocol_adapter_5_subscription_op_data {
struct aws_weak_ref *callback_ref;
};

static struct aws_mqtt_protocol_adapter_5_subscription_op_data *s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(struct aws_allocator *allocator, struct aws_byte_cursor topic_filter, struct aws_weak_ref *callback_ref) {
struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_subscription_op_data));
static struct aws_mqtt_protocol_adapter_5_subscription_op_data *s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(
struct aws_allocator *allocator,
struct aws_byte_cursor topic_filter,
struct aws_weak_ref *callback_ref) {
struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_subscription_op_data));

subscribe_data->allocator = allocator;
subscribe_data->callback_ref = aws_weak_ref_acquire(callback_ref);
Expand All @@ -81,7 +87,8 @@ static struct aws_mqtt_protocol_adapter_5_subscription_op_data *s_aws_mqtt_proto
return subscribe_data;
}

static void s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data) {
static void s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(
struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data) {
aws_weak_ref_release(subscribe_data->callback_ref);
aws_byte_buf_clean_up(&subscribe_data->topic_filter);

Expand All @@ -90,17 +97,19 @@ static void s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(struct aws

/* Subscribe */

static void s_protocol_adapter_5_subscribe_completion(const struct aws_mqtt5_packet_suback_view *suback,
int error_code,
void *complete_ctx) {
static void s_protocol_adapter_5_subscribe_completion(
const struct aws_mqtt5_packet_suback_view *suback,
int error_code,
void *complete_ctx) {
struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data = complete_ctx;
struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_weak_ref_get_reference(subscribe_data->callback_ref);

if (adapter == NULL) {
goto done;
}

bool success = error_code == AWS_ERROR_SUCCESS && suback != NULL && suback->reason_code_count == 1 && suback->reason_codes[0] <= AWS_MQTT5_SARC_GRANTED_QOS_2;
bool success = error_code == AWS_ERROR_SUCCESS && suback != NULL && suback->reason_code_count == 1 &&
suback->reason_codes[0] <= AWS_MQTT5_SARC_GRANTED_QOS_2;

struct aws_protocol_adapter_subscription_event subscribe_event = {
.topic_filter = aws_byte_cursor_from_buf(&subscribe_data->topic_filter),
Expand All @@ -117,7 +126,9 @@ static void s_protocol_adapter_5_subscribe_completion(const struct aws_mqtt5_pac
int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;

struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data = s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(adapter->allocator, options->topic_filter, adapter->callback_ref);
struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data =
s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(
adapter->allocator, options->topic_filter, adapter->callback_ref);

struct aws_mqtt5_subscription_view subscription_view = {
.qos = AWS_MQTT5_QOS_AT_LEAST_ONCE,
Expand Down Expand Up @@ -150,17 +161,19 @@ int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adap

/* Unsubscribe */

static void s_protocol_adapter_5_unsubscribe_completion(const struct aws_mqtt5_packet_unsuback_view *unsuback,
int error_code,
void *complete_ctx) {
static void s_protocol_adapter_5_unsubscribe_completion(
const struct aws_mqtt5_packet_unsuback_view *unsuback,
int error_code,
void *complete_ctx) {
struct aws_mqtt_protocol_adapter_5_subscription_op_data *unsubscribe_data = complete_ctx;
struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_weak_ref_get_reference(unsubscribe_data->callback_ref);

if (adapter == NULL) {
goto done;
}

bool success = error_code == AWS_ERROR_SUCCESS && unsuback != NULL && unsuback->reason_code_count == 1 && unsuback->reason_codes[0] < 128;
bool success = error_code == AWS_ERROR_SUCCESS && unsuback != NULL && unsuback->reason_code_count == 1 &&
unsuback->reason_codes[0] < 128;

struct aws_protocol_adapter_subscription_event unsubscribe_event = {
.topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->topic_filter),
Expand All @@ -177,7 +190,9 @@ static void s_protocol_adapter_5_unsubscribe_completion(const struct aws_mqtt5_p
int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;

struct aws_mqtt_protocol_adapter_5_subscription_op_data *unsubscribe_data = s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(adapter->allocator, options->topic_filter, adapter->callback_ref);
struct aws_mqtt_protocol_adapter_5_subscription_op_data *unsubscribe_data =
s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(
adapter->allocator, options->topic_filter, adapter->callback_ref);

struct aws_mqtt5_packet_unsubscribe_view unsubscribe_view = {
.topic_filters = &options->topic_filter,
Expand Down Expand Up @@ -213,8 +228,12 @@ struct aws_mqtt_protocol_adapter_5_publish_op_data {
void *user_data;
};

static struct aws_mqtt_protocol_adapter_5_publish_op_data *s_aws_mqtt_protocol_adapter_5_publish_op_data_new(struct aws_allocator *allocator, const struct aws_protocol_adapter_publish_options *publish_options, struct aws_weak_ref *callback_ref) {
struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_publish_op_data));
static struct aws_mqtt_protocol_adapter_5_publish_op_data *s_aws_mqtt_protocol_adapter_5_publish_op_data_new(
struct aws_allocator *allocator,
const struct aws_protocol_adapter_publish_options *publish_options,
struct aws_weak_ref *callback_ref) {
struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_publish_op_data));

publish_data->allocator = allocator;
publish_data->callback_ref = aws_weak_ref_acquire(callback_ref);
Expand All @@ -224,7 +243,8 @@ static struct aws_mqtt_protocol_adapter_5_publish_op_data *s_aws_mqtt_protocol_a
return publish_data;
}

static void s_aws_mqtt_protocol_adapter_5_publish_op_data_delete(struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data) {
static void s_aws_mqtt_protocol_adapter_5_publish_op_data_delete(
struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data) {
aws_weak_ref_release(publish_data->callback_ref);

aws_mem_release(publish_data->allocator, publish_data);
Expand Down Expand Up @@ -259,13 +279,11 @@ static void s_protocol_adapter_5_publish_completion(

int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapter_publish_options *options) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;
struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data = s_aws_mqtt_protocol_adapter_5_publish_op_data_new(adapter->allocator, options, adapter->callback_ref);
struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data =
s_aws_mqtt_protocol_adapter_5_publish_op_data_new(adapter->allocator, options, adapter->callback_ref);

struct aws_mqtt5_packet_publish_view publish_view = {
.topic = options->topic,
.qos = AWS_MQTT5_QOS_AT_LEAST_ONCE,
.payload = options->payload
};
.topic = options->topic, .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, .payload = options->payload};

struct aws_mqtt5_publish_completion_options completion_options = {
.ack_timeout_seconds_override = options->ack_timeout_seconds,
Expand All @@ -286,13 +304,13 @@ int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapte
return AWS_OP_ERR;
}

static bool s_protocol_adapter_mqtt5_listener_publish_received(const struct aws_mqtt5_packet_publish_view *publish, void *user_data) {
static bool s_protocol_adapter_mqtt5_listener_publish_received(
const struct aws_mqtt5_packet_publish_view *publish,
void *user_data) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = user_data;

struct aws_protocol_adapter_incoming_publish_event publish_event = {
.topic = publish->topic,
.payload = publish->payload
};
.topic = publish->topic, .payload = publish->payload};

(*adapter->config.incoming_publish_callback)(&publish_event, adapter->config.user_data);

Expand Down Expand Up @@ -346,8 +364,12 @@ static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt5_vtable =
.aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_5_publish,
};

struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter_options *options, struct aws_mqtt5_client *client) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_impl));
struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter_options *options,
struct aws_mqtt5_client *client) {
struct aws_mqtt_protocol_adapter_5_impl *adapter =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_impl));

adapter->allocator = allocator;
adapter->base.impl = adapter;
Expand All @@ -359,12 +381,11 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(struct aw

struct aws_mqtt5_listener_config listener_options = {
.client = client,
.listener_callbacks = {
.listener_publish_received_handler = s_protocol_adapter_mqtt5_listener_publish_received,
.listener_publish_received_handler_user_data = adapter,
.lifecycle_event_handler = s_protocol_adapter_mqtt5_lifecycle_event_callback,
.lifecycle_event_handler_user_data = adapter
},
.listener_callbacks =
{.listener_publish_received_handler = s_protocol_adapter_mqtt5_listener_publish_received,
.listener_publish_received_handler_user_data = adapter,
.lifecycle_event_handler = s_protocol_adapter_mqtt5_lifecycle_event_callback,
.lifecycle_event_handler_user_data = adapter},
.termination_callback = s_protocol_adapter_mqtt5_listener_termination_callback,
.termination_callback_user_data = adapter,
};
Expand All @@ -378,14 +399,20 @@ void aws_mqtt_protocol_adapter_delete(struct aws_mqtt_protocol_adapter *adapter)
(*adapter->vtable->aws_mqtt_protocol_adapter_delete_fn)(adapter->impl);
}

int aws_mqtt_protocol_adapter_subscribe(struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_subscribe_options *options) {
int aws_mqtt_protocol_adapter_subscribe(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_subscribe_options *options) {
return (*adapter->vtable->aws_mqtt_protocol_adapter_subscribe_fn)(adapter->impl, options);
}

int aws_mqtt_protocol_adapter_unsubscribe(struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_unsubscribe_options *options) {
int aws_mqtt_protocol_adapter_unsubscribe(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_unsubscribe_options *options) {
return (*adapter->vtable->aws_mqtt_protocol_adapter_unsubscribe_fn)(adapter->impl, options);
}

int aws_mqtt_protocol_adapter_publish(struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_publish_options *options) {
int aws_mqtt_protocol_adapter_publish(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_publish_options *options) {
return (*adapter->vtable->aws_mqtt_protocol_adapter_publish_fn)(adapter->impl, options);
}
6 changes: 3 additions & 3 deletions source/request-response/weak_ref.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/private/request-response/weak_ref.h>

Expand Down
11 changes: 11 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,17 @@ add_test_case(request_response_mqtt5_protocol_adapter_subscribe_success)
add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_error_code)
add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_reason_code)
add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_timeout)
add_test_case(request_response_mqtt5_protocol_adapter_unsubscribe_success)
add_test_case(request_response_mqtt5_protocol_adapter_unsubscribe_failure_error_code)
add_test_case(request_response_mqtt5_protocol_adapter_unsubscribe_failure_reason_code)
add_test_case(request_response_mqtt5_protocol_adapter_unsubscribe_failure_timeout)
add_test_case(request_response_mqtt5_protocol_adapter_publish_success)
add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_error_code)
add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_reason_code)
add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_timeout)
add_test_case(request_response_mqtt5_protocol_adapter_connection_event_sequence)
add_test_case(request_response_mqtt5_protocol_adapter_incoming_publish)
add_test_case(request_response_mqtt5_protocol_adapter_shutdown_while_pending)

generate_test_driver(${PROJECT_NAME}-tests)

Expand Down
Loading

0 comments on commit 86de219

Please sign in to comment.