Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request response operations #356

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/aws/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ enum aws_mqtt_error {
AWS_ERROR_MQTT_CONNECTION_RESUBSCRIBE_NO_TOPICS,
AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE,
AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE,
AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN,

AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID),
};
Expand Down
32 changes: 32 additions & 0 deletions include/aws/mqtt/private/request-response/request_response.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_H
#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_H

/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/mqtt.h>

/*
* Describes a change to the state of a request-response client subscription
*/
enum aws_rr_subscription_event_type {

/*
* A subscribe succeeded
*/
ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS,

/*
* A subscribe failed
*/
ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE,

/*
* A previously successful subscription has ended (generally due to a failure to resume a session)
*/
ARRSET_SUBSCRIPTION_ENDED
};

#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_H */
22 changes: 1 addition & 21 deletions include/aws/mqtt/private/request-response/subscription_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,12 @@
#include <aws/mqtt/mqtt.h>

#include <aws/common/hash_table.h>
#include <aws/mqtt/private/request-response/request_response.h>

struct aws_mqtt_protocol_adapter;
struct aws_protocol_adapter_connection_event;
struct aws_protocol_adapter_subscription_event;

/*
* The kind of subscription event being emitted.
*/
enum aws_rr_subscription_event_type {

/*
* A subscribe succeeded
*/
ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS,

/*
* A subscribe failed
*/
ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE,

/*
* A previously successful subscription has ended (generally due to a failure to resume a session)
*/
ARRSET_SUBSCRIPTION_ENDED
};

struct aws_rr_subscription_status_event {
enum aws_rr_subscription_event_type type;
struct aws_byte_cursor topic_filter;
Expand Down
76 changes: 74 additions & 2 deletions include/aws/mqtt/request-response/request_response_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,69 @@
* SPDX-License-Identifier: Apache-2.0.
*/

#include "aws/mqtt/mqtt.h"
#include <aws/mqtt/mqtt.h>

#include <aws/mqtt/private/request-response/request_response.h>

struct aws_mqtt_request_response_client;
struct aws_mqtt_client_connection;
struct aws_mqtt5_client;
struct aws_mqtt_streaming_operation;

struct aws_mqtt_request_operation_response_path {
struct aws_byte_cursor topic;

/* potential point of expansion into an abstract "extractor" if we ever need to support non-JSON payloads */
bretambrose marked this conversation as resolved.
Show resolved Hide resolved
struct aws_byte_cursor correlation_token_json_path;
};

typedef void(
aws_mqtt_request_operation_completion_fn)(struct aws_byte_cursor *payload, int error_code, void *user_data);

struct aws_mqtt_request_operation_options {
struct aws_byte_cursor subscription_topic_filter;

struct aws_mqtt_request_operation_response_path *response_paths;
size_t response_path_count;

struct aws_byte_cursor publish_topic;
struct aws_byte_cursor serialized_request;
struct aws_byte_cursor correlation_token;

aws_mqtt_request_operation_completion_fn *completion_callback;
void *user_data;
};

struct aws_mqtt_request_operation_storage {
struct aws_mqtt_request_operation_options options;

struct aws_array_list operation_response_paths;

struct aws_byte_buf operation_data;
};

typedef void(aws_mqtt_streaming_operation_subscription_status_fn)(
enum aws_rr_subscription_event_type status,
int error_code,
void *user_data);
typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(struct aws_byte_cursor payload, void *user_data);
typedef void(aws_mqtt_streaming_operation_terminated_fn)(void *user_data);

struct aws_mqtt_streaming_operation_options {
struct aws_byte_cursor topic_filter;

aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback;
aws_mqtt_streaming_operation_incoming_publish_fn *incoming_publish_callback;
aws_mqtt_streaming_operation_terminated_fn *terminated_callback;

void *user_data;
};

struct aws_mqtt_streaming_operation_storage {
struct aws_mqtt_streaming_operation_options options;

struct aws_byte_buf operation_data;
};

typedef void(aws_mqtt_request_response_client_initialized_callback_fn)(void *user_data);
typedef void(aws_mqtt_request_response_client_terminated_callback_fn)(void *user_data);
Expand All @@ -21,8 +79,8 @@ struct aws_mqtt_request_response_client_options {

/* Do not bind the initialized callback; it exists mostly for tests and should not be exposed */
aws_mqtt_request_response_client_initialized_callback_fn *initialized_callback;

aws_mqtt_request_response_client_terminated_callback_fn *terminated_callback;

void *user_data;
};

Expand Down Expand Up @@ -56,6 +114,20 @@ AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_
AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(
struct aws_mqtt_request_response_client *client);

AWS_MQTT_API int aws_mqtt_request_response_client_submit_request(
struct aws_mqtt_request_response_client *client,
const struct aws_mqtt_request_operation_options *request_options);

AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_streaming_operation(
struct aws_mqtt_request_response_client *client,
const struct aws_mqtt_streaming_operation_options *streaming_options);

AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire(
struct aws_mqtt_rr_client_operation *operation);

AWS_MQTT_API struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_release(
struct aws_mqtt_rr_client_operation *operation);

AWS_EXTERN_C_END

#endif /* AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H */
3 changes: 3 additions & 0 deletions source/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter)
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE,
"MQTT operation returned a failing reason code"),
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN,
"Request operation failed due to client shut down"),
};
/* clang-format on */
#undef AWS_DEFINE_ERROR_INFO_MQTT
Expand Down
Loading
Loading