SQS Source: Add on_error Config, Multi-Region Support, and sqsMessageDelayTimer Metric for Auto-Scaling #5409

Merged on Feb 6, 2025 (10 commits)

jmsusanto
Contributor

Description

Added the following:

  • Introduced an on_error configuration parameter that lets users choose whether to delete or retain messages upon processing failure.

  • Enabled support for SQS queues across multiple regions within a single configuration by instantiating one client per region.

  • Added the sqsMessageDelayTimer metric to assist with auto-scaling in OSI.

  • Updated the README.
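
To illustrate the "one client per region" point above, here is a minimal sketch and not the PR's actual code. It assumes queue URLs follow the standard `https://sqs.<region>.amazonaws.com/<account>/<queue>` shape. `RegionClientCache`, `regionOf`, and `clientFor` are hypothetical names, and the generic type `C` stands in for whatever client type the source uses.

```java
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: lazily creates and caches one client object per region.
final class RegionClientCache<C> {
    private final Map<String, C> clientsByRegion = new ConcurrentHashMap<>();
    private final Function<String, C> clientFactory;

    RegionClientCache(final Function<String, C> clientFactory) {
        this.clientFactory = clientFactory;
    }

    // Assumes the host looks like sqs.<region>.amazonaws.com; rejects anything else.
    static String regionOf(final String queueUrl) {
        final String host = URI.create(queueUrl).getHost();
        final String[] parts = host == null ? new String[0] : host.split("\\.");
        if (parts.length < 3 || !"sqs".equals(parts[0])) {
            throw new IllegalArgumentException("Unrecognized SQS queue URL: " + queueUrl);
        }
        return parts[1];
    }

    // Queues in the same region share a client; a new region triggers the factory once.
    C clientFor(final String queueUrl) {
        return clientsByRegion.computeIfAbsent(regionOf(queueUrl), clientFactory);
    }

    int size() {
        return clientsByRegion.size();
    }

    public static void main(final String[] args) {
        final RegionClientCache<String> cache = new RegionClientCache<>(region -> "client-" + region);
        System.out.println(cache.clientFor("https://sqs.us-east-1.amazonaws.com/123456789012/queue-a"));
        System.out.println(cache.clientFor("https://sqs.eu-west-1.amazonaws.com/123456789012/queue-b"));
    }
}
```

With a real SDK client, the factory passed to the constructor would build a client configured for the given region; that wiring is left out here.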

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
  • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Jeremy Michael added 8 commits February 3, 2025 12:50
Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
…README

Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
Duration duration = Duration.between(Instant.ofEpochMilli(Long.parseLong(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP))), Instant.now());
sqsMessageDelayTimer.record(duration.isNegative() ? Duration.ZERO : duration); // Negative durations can occur if messages are processed immediately

final int receiveCount = Integer.parseInt(message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT));
Member

If this attribute doesn't exist for some reason, will this default to 0, or will Integer.parseInt(null) throw an exception?

Contributor Author

Both of these system attributes will always exist, since we make this ReceiveMessageRequest call:

ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .attributeNamesWithStrings("All")
        .messageAttributeNames("All");
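
On the reviewer's question: `Integer.parseInt(null)` does not default to 0; it throws `NumberFormatException`. If a fallback were ever wanted, a defensive parse could look like the sketch below. This is an illustration only: `ReceiveCountParser` and `parseOrDefault` are hypothetical names, and a plain `Map<String, String>` stands in for the SDK's attribute map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: parses an integer attribute without throwing on a missing value.
final class ReceiveCountParser {
    private ReceiveCountParser() {
    }

    // Returns the parsed value, or the fallback when the attribute is absent or not numeric.
    static int parseOrDefault(final Map<String, String> attributes, final String name, final int fallback) {
        final String raw = attributes.get(name);
        if (raw == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(raw);
        } catch (final NumberFormatException e) {
            return fallback;
        }
    }

    public static void main(final String[] args) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("ApproximateReceiveCount", "3");
        System.out.println(parseOrDefault(attributes, "ApproximateReceiveCount", 1));
        System.out.println(parseOrDefault(attributes, "SomeMissingAttribute", 1));
    }
}
```

Whether a silent fallback is preferable to failing fast is a design choice; the author's position in this thread is that the attribute is always present, so the PR parses it directly.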

sqsMessageDelayTimer.record(duration.isNegative() ? Duration.ZERO : duration); // Negative durations can occur if messages are processed immediately

final int receiveCount = Integer.parseInt(message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT));
if (receiveCount < 1) {
Member

This should be receiveCount <= 1, since the receive count will always be 1 when it's grabbed for the first time.

Contributor Author

Good catch, changing this right away.
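
The combined behavior discussed here (clamp negative delays to zero, and treat a receive count of 1 as the first delivery) can be sketched as a pure function. This is a hedged illustration, not the merged code: `MessageDelay` and `firstReceiveDelay` are hypothetical names, and gating the measurement on `receiveCount <= 1` is inferred from the test name that appears later in this PR.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper: computes the delay to record for a message on its first delivery.
final class MessageDelay {
    private MessageDelay() {
    }

    // Returns empty for redeliveries; otherwise the non-negative time since the message was sent.
    static Optional<Duration> firstReceiveDelay(final long sentTimestampMillis,
                                                final int receiveCount,
                                                final Instant now) {
        if (receiveCount > 1) {
            return Optional.empty();
        }
        final Duration delay = Duration.between(Instant.ofEpochMilli(sentTimestampMillis), now);
        // A sent timestamp later than "now" would yield a negative duration; clamp it to zero.
        return Optional.of(delay.isNegative() ? Duration.ZERO : delay);
    }

    public static void main(final String[] args) {
        final Instant now = Instant.ofEpochMilli(10_000L);
        System.out.println(firstReceiveDelay(5_000L, 1, now));
        System.out.println(firstReceiveDelay(5_000L, 2, now));
    }
}
```

Passing `now` in as a parameter keeps the function deterministic, which makes the negative-duration branch easy to cover in a unit test.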

Comment on lines +177 to +180
Duration duration = Duration.between(
Instant.ofEpochMilli(Long.parseLong(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP))),
Instant.now());
sqsMessageDelayTimer.record(duration.isNegative() ? Duration.ZERO : duration); // can sometimes record negative durations if message is processed immediately
Member

Let's add this to a unit test

Contributor Author

Added this.

Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
graytaylor0 previously approved these changes Feb 3, 2025
@Test
void processSqsMessages_should_record_sqsMessageDelayTimer_when_approximateReceiveCount_less_than_or_equal_to_one() throws IOException {
final long sentTimestampMillis = Instant.now().minusSeconds(5).toEpochMilli();
final Message message = Message.builder()
Member

Nit: You could mock this message instead of building it.

@@ -0,0 +1,34 @@
/*
Collaborator

Please use the new file header format:

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

Contributor Author

Added

}

@JsonCreator
static OnErrorOption fromOptionValue(final String option) {
Collaborator

This could possibly return a null value. Not sure if you need to consider that at the caller end.

Contributor Author

The current implementation does not allow values other than "RETAIN_MESSAGES" or "DELETE_MESSAGES"; values such as None won't work. Setting on_error: null is treated as if the parameter were not overridden, so the default is used.
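
The concern about a null return can be made concrete with a small sketch. It is not the PR's `OnErrorOption`: the enum name `OnErrorChoice`, the lowercase option strings, and the `resolve` helper are all assumptions for illustration, and the Jackson `@JsonCreator` annotation is omitted so the example needs only the standard library.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for an on_error option enum; the option strings are assumed.
enum OnErrorChoice {
    RETAIN_MESSAGES("retain_messages"),
    DELETE_MESSAGES("delete_messages");

    private static final Map<String, OnErrorChoice> BY_VALUE = Arrays.stream(values())
            .collect(Collectors.toMap(choice -> choice.optionValue, Function.identity()));

    private final String optionValue;

    OnErrorChoice(final String optionValue) {
        this.optionValue = optionValue;
    }

    // A map-backed lookup like this yields null for null or unrecognized input.
    static OnErrorChoice fromOptionValue(final String option) {
        return option == null ? null : BY_VALUE.get(option);
    }

    // Caller-side handling: an absent value falls back to the default, an unknown one is rejected.
    static OnErrorChoice resolve(final String option, final OnErrorChoice defaultChoice) {
        if (option == null) {
            return defaultChoice;
        }
        final OnErrorChoice choice = fromOptionValue(option);
        if (choice == null) {
            throw new IllegalArgumentException("Unsupported on_error value: " + option);
        }
        return choice;
    }

    public static void main(final String[] args) {
        System.out.println(resolve("delete_messages", RETAIN_MESSAGES));
        System.out.println(resolve(null, RETAIN_MESSAGES));
    }
}
```

The default passed to `resolve` is a parameter here because this thread does not say which option the source treats as the default.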

@@ -56,18 +55,17 @@ public class SqsWorkerCommon {
     private final Counter sqsVisibilityTimeoutChangedCount;
     private final Counter sqsVisibilityTimeoutChangeFailedCount;

-    public SqsWorkerCommon(final SqsClient sqsClient,
-                           final Backoff standardBackoff,
+    public SqsWorkerCommon(final Backoff standardBackoff,
Collaborator

I think the standardBackoff logic should be per sqsClient. You don't want to share one Backoff instance across different SQS clients, because it maintains state to track retry attempts and compute delays.

Contributor Author

I think it would make more sense to have one Backoff instance at the queue level. This would mean that workers of the same queue share a Backoff.

Collaborator

@san81 san81 left a comment

I think we shouldn't share a Backoff instance between different SQS clients, because it maintains state to track retry attempts and compute delays.
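
The state-sharing concern can be sketched as follows. This is an illustration under assumptions, not the library's `Backoff` type: `SimpleBackoff` is a hypothetical exponential backoff that counts attempts, and `BackoffRegistry` hands out one instance per key. The key could be a region (one per client, as the reviewer suggests) or a queue URL (one per queue, as the author suggests).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stateful backoff: each call advances the attempt counter and doubles the delay.
final class SimpleBackoff {
    private final long baseMillis;
    private final long maxMillis;
    private int attempts;

    SimpleBackoff(final long baseMillis, final long maxMillis) {
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    synchronized long nextDelayMillis() {
        // Cap the shift so the multiplication cannot overflow for reasonable base values.
        final long delay = Math.min(maxMillis, baseMillis << Math.min(attempts, 20));
        attempts++;
        return delay;
    }

    synchronized void reset() {
        attempts = 0;
    }
}

// Hypothetical registry: failures under one key do not inflate delays for another key.
final class BackoffRegistry {
    private final Map<String, SimpleBackoff> backoffsByKey = new ConcurrentHashMap<>();

    SimpleBackoff forKey(final String key) {
        return backoffsByKey.computeIfAbsent(key, k -> new SimpleBackoff(100L, 10_000L));
    }

    public static void main(final String[] args) {
        final BackoffRegistry registry = new BackoffRegistry();
        registry.forKey("us-east-1").nextDelayMillis();
        System.out.println(registry.forKey("us-east-1").nextDelayMillis());
        System.out.println(registry.forKey("eu-west-1").nextDelayMillis());
    }
}
```

With a single shared instance, the second region in `main` would inherit the first region's attempt count and wait longer than its own failure history warrants.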

Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
@graytaylor0 graytaylor0 merged commit 137e1e7 into opensearch-project:main Feb 6, 2025
45 of 47 checks passed