diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java index f55257f4..c8baa95b 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java @@ -163,7 +163,7 @@ private boolean tryAdd(final List results, final Iterator offsetManager; + @BeforeEach void setup() { offsetStorageReader = mock(OffsetStorageReader.class); - sourceTaskContext = mock(SourceTaskContext.class); + final SourceTaskContext sourceTaskContext = mock(SourceTaskContext.class); when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); offsetManager = new OffsetManager<>(sourceTaskContext); } @@ -76,54 +74,6 @@ void testNewEntryWithoutDataFromContext() { assertThat(result).isNotPresent(); } - @Test - void testUpdateCurrentEntry() { - final TestingOffsetManagerEntry offsetEntry = new TestingOffsetManagerEntry("bucket", "topic1", "thing"); - - final ConcurrentHashMap, Map> offsets = new ConcurrentHashMap<>(); - offsets.put(offsetEntry.getManagerKey().getPartitionMap(), offsetEntry.getProperties()); - - offsetManager = new OffsetManager<>(sourceTaskContext, offsets); - offsetEntry.setProperty("MyProperty", "WOW"); - - offsetManager.updateCurrentOffsets(offsetEntry); - - final Optional result = offsetManager.getEntry(offsetEntry.getManagerKey(), - TestingOffsetManagerEntry::new); - assertThat(result).isPresent(); - assertThat(result.get().getProperty("MyProperty")).isEqualTo("WOW"); - assertThat(result.get().getProperties()).isEqualTo(offsetEntry.getProperties()); - } - - @Test - void testUpdateNonExistentEntry() { - final TestingOffsetManagerEntry offsetEntry = new TestingOffsetManagerEntry("bucket", "topic1", "0"); - offsetEntry.setProperty("Random-property", "random value"); - offsetManager.updateCurrentOffsets(offsetEntry); - - final Optional result = offsetManager.getEntry(offsetEntry.getManagerKey(), - offsetEntry::fromProperties); - assertThat(result).isPresent(); - assertThat(result.get().getProperties()).isEqualTo(offsetEntry.getProperties()); - } - - @Test - void updateCurrentOffsetsDataNotLost() { - final TestingOffsetManagerEntry offsetEntry = new TestingOffsetManagerEntry("bucket", "topic1", "0"); - offsetEntry.setProperty("test", "WOW"); - offsetManager.updateCurrentOffsets(offsetEntry); - - final TestingOffsetManagerEntry offsetEntry2 = new TestingOffsetManagerEntry("bucket", "topic1", "0"); - offsetEntry2.setProperty("test2", "a thing"); - offsetManager.updateCurrentOffsets(offsetEntry2); - - final Optional result = offsetManager.getEntry(offsetEntry.getManagerKey(), - offsetEntry::fromProperties); - assertThat(result).isPresent(); - assertThat(result.get().getProperty("test")).isEqualTo("WOW"); - assertThat(result.get().getProperty("test2")).isEqualTo("a thing"); - } - @SuppressWarnings("PMD.TestClassWithoutTestCases") // TODO figure out why this fails. public static class TestingOffsetManagerEntry // NOPMD the above suppress warnings does not work. implements diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index 4844b40c..d391b9eb 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -267,6 +267,7 @@ static Map consumeOffsetMessages(KafkaConsumer c }); final List key = OBJECT_MAPPER.readValue(record.key(), new TypeReference<>() { // NOPMD }); + // key.get(0) will return the name of the connector the commit is from. final Map keyDetails = (Map) key.get(1); messages.put((String) keyDetails.get(OBJECT_KEY), offsetRec.get(RECORD_COUNT)); } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index dc1c9a19..fb0ab4c4 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -111,7 +111,6 @@ public boolean hasNext() { public SourceRecord next() { final S3SourceRecord s3SourceRecord = s3SourceRecordIterator.next(); final S3OffsetManagerEntry entry = s3SourceRecord.getOffsetManagerEntry(); - offsetManager.updateCurrentOffsets(entry); return RecordProcessor.createSourceRecord(s3SourceRecord, s3SourceConfig, awsv2SourceClient, entry); } }; diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java index 9fdc5671..94a51af7 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java @@ -57,6 +57,7 @@ public void setUp() { sourceTaskContext = mock(SourceTaskContext.class); when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); offsetManager = new OffsetManager<>(sourceTaskContext); + } private Map createPartitionMap() { @@ -104,31 +105,6 @@ void testGetEntry() { verify(sourceTaskContext, times(1)).offsetStorageReader(); } - @Test - void testUpdate() { - final S3OffsetManagerEntry entry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); - assertThat(entry.getRecordCount()).isEqualTo(0L); - assertThat(entry.getProperty("random_entry")).isNull(); - - offsetManager.updateCurrentOffsets(entry); - - entry.setProperty("random_entry", 5L); - entry.incrementRecordCount(); - assertThat(entry.getRecordCount()).isEqualTo(1L); - - offsetManager.updateCurrentOffsets(entry); - - final Optional entry2 = offsetManager.getEntry(entry.getManagerKey(), - entry::fromProperties); - assertThat(entry2).isPresent(); - assertThat(entry2.get().getPartition()).isEqualTo(PARTITION); - assertThat(entry2.get().getRecordCount()).isEqualTo(1L); - assertThat(entry2.get().getTopic()).isEqualTo(TOPIC); - assertThat(entry2.get().getBucket()).isEqualTo(TEST_BUCKET); - assertThat(entry2.get().getProperty("random_entry")).isEqualTo(5L); - verify(sourceTaskContext, times(0)).offsetStorageReader(); - } - @Test void testFromProperties() { final S3OffsetManagerEntry entry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION);