Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
feat!: Upgrade to spring-boot 3, spring-cloud-aws 3 and AWS SDK 2
Browse files Browse the repository at this point in the history
BREAKING CHANGE: no longer compatible to spring-cloud-aws 2 and the AWS SDK 1.

Some consumer config options were renamed to align with the spring-cloud-aws-sqs conventions:
- maxNumberOfMessages renamed to maxMessagesPerPoll
- waitTimeout renamed to pollTimeout
- queueStopTimeout renamed to listenerShutdownTimeout
- removed messageDeletionPolicy config option
  • Loading branch information
joerglaumann authored Jun 21, 2023
1 parent 01bb667 commit 89cd2b4
Show file tree
Hide file tree
Showing 13 changed files with 342 additions and 427 deletions.
73 changes: 52 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,32 +1,59 @@
# Spring Cloud Stream Binder for AWS SQS

spring-cloud-stream-binder-sqs lets you use [Spring Cloud Stream](https://spring.io/projects/spring-cloud-stream) with the AWS Simple Queue Service (SQS).
spring-cloud-stream-binder-sqs lets you use [Spring Cloud Stream](https://spring.io/projects/spring-cloud-stream) with
the AWS Simple Queue Service (SQS).

## Installation

```xml

<dependencies>
<dependency>
<groupId>de.idealo.spring</groupId>
<artifactId>spring-cloud-stream-binder-sqs</artifactId>
<version>1.9.0</version>
<version>3.0.0</version>
</dependency>
</dependencies>
```

## Compatabilty

| spring-cloud-stream-binder-sqs | spring-boot | spring-cloud-aws | spring-cloud | aws sdk | java compiler/runtime |
|--------------------------------|-------------|------------------|--------------|---------|-----------------------|
| 1.9.0 | 2.7.x | 2.4.x | 2021.0.5 | 1.x | 8 |
| 3.0.0 | 3.1.x | 3.0.x | 2022.0.3 | 2.x | 17 |

Changes in 3.0.0:

* removed consumer configuration for **messageDeletionPolicy**: the default behaviour is now that Messages will be
acknowledged when message processing is successful.
* renamed consumer configuration for **maxNumberOfMessages** to **maxMessagesPerPoll** to align with the naming in
spring-cloud-aws-sqs. The old property is deprecated but still supported for now.
* renamed consumer configuration for **waitTimeout** to **pollTimeout** to align with the naming in
spring-cloud-aws-sqs. The old property is deprecated but still supported for now.
* renamed consumer configuration for **queueStopTimeout** to **listenerShutdownTimeout** to align with the naming in
spring-cloud-aws-sqs. The old property is deprecated but still supported for now.

## Usage

With the library in your dependencies you can configure your Spring Cloud Stream bindings as usual. The type name for this binder is `sqs`. The destination needs to match the queue name, the specific ARN will be looked up from the available queue in the account.
With the library in your dependencies you can configure your Spring Cloud Stream bindings as usual. The type name for
this binder is `sqs`. The destination needs to match the queue name, the specific ARN will be looked up from the
available queue in the account.

You may also provide additional configuration options:

- **Consumers**
- **maxNumberOfMessages** - Maximum number of messages to retrieve with one poll to SQS. Must be a number between 1 and 10.
- **visibilityTimeout** - The duration in seconds that polled messages are hidden from subsequent poll requests after having been retrieved.
- **waitTimeout** - The duration in seconds that the system will wait for new messages to arrive when polling. Uses the Amazon SQS long polling feature. The value should be between 1 and 20.
- **queueStopTimeout** - The number of milliseconds that the queue worker is given to gracefully finish its work on shutdown before interrupting the current thread. Default value is 20000 milliseconds (20 seconds).
- **messageDeletionPolicy** - The deletion policy for messages that are retrieved from SQS. Defaults to NO_REDRIVE.
- **snsFanout** - Whether the incoming message has the SNS format and should be deserialized automatically. Defaults to true.
- **maxMessagesPerPoll** - Maximum number of messages to retrieve with one poll to SQS. Must be a number between 1
and 10.
- **visibilityTimeout** - The duration in seconds that polled messages are hidden from subsequent poll requests
after having been retrieved.
- **pollTimeout** - The duration in seconds that the system will wait for new messages to arrive when polling. Uses
the Amazon SQS long polling feature. The value should be between 1 and 20.
- **listenerShutdownTimeout** - The number of milliseconds that the queue worker is given to gracefully finish its
work on
shutdown before interrupting the current thread. Default value is 10 seconds.
- **snsFanout** - Whether the incoming message has the SNS format and should be deserialized automatically. Defaults
to true.

**Example Configuration:**

Expand All @@ -46,11 +73,13 @@ spring:
destination: output-queue-name
```
You may also provide your own beans of `AmazonSQSAsync` to override those that are created by [spring-cloud-aws-autoconfigure](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-autoconfigure).
You may also provide your own beans of `SqsAsyncClient` to override those that are created
by [spring-cloud-aws-autoconfigure](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-autoconfigure).

### FIFO queues

To use [FIFO SQS queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html) you will need to provide a deduplication id and a group id.
To use [FIFO SQS queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html)
you will need to provide a deduplication id and a group id.
With this binder you may set these using the message headers `SqsHeaders.GROUP_ID` and `SqsHeaders.DEDUPLICATION_ID`.
The example below shows how you could use a FIFO queue in real life.

Expand All @@ -69,22 +98,24 @@ spring:

```java
class Application {
@Bean
public Message<String> someFunction(String input) {
return MessageBuilder.withPayload(input)
.setHeader(SqsHeaders.GROUP_ID, "my-application")
.setHeader(SqsHeaders.DEDUPLICATION_ID, UUID.randomUUID())
.build();
}
@Bean
public Message<String> someFunction(String input) {
return MessageBuilder.withPayload(input)
.setHeader(SqsHeaders.GROUP_ID, "my-application")
.setHeader(SqsHeaders.DEDUPLICATION_ID, UUID.randomUUID())
.build();
}
}
```

### Concurrency

Consumers in the SQS binder support the Spring Cloud Stream `concurrency` property.
By specifying a value you will launch `concurrency` threads continuously polling for `maxNumberOfMessages` each.
The threads will process all messages asynchronously, but each thread will wait for its current batch of messages to all complete processing before retrieving new ones.
If your message processing is highly variable from message to message it is recommended to set a lower value for `maxNumberOfMessages` and a higher value for `concurrency`.
The threads will process all messages asynchronously, but each thread will wait for its current batch of messages to all
complete processing before retrieving new ones.
If your message processing is highly variable from message to message it is recommended to set a lower value
for `maxNumberOfMessages` and a higher value for `concurrency`.
Note that this will increase the amount of API calls done against SQS.

**Example Configuration:**
Expand All @@ -97,7 +128,7 @@ spring:
bindings:
someFunction-in-0:
consumer:
maxNumberOfMessages: 5
maxMessagesPerPoll: 5
bindings:
someFunction-in-0:
destination: input-queue-name
Expand Down
19 changes: 10 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@
</distributionManagement>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spring.cloud-version>2022.0.2</spring.cloud-version>
<spring.cloud-aws-version>2.4.4</spring.cloud-aws-version>
<spring.boot-version>3.0.6</spring.boot-version>
<testcontainers.version>1.17.6</testcontainers.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<spring.cloud-version>2022.0.3</spring.cloud-version>
<spring.cloud-aws-version>3.0.1</spring.cloud-aws-version>
<spring.boot-version>3.1.0</spring.boot-version>
<spring.integration-version>3.0.0</spring.integration-version>
<testcontainers.version>1.18.3</testcontainers.version>

<sonar.projectKey>idealo_spring-cloud-stream-binder-sqs</sonar.projectKey>
<sonar.organization>idealo</sonar.organization>
Expand Down Expand Up @@ -96,16 +97,16 @@
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-starter-aws</artifactId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-messaging</artifactId>
<artifactId>spring-cloud-aws-sqs</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-aws</artifactId>
<version>2.5.4</version>
<version>${spring.integration-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.idealo.spring.stream.binder.sqs;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -16,7 +17,9 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

import de.idealo.spring.stream.binder.sqs.inbound.SqsInboundChannelAdapter;
import de.idealo.spring.stream.binder.sqs.properties.SqsConsumerProperties;
Expand All @@ -28,18 +31,18 @@ public class SqsMessageHandlerBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<SqsConsumerProperties>, ExtendedProducerProperties<SqsProducerProperties>, SqsStreamProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, SqsConsumerProperties, SqsProducerProperties> {

private final AmazonSQSAsync amazonSQS;
private final SqsAsyncClient sqsAsyncClient;
private final SqsExtendedBindingProperties extendedBindingProperties;
private final List<SqsInboundChannelAdapter> adapters = new ArrayList<>();

public SqsMessageHandlerBinder(AmazonSQSAsync amazonSQS, SqsStreamProvisioner provisioningProvider, SqsExtendedBindingProperties extendedBindingProperties) {
public SqsMessageHandlerBinder(SqsAsyncClient amazonSQS, SqsStreamProvisioner provisioningProvider, SqsExtendedBindingProperties extendedBindingProperties) {
super(new String[0], provisioningProvider);
this.amazonSQS = amazonSQS;
this.sqsAsyncClient = amazonSQS;
this.extendedBindingProperties = extendedBindingProperties;
}

public AmazonSQSAsync getAmazonSQS() {
return amazonSQS;
public SqsAsyncClient getSqsAsyncClient() {
return sqsAsyncClient;
}

public List<SqsInboundChannelAdapter> getAdapters() {
Expand All @@ -48,9 +51,8 @@ public List<SqsInboundChannelAdapter> getAdapters() {

@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<SqsProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
SqsMessageHandler sqsMessageHandler = new SqsMessageHandler(amazonSQS);
SqsMessageHandler sqsMessageHandler = new SqsMessageHandler(sqsAsyncClient);
sqsMessageHandler.setQueue(destination.getName());
sqsMessageHandler.setFailureChannel(errorChannel);
sqsMessageHandler.setBeanFactory(getBeanFactory());

sqsMessageHandler.setDelayExpressionString(String.format("headers.get('%s')", SqsHeaders.DELAY));
Expand All @@ -62,19 +64,17 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin

@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<SqsConsumerProperties> properties) throws Exception {
SqsInboundChannelAdapter adapter = new SqsInboundChannelAdapter(amazonSQS, destination.getName());
adapter.setMaxNumberOfMessages(properties.getExtension().getMaxNumberOfMessages());
final SqsContainerOptions sqsContainerOptions =
SqsContainerOptions.builder()
.maxMessagesPerPoll(properties.getExtension().getMaxMessagesPerPoll())
.messageVisibility(Duration.ofSeconds(properties.getExtension().getVisibilityTimeout()))
.pollTimeout(Duration.ofSeconds(properties.getExtension().getPollTimeout()))
.listenerShutdownTimeout(Duration.ofSeconds(properties.getExtension().getListenerShutdownTimeout()))
.queueNotFoundStrategy(QueueNotFoundStrategy.FAIL)
.build();
SqsInboundChannelAdapter adapter = new SqsInboundChannelAdapter(sqsAsyncClient, destination.getName());
adapter.setSqsContainerOptions(sqsContainerOptions);
adapter.setConcurrency(properties.getConcurrency());
adapter.setVisibilityTimeout(properties.getExtension().getVisibilityTimeout());
adapter.setWaitTimeOut(properties.getExtension().getWaitTimeout());

if (properties.getExtension().getQueueStopTimeout() != null) {
adapter.setQueueStopTimeout(properties.getExtension().getQueueStopTimeout());
}

if (properties.getExtension().getMessageDeletionPolicy() != null) {
adapter.setMessageDeletionPolicy(properties.getExtension().getMessageDeletionPolicy());
}

if (properties.getExtension().isSnsFanout()) {
adapter.setMessageBuilderFactory(new SnsFanoutMessageBuilderFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

import de.idealo.spring.stream.binder.sqs.SqsMessageHandlerBinder;
import de.idealo.spring.stream.binder.sqs.health.SqsBinderHealthIndicator;
Expand All @@ -27,7 +27,7 @@ public SqsStreamProvisioner provisioningProvider() {
}

@Bean
public SqsMessageHandlerBinder sqsMessageHandlerBinder(AmazonSQSAsync amazonSQS, SqsStreamProvisioner sqsStreamProvisioner, SqsExtendedBindingProperties extendedBindingProperties) {
public SqsMessageHandlerBinder sqsMessageHandlerBinder(SqsAsyncClient amazonSQS, SqsStreamProvisioner sqsStreamProvisioner, SqsExtendedBindingProperties extendedBindingProperties) {
return new SqsMessageHandlerBinder(amazonSQS, sqsStreamProvisioner, extendedBindingProperties);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package de.idealo.spring.stream.binder.sqs.health;

import static java.util.Collections.singletonList;

import java.net.URI;
import java.net.URISyntaxException;

Expand All @@ -11,8 +9,10 @@
import org.springframework.boot.actuate.health.Health;
import org.springframework.util.Assert;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;

import de.idealo.spring.stream.binder.sqs.SqsMessageHandlerBinder;
import de.idealo.spring.stream.binder.sqs.inbound.SqsInboundChannelAdapter;
Expand Down Expand Up @@ -63,9 +63,13 @@ protected void doHealthCheck(Health.Builder builder) {
private boolean isReachable(String queueName) {
try {
if (isValidQueueUrl(queueName)) {
this.sqsMessageHandlerBinder.getAmazonSQS().getQueueAttributes(queueName, singletonList("CreatedTimestamp"));
this.sqsMessageHandlerBinder.getSqsAsyncClient()
.getQueueAttributes(
GetQueueAttributesRequest.builder().queueUrl(queueName).attributeNamesWithStrings("CreatedTimestamp").build()).get();
} else {
this.sqsMessageHandlerBinder.getAmazonSQS().getQueueUrl(queueName);
this.sqsMessageHandlerBinder.getSqsAsyncClient()
.getQueueUrl(
GetQueueUrlRequest.builder().queueName(queueName).build()).get();
}
return true;
} catch (QueueDoesNotExistException e) {
Expand All @@ -74,6 +78,9 @@ private boolean isReachable(String queueName) {
} catch (SdkClientException e) {
LOGGER.error("Queue '{}' is not reachable", queueName, e);
return false;
} catch (Exception e) {
LOGGER.error("Health check failed for queue '{}'", queueName, e);
return false;
}
}

Expand Down
Loading

0 comments on commit 89cd2b4

Please sign in to comment.