-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SQS Source: Add on_error Config, Multi-Region Support, and sqsMessageDelayTimer Metric for Auto-Scaling #5409
Conversation
Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
…README Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
Duration duration = Duration.between(Instant.ofEpochMilli(Long.parseLong(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP))), Instant.now()); | ||
sqsMessageDelayTimer.record(duration.isNegative() ? Duration.ZERO : duration); // Negative durations can occur if messages are processed immediately | ||
|
||
final int receiveCount = Integer.parseInt(message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this attribute doesn't exist for some reason, will this default to 0, or will Integer.parseInt(null) throw an exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
both of these system attributes will always exist since we make this receiveMessageRequest call:
ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.attributeNamesWithStrings("All")
.messageAttributeNames("All");
sqsMessageDelayTimer.record(duration.isNegative() ? Duration.ZERO : duration); // Negative durations can occur if messages are processed immediately | ||
|
||
final int receiveCount = Integer.parseInt(message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT)); | ||
if (receiveCount < 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be receiveCount <= 1
, since the receive count will always be 1 when it's grabbed for the first time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, changing this right away
Duration duration = Duration.between( | ||
Instant.ofEpochMilli(Long.parseLong(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP))), | ||
Instant.now()); | ||
sqsMessageDelayTimer.record(duration.isNegative() ? Duration.ZERO : duration); // can sometimes record negative durations if message is processed immediately |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add this to a unit test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added this
Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
@Test | ||
void processSqsMessages_should_record_sqsMessageDelayTimer_when_approximateReceiveCount_less_than_or_equal_to_one() throws IOException { | ||
final long sentTimestampMillis = Instant.now().minusSeconds(5).toEpochMilli(); | ||
final Message message = Message.builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: You could mock this message instead of building it.
@@ -0,0 +1,34 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use new format file header
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
} | ||
|
||
@JsonCreator | ||
static OnErrorOption fromOptionValue(final String option) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could possibly return a null value. Not sure if you need to consider that at the caller end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current implementation will not allow for values other than "RETAIN_MESSAGES" or "DELETE_MESSAGES". Values such as None won't work. Doing on_error: null
will treat as if that parameter is not overridden and use the default.
@@ -56,18 +55,17 @@ public class SqsWorkerCommon { | |||
private final Counter sqsVisibilityTimeoutChangedCount; | |||
private final Counter sqsVisibilityTimeoutChangeFailedCount; | |||
|
|||
public SqsWorkerCommon(final SqsClient sqsClient, | |||
final Backoff standardBackoff, | |||
public SqsWorkerCommon(final Backoff standardBackoff, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, standardBackoff
logic should be per sqsClient. You don't want to share one instance of the backoff across different sqs clients because it maintains state to track retry attempts and compute delays
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would make more sense to have one Backoff instance on the queue level. This would mean that workers of the same queue will share Backoff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, we shouldn't share Backoff object instance between different SQSClients because it maintains state to track retry attempts and compute delays
Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
Description
Added the following:
Introduced an on_error configuration parameter that lets users choose whether to delete or retain messages upon processing failure.
Enabled support for SQS queues across multiple regions within a single configuration by instantiating one client per region.
Added the sqsMessageDelayTimer metric to assist with auto-scaling in OSI.
modified README
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.