Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Mar 28, 2024
1 parent aabe34d commit 93283ab
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 3 deletions.
26 changes: 23 additions & 3 deletions include/aws/mqtt/request-response/request_response_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

#include <aws/mqtt/mqtt.h>

#include <aws/mqtt/private/request-response/request_response.h>

struct aws_mqtt_request_response_client;
struct aws_mqtt_client_connection;
struct aws_mqtt5_client;
Expand Down Expand Up @@ -46,8 +44,30 @@ struct aws_mqtt_request_operation_storage {
struct aws_byte_buf operation_data;
};

/*
* Describes a change to the state of a request operation subscription
*/
enum aws_rr_streaming_subscription_event_type {

/*
* The streaming operation is successfully subscribed to its topic (filter)
*/
ARRSSET_SUBSCRIPTION_ESTABLISHED,

/*
* The streaming operation has temporarily lost its subscription to its topic (filter)
*/
ARRSSET_SUBSCRIPTION_LOST,

/*
* The streaming operation has entered a terminal state where it has given up trying to subscribe
* to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy).
*/
ARRSSET_SUBSCRIPTION_HALTED,
};

typedef void(aws_mqtt_streaming_operation_subscription_status_fn)(
enum aws_rr_subscription_event_type status,
enum aws_rr_streaming_subscription_event_type status,
int error_code,
void *user_data);
typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(struct aws_byte_cursor payload, void *user_data);
Expand Down
98 changes: 98 additions & 0 deletions tests/request-response/request_response_client_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,104 @@ static void s_rrc_wait_on_streaming_termination(
aws_mutex_unlock(&fixture->lock);
}

struct rrc_streaming_event_wait_context {
struct aws_byte_cursor operation_key;
struct aws_rr_client_test_fixture *fixture;
size_t event_count;
};

static bool s_streaming_operation_has_n_publishes(void *context) {
struct rrc_streaming_event_wait_context *streaming_publish_context = context;

struct aws_hash_element *element = NULL;
aws_hash_table_find(
&streaming_publish_context->fixture->streaming_records, &streaming_publish_context->operation_key, &element);

AWS_FATAL_ASSERT(element != NULL && element->value != NULL);

struct aws_rr_client_fixture_streaming_record *record = element->value;

return aws_array_list_length(&record->publishes) >= streaming_publish_context->event_count;
}

static void s_rrc_wait_for_n_streaming_publishes(
struct aws_rr_client_test_fixture *fixture,
struct aws_byte_cursor key,
size_t count) {
struct rrc_streaming_event_wait_context context = {
.operation_key = key,
.fixture = fixture,
.event_count = count,
};

aws_mutex_lock(&fixture->lock);
aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_streaming_operation_has_n_publishes, &context);
aws_mutex_unlock(&fixture->lock);
}

static int s_rrc_verify_streaming_publishes(
struct aws_rr_client_test_fixture *fixture,
struct aws_byte_cursor key,
size_t expected_publish_count,
struct aws_byte_cursor *expected_publishes) {

aws_mutex_lock(&fixture->lock);

struct aws_hash_element *element = NULL;
aws_hash_table_find(&fixture->streaming_records, &key, &element);

AWS_FATAL_ASSERT(element != NULL && element->value != NULL);

struct aws_rr_client_fixture_streaming_record *record = element->value;

size_t actual_publish_count = aws_array_list_length(&record->publishes);
ASSERT_INT_EQUALS(expected_publish_count, actual_publish_count);

for (size_t i = 0; i < actual_publish_count; ++i) {
struct aws_byte_buf actual_payload;
aws_array_list_get_at(&record->publishes, &actual_payload, i);

struct aws_byte_cursor *expected_payload = &expected_publishes[i];

ASSERT_BIN_ARRAYS_EQUALS(
expected_payload->ptr, expected_payload->len, actual_payload.buffer, actual_payload.len);
}

aws_mutex_unlock(&fixture->lock);

return AWS_OP_SUCCESS;
}

static bool s_streaming_operation_has_n_subscription_events(void *context) {
struct rrc_streaming_event_wait_context *streaming_publish_context = context;

struct aws_hash_element *element = NULL;
aws_hash_table_find(
&streaming_publish_context->fixture->streaming_records, &streaming_publish_context->operation_key, &element);

AWS_FATAL_ASSERT(element != NULL && element->value != NULL);

struct aws_rr_client_fixture_streaming_record *record = element->value;

return aws_array_list_length(&record->subscription_events) >= streaming_publish_context->event_count;
}

static void s_rrc_wait_for_n_streaming_subscription_events(
struct aws_rr_client_test_fixture *fixture,
struct aws_byte_cursor key,
size_t count) {
struct rrc_streaming_event_wait_context context = {
.operation_key = key,
.fixture = fixture,
.event_count = count,
};

aws_mutex_lock(&fixture->lock);
aws_condition_variable_wait_pred(
&fixture->signal, &fixture->lock, s_streaming_operation_has_n_subscription_events, &context);
aws_mutex_unlock(&fixture->lock);
}

static int s_rrc_verify_streaming_record_subscription_events(
struct aws_rr_client_test_fixture *fixture,
struct aws_byte_cursor key,
Expand Down

0 comments on commit 93283ab

Please sign in to comment.