Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic argument to publish callback in request-response stream client #380

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bin/elastishadow/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,11 @@ static void s_stream_subscription_status_fn(
aws_error_debug_str(error_code));
}

static void s_stream_incoming_publish_fn(struct aws_byte_cursor payload, void *user_data) {
static void s_stream_incoming_publish_fn(
struct aws_byte_cursor payload,
struct aws_byte_cursor topic,
void *user_data) {
(void)topic;
struct aws_shadow_streaming_operation *operation = user_data;

struct aws_byte_cursor thing_cursor = aws_byte_cursor_from_buf(&operation->thing);
Expand Down
5 changes: 4 additions & 1 deletion include/aws/mqtt/request-response/request_response_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ typedef void(aws_mqtt_streaming_operation_subscription_status_fn)(
/*
* Callback signature for when a publish arrives that matches a streaming operation's subscription
*/
typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(struct aws_byte_cursor payload, void *user_data);
typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(
struct aws_byte_cursor payload,
struct aws_byte_cursor topic,
void *user_data);

/*
* Callback signature for when a streaming operation is fully destroyed and no more events will be emitted.
Expand Down
2 changes: 1 addition & 1 deletion source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ static void s_apply_publish_to_streaming_operation_list(
}

void *user_data = operation->storage.streaming_storage.options.user_data;
(*incoming_publish_callback)(publish_event->payload, user_data);
(*incoming_publish_callback)(publish_event->payload, publish_event->topic, user_data);

AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
Expand Down
178 changes: 133 additions & 45 deletions tests/request-response/request_response_client_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ static int s_rrc_verify_request_completion(
return AWS_OP_SUCCESS;
}

struct aws_rr_client_fixture_publish_message {
struct aws_byte_buf payload;
struct aws_byte_buf topic;
};

struct aws_rr_client_fixture_publish_message_view {
struct aws_byte_cursor payload;
struct aws_byte_cursor topic;
};

struct aws_rr_client_fixture_streaming_record {
struct aws_allocator *allocator;

Expand Down Expand Up @@ -221,7 +231,8 @@ struct aws_rr_client_fixture_streaming_record *s_aws_rr_client_fixture_streaming
aws_byte_buf_init_copy_from_cursor(&record->record_key, allocator, record_key);
record->record_key_cursor = aws_byte_cursor_from_buf(&record->record_key);

aws_array_list_init_dynamic(&record->publishes, allocator, 10, sizeof(struct aws_byte_buf));
aws_array_list_init_dynamic(
&record->publishes, allocator, 10, sizeof(struct aws_rr_client_fixture_publish_message));
aws_array_list_init_dynamic(
&record->subscription_events,
allocator,
Expand All @@ -236,10 +247,11 @@ void s_aws_rr_client_fixture_streaming_record_delete(struct aws_rr_client_fixtur

size_t publish_count = aws_array_list_length(&record->publishes);
for (size_t i = 0; i < publish_count; ++i) {
struct aws_byte_buf publish_payload;
aws_array_list_get_at(&record->publishes, &publish_payload, i);
struct aws_rr_client_fixture_publish_message publish_message;
aws_array_list_get_at(&record->publishes, &publish_message, i);

aws_byte_buf_clean_up(&publish_payload);
aws_byte_buf_clean_up(&publish_message.payload);
aws_byte_buf_clean_up(&publish_message.topic);
}

aws_array_list_clean_up(&record->publishes);
Expand Down Expand Up @@ -276,16 +288,18 @@ static void s_rrc_fixture_streaming_operation_subscription_status_callback(

static void s_rrc_fixture_streaming_operation_incoming_publish_callback(
struct aws_byte_cursor payload,
struct aws_byte_cursor topic,
void *user_data) {
struct aws_rr_client_fixture_streaming_record *record = user_data;
struct aws_rr_client_test_fixture *fixture = record->fixture;

aws_mutex_lock(&fixture->lock);

struct aws_byte_buf payload_buffer;
aws_byte_buf_init_copy_from_cursor(&payload_buffer, fixture->allocator, payload);
struct aws_rr_client_fixture_publish_message publish_message;
aws_byte_buf_init_copy_from_cursor(&publish_message.payload, fixture->allocator, payload);
aws_byte_buf_init_copy_from_cursor(&publish_message.topic, fixture->allocator, topic);

aws_array_list_push_back(&record->publishes, &payload_buffer);
aws_array_list_push_back(&record->publishes, &publish_message);

aws_mutex_unlock(&fixture->lock);
aws_condition_variable_notify_all(&fixture->signal);
Expand Down Expand Up @@ -379,7 +393,7 @@ static int s_rrc_verify_streaming_publishes(
struct aws_rr_client_test_fixture *fixture,
struct aws_byte_cursor key,
size_t expected_publish_count,
struct aws_byte_cursor *expected_publishes) {
struct aws_rr_client_fixture_publish_message_view *expected_publishes) {

aws_mutex_lock(&fixture->lock);

Expand All @@ -394,13 +408,21 @@ static int s_rrc_verify_streaming_publishes(
ASSERT_INT_EQUALS(expected_publish_count, actual_publish_count);

for (size_t i = 0; i < actual_publish_count; ++i) {
struct aws_byte_buf actual_payload;
aws_array_list_get_at(&record->publishes, &actual_payload, i);
struct aws_rr_client_fixture_publish_message actual_publish_message;
aws_array_list_get_at(&record->publishes, &actual_publish_message, i);

struct aws_byte_cursor *expected_payload = &expected_publishes[i];
struct aws_rr_client_fixture_publish_message_view *expected_payload = &expected_publishes[i];

ASSERT_BIN_ARRAYS_EQUALS(
expected_payload->ptr, expected_payload->len, actual_payload.buffer, actual_payload.len);
expected_payload->payload.ptr,
expected_payload->payload.len,
actual_publish_message.payload.buffer,
actual_publish_message.payload.len);
ASSERT_BIN_ARRAYS_EQUALS(
expected_payload->topic.ptr,
expected_payload->topic.len,
actual_publish_message.topic.buffer,
actual_publish_message.topic.len);
}

aws_mutex_unlock(&fixture->lock);
Expand Down Expand Up @@ -1249,9 +1271,15 @@ static int s_rrc_streaming_operation_success_single_fn(struct aws_allocator *all

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down Expand Up @@ -1314,10 +1342,19 @@ static int s_rrc_streaming_operation_success_overlapping_fn(struct aws_allocator
s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);
s_rrc_wait_for_n_streaming_publishes(&fixture, record_key2, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
payload3,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
{
payload3,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes));
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key2, 2, expected_publishes));
Expand Down Expand Up @@ -1397,9 +1434,15 @@ static int s_rrc_streaming_operation_success_starting_offline_fn(struct aws_allo

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down Expand Up @@ -1468,9 +1511,15 @@ static int s_rrc_streaming_operation_clean_session_reestablish_subscription_fn(

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 1);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 1, expected_publishes));

Expand Down Expand Up @@ -1546,9 +1595,15 @@ static int s_rrc_streaming_operation_resume_session_fn(struct aws_allocator *all

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 1);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 1, expected_publishes));

Expand Down Expand Up @@ -1675,9 +1730,15 @@ static int s_rrc_streaming_operation_first_subscribe_times_out_resub_succeeds_fn

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down Expand Up @@ -1791,9 +1852,15 @@ static int s_rrc_streaming_operation_first_subscribe_retryable_failure_resub_suc

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down Expand Up @@ -1965,9 +2032,15 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn(

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes));

Expand Down Expand Up @@ -1998,8 +2071,11 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn(
// verify third operation got the new publish
s_rrc_wait_for_n_streaming_publishes(&fixture, record_key3, 1);

struct aws_byte_cursor third_expected_publishes[] = {
payload3,
struct aws_rr_client_fixture_publish_message_view third_expected_publishes[] = {
{
payload3,
topic_filter2,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key3, AWS_ARRAY_SIZE(third_expected_publishes), third_expected_publishes));
Expand Down Expand Up @@ -2127,9 +2203,15 @@ static int s_rrc_streaming_operation_success_delayed_by_request_operations_fn(

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down Expand Up @@ -2204,9 +2286,15 @@ static int s_rrc_streaming_operation_success_sandwiched_by_request_operations_fn

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down