Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

311 listener2 #345

Closed
wants to merge 20 commits into from
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@ file(GLOB AWS_MQTT_PRIV_EXPOSED_HEADERS
file(GLOB AWS_MQTT_SRC
"source/*.c"
"source/v5/*.c"
"source/request-response/*.c"
)

file(GLOB MQTT_HEADERS
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/mqtt.h>
#include <aws/mqtt/exports.h>

#include <aws/common/byte_buf.h>
16 changes: 7 additions & 9 deletions source/request-response/protocol_adapter.c
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(struct
struct aws_mqtt_protocol_adapter_5_impl {
struct aws_allocator *allocator;
struct aws_mqtt_protocol_adapter base;
struct aws_protocol_adapter_weak_ref *callback_ref;
struct aws_weak_ref *callback_ref;
struct aws_mqtt_protocol_adapter_options config;

struct aws_event_loop *loop;
@@ -68,10 +68,10 @@ struct aws_mqtt_protocol_adapter_5_subscription_op_data {
struct aws_allocator *allocator;

struct aws_byte_buf topic_filter;
struct aws_protocol_adapter_weak_ref *callback_ref;
struct aws_weak_ref *callback_ref;
};

static struct aws_mqtt_protocol_adapter_5_subscription_op_data *s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(struct aws_allocator *allocator, struct aws_byte_cursor topic_filter, struct aws_protocol_adapter_weak_ref *callback_ref) {
static struct aws_mqtt_protocol_adapter_5_subscription_op_data *s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(struct aws_allocator *allocator, struct aws_byte_cursor topic_filter, struct aws_weak_ref *callback_ref) {
struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_subscription_op_data));

subscribe_data->allocator = allocator;
@@ -207,13 +207,13 @@ int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_ad

struct aws_mqtt_protocol_adapter_5_publish_op_data {
struct aws_allocator *allocator;
struct aws_protocol_adapter_weak_ref *callback_ref;
struct aws_weak_ref *callback_ref;

void (*completion_callback_fn)(bool, void *);
void *user_data;
};

static struct aws_mqtt_protocol_adapter_5_publish_op_data *s_aws_mqtt_protocol_adapter_5_publish_op_data_new(struct aws_allocator *allocator, const struct aws_protocol_adapter_publish_options *publish_options, struct aws_protocol_adapter_weak_ref *callback_ref) {
static struct aws_mqtt_protocol_adapter_5_publish_op_data *s_aws_mqtt_protocol_adapter_5_publish_op_data_new(struct aws_allocator *allocator, const struct aws_protocol_adapter_publish_options *publish_options, struct aws_weak_ref *callback_ref) {
struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_publish_op_data));

publish_data->allocator = allocator;
@@ -347,8 +347,6 @@ static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt5_vtable =
};

struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter_options *options, struct aws_mqtt5_client *client) {
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(client->loop));

struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_impl));

adapter->allocator = allocator;
@@ -373,7 +371,7 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(struct aw

adapter->listener = aws_mqtt5_listener_new(allocator, &listener_options);

return adapter;
return &adapter->base;
}

void aws_mqtt_protocol_adapter_delete(struct aws_mqtt_protocol_adapter *adapter) {
@@ -390,4 +388,4 @@ int aws_mqtt_protocol_adapter_unsubscribe(struct aws_mqtt_protocol_adapter *adap

int aws_mqtt_protocol_adapter_publish(struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_publish_options *options) {
return (*adapter->vtable->aws_mqtt_protocol_adapter_publish_fn)(adapter->impl, options);
}
}
2 changes: 1 addition & 1 deletion source/request-response/weak_ref.c
Original file line number Diff line number Diff line change
@@ -51,4 +51,4 @@ void *aws_weak_ref_get_reference(struct aws_weak_ref *weak_ref) {

void aws_weak_ref_zero_reference(struct aws_weak_ref *weak_ref) {
weak_ref->referenced = NULL;
}
}
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -442,7 +442,7 @@ add_test_case(mqtt_subscription_set_publish_single_level_wildcards)
add_test_case(mqtt_subscription_set_publish_multi_level_wildcards)
add_test_case(mqtt_subscription_set_get_subscriptions)

#add_test_case(request_response_mqtt5_protocol_adapter_subscribe_success)
add_test_case(request_response_mqtt5_protocol_adapter_subscribe_success)
#add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_error_code)
#add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_reason_code)
#add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_timeout)
Loading