diff --git a/data-prepper-plugins/rds-source/build.gradle b/data-prepper-plugins/rds-source/build.gradle index 1b325457bf..09f734cc91 100644 --- a/data-prepper-plugins/rds-source/build.gradle +++ b/data-prepper-plugins/rds-source/build.gradle @@ -34,4 +34,5 @@ dependencies { testImplementation libs.avro.core testImplementation libs.parquet.hadoop testImplementation libs.parquet.avro +// testImplementation 'org.slf4j:slf4j-simple:2.0.9' } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java index 22935fc6e3..8eb3b9cde9 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java @@ -47,6 +47,7 @@ public LogicalReplicationClient(final ConnectionManager connectionManager, @Override public void connect() { + LOG.debug("Start connecting logical replication stream. "); PGReplicationStream stream; try (Connection conn = connectionManager.getConnection()) { PGConnection pgConnection = conn.unwrap(PGConnection.class); @@ -62,6 +63,7 @@ public void connect() { logicalStreamBuilder.withStartPosition(startLsn); } stream = logicalStreamBuilder.start(); + LOG.debug("Logical replication stream started. "); if (eventProcessor != null) { while (!disconnectRequested) { @@ -88,7 +90,8 @@ public void connect() { } stream.close(); - LOG.info("Replication stream closed successfully."); + disconnectRequested = false; + LOG.debug("Replication stream closed successfully."); } catch (Exception e) { LOG.error("Exception while creating Postgres replication stream. ", e); } @@ -97,6 +100,7 @@ public void connect() { @Override public void disconnect() { disconnectRequested = true; + LOG.debug("Requested to disconnect logical replication stream."); } public void setEventProcessor(LogicalReplicationEventProcessor eventProcessor) { diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java index 9cd410ee44..45897335b5 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java @@ -33,7 +33,9 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -87,6 +89,92 @@ void test_connect() throws SQLException, InterruptedException { verify(stream).setFlushedLSN(lsn); } + @Test + void test_disconnect() throws SQLException, InterruptedException { + final Connection connection = mock(Connection.class); + final PGConnection pgConnection = mock(PGConnection.class, RETURNS_DEEP_STUBS); + final ChainedLogicalStreamBuilder logicalStreamBuilder = mock(ChainedLogicalStreamBuilder.class); + final PGReplicationStream stream = mock(PGReplicationStream.class); + final ByteBuffer message = ByteBuffer.allocate(0); + final LogSequenceNumber lsn = mock(LogSequenceNumber.class); + + when(connectionManager.getConnection()).thenReturn(connection); + when(connection.unwrap(PGConnection.class)).thenReturn(pgConnection); + when(pgConnection.getReplicationAPI().replicationStream().logical()).thenReturn(logicalStreamBuilder); + when(logicalStreamBuilder.withSlotName(anyString())).thenReturn(logicalStreamBuilder); + when(logicalStreamBuilder.withSlotOption(anyString(), anyString())).thenReturn(logicalStreamBuilder); + when(logicalStreamBuilder.start()).thenReturn(stream); + when(stream.readPending()).thenReturn(message).thenReturn(null); + when(stream.getLastReceiveLSN()).thenReturn(lsn); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> logicalReplicationClient.connect()); + + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(eventProcessor).process(message)); + Thread.sleep(20); + verify(stream).setAppliedLSN(lsn); + verify(stream).setFlushedLSN(lsn); + + logicalReplicationClient.disconnect(); + Thread.sleep(20); + verify(stream).close(); + verifyNoMoreInteractions(stream, eventProcessor); + + executorService.shutdownNow(); + } + + @Test + void test_connect_disconnect_cycles() throws SQLException, InterruptedException { + final Connection connection = mock(Connection.class); + final PGConnection pgConnection = mock(PGConnection.class, RETURNS_DEEP_STUBS); + final ChainedLogicalStreamBuilder logicalStreamBuilder = mock(ChainedLogicalStreamBuilder.class); + final PGReplicationStream stream = mock(PGReplicationStream.class); + final ByteBuffer message = ByteBuffer.allocate(0); + final LogSequenceNumber lsn = mock(LogSequenceNumber.class); + + when(connectionManager.getConnection()).thenReturn(connection); + when(connection.unwrap(PGConnection.class)).thenReturn(pgConnection); + when(pgConnection.getReplicationAPI().replicationStream().logical()).thenReturn(logicalStreamBuilder); + when(logicalStreamBuilder.withSlotName(anyString())).thenReturn(logicalStreamBuilder); + when(logicalStreamBuilder.withSlotOption(anyString(), anyString())).thenReturn(logicalStreamBuilder); + when(logicalStreamBuilder.start()).thenReturn(stream); + when(stream.readPending()).thenReturn(message).thenReturn(null); + when(stream.getLastReceiveLSN()).thenReturn(lsn); + + // First connect + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> logicalReplicationClient.connect()); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(eventProcessor, times(1)).process(message)); + Thread.sleep(20); + verify(stream).setAppliedLSN(lsn); + verify(stream).setFlushedLSN(lsn); + + // First disconnect + logicalReplicationClient.disconnect(); + Thread.sleep(20); + verify(stream).close(); + verifyNoMoreInteractions(stream, eventProcessor); + + // Second connect + when(stream.readPending()).thenReturn(message).thenReturn(null); + executorService.submit(() -> logicalReplicationClient.connect()); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(eventProcessor, times(2)).process(message)); + Thread.sleep(20); + verify(stream, times(2)).setAppliedLSN(lsn); + verify(stream, times(2)).setFlushedLSN(lsn); + + // Second disconnect + logicalReplicationClient.disconnect(); + Thread.sleep(20); + verify(stream, times(2)).close(); + verifyNoMoreInteractions(stream, eventProcessor); + + executorService.shutdownNow(); + } + private LogicalReplicationClient createObjectUnderTest() { return new LogicalReplicationClient(connectionManager, replicationSlotName, publicationName); }