Skip to content

Commit

Permalink
Checkpoint pre-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Mar 30, 2024
1 parent 8fa8c20 commit b858a25
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 11 deletions.
1 change: 1 addition & 0 deletions include/aws/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ enum aws_mqtt_error {
AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY,
AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE,
AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR,
AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE,

AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID),
};
Expand Down
3 changes: 3 additions & 0 deletions source/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter)
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR,
"Request operation failed due to a non-specific internal error within the client."),
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE,
"Request-response operation failed because the associated publish failed synchronously."),

};
/* clang-format on */
Expand Down
66 changes: 55 additions & 11 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,36 @@ static void s_mqtt_request_response_client_wake_service(struct aws_mqtt_request_
}
}

struct aws_rrc_incomplete_publish {
struct aws_allocator *allocator;

struct aws_mqtt_request_response_client *rr_client;

uint64_t operation_id;
};

static void s_make_mqtt_request(
struct aws_mqtt_request_response_client *client,
struct aws_mqtt_rr_client_operation *operation) {
(void)client;

AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST);

struct aws_mqtt_request_operation_options *request_options = &operation->storage.request_storage.options;

struct aws_protocol_adapter_publish_options publish_options = {
.topic = request_options->publish_topic,
.payload = request_options->serialized_request,
.ack_timeout_seconds = client->config.operation_timeout_seconds,
.completion_callback_fn = s_??,
.user_data = ??,
};

if (aws_mqtt_protocol_adapter_publish(client->client_adapter, &publish_options)) {
s_complete_request_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_PUBLISH_FAILURE);
}
}

struct aws_rr_subscription_status_event_task {
struct aws_allocator *allocator;

Expand All @@ -743,6 +773,30 @@ static void s_aws_rr_subscription_status_event_task_delete(struct aws_rr_subscri
aws_mem_release(task->allocator, task);
}

static void s_on_request_operation_subscription_status_event(
struct aws_mqtt_rr_client_operation *operation,
struct aws_byte_cursor topic_filter,
enum aws_rr_subscription_event_type event_type) {
(void)topic_filter;

switch (event_type) {
case ARRSET_REQUEST_SUBSCRIBE_FAILURE:
case ARRSET_REQUEST_SUBSCRIPTION_ENDED:
s_complete_request_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE);
break;

case ARRSET_REQUEST_SUBSCRIBE_SUCCESS:
if (operation->state == AWS_MRROS_PENDING_SUBSCRIPTION) {
s_change_operation_state(operation, AWS_MRROS_PENDING_RESPONSE);
s_make_mqtt_request(operation->client_internal_ref, operation);
}
break;

default:
AWS_FATAL_ASSERT(false);
}
}

static void s_on_streaming_operation_subscription_status_event(
struct aws_mqtt_rr_client_operation *operation,
struct aws_byte_cursor topic_filter,
Expand Down Expand Up @@ -799,7 +853,7 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void
case ARRSET_REQUEST_SUBSCRIBE_SUCCESS:
case ARRSET_REQUEST_SUBSCRIBE_FAILURE:
case ARRSET_REQUEST_SUBSCRIPTION_ENDED:
/* NYI */
s_on_request_operation_subscription_status_event(operation, aws_byte_cursor_from_buf(&event_task->topic_filter), event_task->type);
break;

case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED:
Expand Down Expand Up @@ -1187,16 +1241,6 @@ static int s_add_operation_to_subscription_topic_filter_table(
return AWS_OP_SUCCESS;
}

static void s_make_mqtt_request(
struct aws_mqtt_request_response_client *client,
struct aws_mqtt_rr_client_operation *operation) {
(void)client;

AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST);

// TODO: NYI
}

static void s_handle_operation_subscribe_result(
struct aws_mqtt_request_response_client *client,
struct aws_mqtt_rr_client_operation *operation,
Expand Down

0 comments on commit b858a25

Please sign in to comment.