diff --git a/.gitignore b/.gitignore
index 022f16f..3efc8a0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -55,14 +55,14 @@ hs_err_pid*
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
-# .idea/artifacts
-# .idea/compiler.xml
-# .idea/jarRepositories.xml
-# .idea/modules.xml
-# .idea/*.iml
-# .idea/modules
-# *.iml
-# *.ipr
+.idea/artifacts
+.idea/compiler.xml
+.idea/jarRepositories.xml
+.idea/modules.xml
+.idea/*.iml
+.idea/modules
+*.iml
+*.ipr
# CMake
cmake-build-*/
diff --git a/pom.xml b/pom.xml
index ee4cf06..b212952 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,6 +49,7 @@
2020.0.2
2.3.1
2.4.5
+ 1.15.3
@@ -74,6 +75,13 @@
pom
import
+
+ org.testcontainers
+ testcontainers-bom
+ ${testcontainers.version}
+ pom
+ import
+
@@ -105,6 +113,45 @@
spring-boot-configuration-processor
true
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.testcontainers
+ localstack
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+
+
+ io.projectreactor
+ reactor-test
+ test
+
@@ -234,4 +281,4 @@
-
\ No newline at end of file
+
diff --git a/spring-cloud-stream-binder-sqs.iml b/spring-cloud-stream-binder-sqs.iml
deleted file mode 100644
index 78b2cc5..0000000
--- a/spring-cloud-stream-binder-sqs.iml
+++ /dev/null
@@ -1,2 +0,0 @@
-
-
\ No newline at end of file
diff --git a/src/main/java/de/idealo/spring/stream/binder/sqs/SqsMessageHandlerBinder.java b/src/main/java/de/idealo/spring/stream/binder/sqs/SqsMessageHandlerBinder.java
index a87debc..2108578 100644
--- a/src/main/java/de/idealo/spring/stream/binder/sqs/SqsMessageHandlerBinder.java
+++ b/src/main/java/de/idealo/spring/stream/binder/sqs/SqsMessageHandlerBinder.java
@@ -1,5 +1,8 @@
package de.idealo.spring.stream.binder.sqs;
+import java.util.ArrayList;
+import java.util.List;
+
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -25,6 +28,7 @@ public class SqsMessageHandlerBinder
private final AmazonSQSAsync amazonSQS;
private final SqsExtendedBindingProperties extendedBindingProperties;
+ private final List adapters = new ArrayList<>();
public SqsMessageHandlerBinder(AmazonSQSAsync amazonSQS, SqsStreamProvisioner provisioningProvider, SqsExtendedBindingProperties extendedBindingProperties) {
super(new String[0], provisioningProvider);
@@ -32,6 +36,14 @@ public SqsMessageHandlerBinder(AmazonSQSAsync amazonSQS, SqsStreamProvisioner pr
this.extendedBindingProperties = extendedBindingProperties;
}
+ public AmazonSQSAsync getAmazonSQS() {
+ return amazonSQS;
+ }
+
+ public List getAdapters() {
+ return adapters;
+ }
+
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, MessageChannel errorChannel) throws Exception {
throw new UnsupportedOperationException("Producing to SQS is not supported yet");
@@ -52,6 +64,8 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
adapter.setMessageBuilderFactory(new SnsFanoutMessageBuilderFactory());
}
+ this.adapters.add(adapter);
+
return adapter;
}
diff --git a/src/main/java/de/idealo/spring/stream/binder/sqs/config/SqsBinderConfiguration.java b/src/main/java/de/idealo/spring/stream/binder/sqs/config/SqsBinderConfiguration.java
index 77da18b..31d0717 100644
--- a/src/main/java/de/idealo/spring/stream/binder/sqs/config/SqsBinderConfiguration.java
+++ b/src/main/java/de/idealo/spring/stream/binder/sqs/config/SqsBinderConfiguration.java
@@ -1,5 +1,8 @@
package de.idealo.spring.stream.binder.sqs.config;
+import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
+import org.springframework.boot.actuate.health.HealthIndicator;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
@@ -9,6 +12,7 @@
import com.amazonaws.services.sqs.AmazonSQSAsync;
import de.idealo.spring.stream.binder.sqs.SqsMessageHandlerBinder;
+import de.idealo.spring.stream.binder.sqs.health.SqsBinderHealthIndicator;
import de.idealo.spring.stream.binder.sqs.properties.SqsExtendedBindingProperties;
import de.idealo.spring.stream.binder.sqs.provisioning.SqsStreamProvisioner;
@@ -26,4 +30,17 @@ public SqsStreamProvisioner provisioningProvider() {
public SqsMessageHandlerBinder sqsMessageHandlerBinder(AmazonSQSAsync amazonSQS, SqsStreamProvisioner sqsStreamProvisioner, SqsExtendedBindingProperties extendedBindingProperties) {
return new SqsMessageHandlerBinder(amazonSQS, sqsStreamProvisioner, extendedBindingProperties);
}
+
+ @Configuration
+ @ConditionalOnClass(HealthIndicator.class)
+ @ConditionalOnEnabledHealthIndicator("binders")
+ protected static class SqsBinderHealthIndicatorConfiguration {
+
+ @Bean
+ @ConditionalOnMissingBean(name = "sqsBinderHealthIndicator")
+ public SqsBinderHealthIndicator sqsBinderHealthIndicator(SqsMessageHandlerBinder sqsMessageHandlerBinder) {
+ return new SqsBinderHealthIndicator(sqsMessageHandlerBinder);
+ }
+
+ }
}
diff --git a/src/main/java/de/idealo/spring/stream/binder/sqs/health/SqsBinderHealthIndicator.java b/src/main/java/de/idealo/spring/stream/binder/sqs/health/SqsBinderHealthIndicator.java
new file mode 100644
index 0000000..3300030
--- /dev/null
+++ b/src/main/java/de/idealo/spring/stream/binder/sqs/health/SqsBinderHealthIndicator.java
@@ -0,0 +1,70 @@
+package de.idealo.spring.stream.binder.sqs.health;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.actuate.health.AbstractHealthIndicator;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
+import org.springframework.util.Assert;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+
+import de.idealo.spring.stream.binder.sqs.SqsMessageHandlerBinder;
+
+/**
+ * Code from
+ * https://github.com/spring-cloud/spring-cloud-aws/pull/342
+ */
+public class SqsBinderHealthIndicator extends AbstractHealthIndicator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SqsBinderHealthIndicator.class);
+
+ private final SqsMessageHandlerBinder sqsMessageHandlerBinder;
+
+ public SqsBinderHealthIndicator(SqsMessageHandlerBinder sqsMessageHandlerBinder) {
+ Assert.notNull(sqsMessageHandlerBinder, "SqsMessageHandlerBinder must not be null");
+ this.sqsMessageHandlerBinder = sqsMessageHandlerBinder;
+ }
+
+ @Override
+ protected void doHealthCheck(Health.Builder builder) {
+ boolean allListenersRunning = true;
+
+ if (sqsMessageHandlerBinder.getAdapters().isEmpty()) {
+ builder.unknown();
+ allListenersRunning = false;
+ }
+
+ for (SqsMessageDrivenChannelAdapter adapter : this.sqsMessageHandlerBinder.getAdapters()) {
+ for (String queueName : adapter.getQueues()) {
+ if (!adapter.isRunning(queueName)) {
+ builder.down().withDetail(queueName, "listener is not running");
+ allListenersRunning = false;
+ }
+
+ if (!isReachable(queueName)) {
+ builder.down().withDetail(queueName, "queue is not reachable");
+ allListenersRunning = false;
+ }
+ }
+ }
+
+ if (allListenersRunning) {
+ builder.up();
+ }
+ }
+
+ private boolean isReachable(String queueName) {
+ try {
+ this.sqsMessageHandlerBinder.getAmazonSQS().getQueueUrl(queueName);
+ return true;
+ } catch (QueueDoesNotExistException e) {
+ LOGGER.warn("Queue '{}' does not exist", queueName);
+ return false;
+ } catch (SdkClientException e) {
+ LOGGER.error("Queue '{}' is not reachable", queueName, e);
+ return false;
+ }
+ }
+}
diff --git a/src/test/java/de/idealo/spring/stream/binder/sqs/SqsBinderTest.java b/src/test/java/de/idealo/spring/stream/binder/sqs/SqsBinderTest.java
new file mode 100644
index 0000000..bf1cf6a
--- /dev/null
+++ b/src/test/java/de/idealo/spring/stream/binder/sqs/SqsBinderTest.java
@@ -0,0 +1,96 @@
+package de.idealo.spring.stream.binder.sqs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS;
+
+import java.time.Duration;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.actuate.health.HealthEndpoint;
+import org.springframework.boot.actuate.health.Status;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import com.amazonaws.services.sqs.AmazonSQSAsync;
+import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
+
+import reactor.core.publisher.Sinks;
+import reactor.test.StepVerifier;
+
+@Testcontainers
+@SpringBootTest(properties = {
+ "cloud.aws.stack.auto=false",
+ "cloud.aws.region.static=eu-central-1",
+ "spring.cloud.stream.bindings.input-in-0.destination=queue1",
+ "spring.cloud.stream.bindings.function.definition=input"
+})
+class SqsBinderTest {
+
+ @Container
+ private static final LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.10"))
+ .withServices(SQS)
+ .withEnv("DEFAULT_REGION", "eu-central-1");
+
+ private static final Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer();
+
+ @Autowired
+ private AmazonSQSAsync amazonSQS;
+
+ @Autowired
+ private HealthEndpoint healthEndpoint;
+
+ @BeforeAll
+ static void beforeAll() throws Exception {
+ localStack.execInContainer("awslocal", "sqs", "create-queue", "--queue-name", "queue1");
+ }
+
+ @Test
+ void shouldPassMessageToConsumer() {
+ String testMessage = "test message";
+
+ String queueUrl = amazonSQS.getQueueUrl("queue1").getQueueUrl();
+ amazonSQS.sendMessage(queueUrl, testMessage);
+
+ StepVerifier.create(sink.asFlux())
+ .assertNext(message -> {
+ assertThat(message).isEqualTo(testMessage);
+ })
+ .verifyTimeout(Duration.ofSeconds(1));
+ }
+
+ @Test
+ void canTestHealth() {
+ assertThat(healthEndpoint.health().getStatus()).isEqualTo(Status.UP);
+ assertThat(healthEndpoint.healthForPath("sqsBinder").getStatus()).isEqualTo(Status.UP);
+ }
+
+ @TestConfiguration
+ static class AwsConfig {
+
+ @Bean
+ AmazonSQSAsync amazonSQS() {
+ return AmazonSQSAsyncClientBuilder.standard()
+ .withEndpointConfiguration(localStack.getEndpointConfiguration(SQS))
+ .withCredentials(localStack.getDefaultCredentialsProvider())
+ .build();
+ }
+
+ @Bean
+ Consumer input() {
+ return sink::tryEmitNext;
+ }
+ }
+
+ @SpringBootApplication
+ static class Application {
+ }
+}
diff --git a/src/test/java/de/idealo/spring/stream/binder/sqs/SqsMessageHandlerBinderTest.java b/src/test/java/de/idealo/spring/stream/binder/sqs/SqsMessageHandlerBinderTest.java
new file mode 100644
index 0000000..0bb1039
--- /dev/null
+++ b/src/test/java/de/idealo/spring/stream/binder/sqs/SqsMessageHandlerBinderTest.java
@@ -0,0 +1,42 @@
+package de.idealo.spring.stream.binder.sqs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+
+import com.amazonaws.services.sqs.AmazonSQSAsync;
+
+import de.idealo.spring.stream.binder.sqs.properties.SqsConsumerProperties;
+import de.idealo.spring.stream.binder.sqs.properties.SqsExtendedBindingProperties;
+import de.idealo.spring.stream.binder.sqs.provisioning.SqsDestination;
+import de.idealo.spring.stream.binder.sqs.provisioning.SqsStreamProvisioner;
+
+@ExtendWith(MockitoExtension.class)
+class SqsMessageHandlerBinderTest {
+
+ @Mock
+ private AmazonSQSAsync amazonSQS;
+
+ private SqsMessageHandlerBinder sqsMessageHandlerBinder;
+
+ @BeforeEach
+ void setUp() {
+ this.sqsMessageHandlerBinder = new SqsMessageHandlerBinder(amazonSQS, new SqsStreamProvisioner(), new SqsExtendedBindingProperties());
+ }
+
+ @Test
+ void shouldSaveConsumerAdapter() throws Exception {
+ String queueName = "queue1";
+
+ sqsMessageHandlerBinder.createConsumerEndpoint(new SqsDestination(queueName), "group", new ExtendedConsumerProperties<>(new SqsConsumerProperties()));
+
+ assertThat(sqsMessageHandlerBinder.getAdapters()).isNotEmpty();
+ assertThat(sqsMessageHandlerBinder.getAdapters().get(0).getQueues()).containsExactly(queueName);
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/de/idealo/spring/stream/binder/sqs/health/SqsBinderHealthIndicatorTest.java b/src/test/java/de/idealo/spring/stream/binder/sqs/health/SqsBinderHealthIndicatorTest.java
new file mode 100644
index 0000000..037ad0e
--- /dev/null
+++ b/src/test/java/de/idealo/spring/stream/binder/sqs/health/SqsBinderHealthIndicatorTest.java
@@ -0,0 +1,119 @@
+package de.idealo.spring.stream.binder.sqs.health;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.Status;
+import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.services.sqs.AmazonSQSAsync;
+import com.amazonaws.services.sqs.model.GetQueueUrlResult;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+
+import de.idealo.spring.stream.binder.sqs.SqsMessageHandlerBinder;
+
+@ExtendWith(MockitoExtension.class)
+class SqsBinderHealthIndicatorTest {
+
+ @Mock
+ private SqsMessageHandlerBinder sqsMessageHandlerBinder;
+
+ @Mock
+ private AmazonSQSAsync amazonSQS;
+
+ @Mock
+ private SqsMessageDrivenChannelAdapter adapter;
+
+ @InjectMocks
+ private SqsBinderHealthIndicator healthIndicator;
+
+ @BeforeEach
+ void setUp() {
+ lenient().when(sqsMessageHandlerBinder.getAmazonSQS()).thenReturn(amazonSQS);
+ when(sqsMessageHandlerBinder.getAdapters()).thenReturn(Collections.singletonList(adapter));
+ }
+
+ @Test
+ void reportsTrueWhenAllConfiguredQueuesAreRunning() {
+ when(adapter.getQueues()).thenReturn(new String[] {"queue1", "queue2"});
+ when(adapter.isRunning(any())).thenReturn(true);
+ when(amazonSQS.getQueueUrl(anyString())).thenReturn(new GetQueueUrlResult().withQueueUrl("http://queue.url"));
+
+ Health.Builder builder = new Health.Builder();
+
+ healthIndicator.doHealthCheck(builder);
+
+ assertThat(builder.build().getStatus()).isEqualTo(Status.UP);
+
+ }
+
+ @Test
+ void reportsUnknownWhenNoBindingsAreConfigured() {
+ when(sqsMessageHandlerBinder.getAdapters()).thenReturn(Collections.emptyList());
+
+ Health.Builder builder = new Health.Builder();
+
+ healthIndicator.doHealthCheck(builder);
+
+ assertThat(builder.build().getStatus()).isEqualTo(Status.UNKNOWN);
+ }
+
+ @Test
+ void reportsFalseIfAtLeastOneConfiguredQueueIsNotRunning() {
+ when(adapter.getQueues()).thenReturn(new String[] {"queue1", "queue2"});
+ when(amazonSQS.getQueueUrl(anyString())).thenReturn(new GetQueueUrlResult().withQueueUrl("http://queue.url"));
+ when(adapter.isRunning("queue1")).thenReturn(true);
+ when(adapter.isRunning("queue2")).thenReturn(false);
+
+ Health.Builder builder = new Health.Builder();
+
+ healthIndicator.doHealthCheck(builder);
+
+ Health health = builder.build();
+ assertThat(health.getStatus()).isEqualTo(Status.DOWN);
+ assertThat(health.getDetails()).containsKey("queue2");
+ }
+
+ @Test
+ void reportsFalseIfAtLeastOneConfiguredQueueDoesNotExist() {
+ when(adapter.getQueues()).thenReturn(new String[] {"queue1", "queue2"});
+ when(adapter.isRunning(any())).thenReturn(true);
+ when(amazonSQS.getQueueUrl(anyString())).thenThrow(QueueDoesNotExistException.class);
+
+ Health.Builder builder = new Health.Builder();
+
+ healthIndicator.doHealthCheck(builder);
+
+ Health health = builder.build();
+ assertThat(health.getStatus()).isEqualTo(Status.DOWN);
+ assertThat(health.getDetails()).containsKey("queue1");
+ }
+
+ @Test
+ void reportsFalseIfAtLeastOneConfiguredQueueIsNotReachable() {
+ when(adapter.getQueues()).thenReturn(new String[] {"queue1", "queue2"});
+ when(adapter.isRunning(any())).thenReturn(true);
+ when(amazonSQS.getQueueUrl(anyString())).thenThrow(SdkClientException.class);
+
+ Health.Builder builder = new Health.Builder();
+
+ healthIndicator.doHealthCheck(builder);
+
+ Health health = builder.build();
+ assertThat(health.getStatus()).isEqualTo(Status.DOWN);
+ assertThat(health.getDetails()).containsKey("queue1");
+ }
+}