From de0c91e50e44e80c7f28473308c9f18f89d769eb Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Wed, 22 Jan 2025 15:05:07 +0000 Subject: [PATCH] Remove unused configuration, updates to improve code Signed-off-by: Aindriu Lavelle --- .../common/config/FileNameFragment.java | 23 +------- .../common/config/SourceCommonConfig.java | 3 - .../common/config/SourceConfigFragment.java | 8 --- .../connect/common/source/OffsetManager.java | 32 +++-------- .../connect/common/source/task/Context.java | 29 +++++++--- .../common/source/OffsetManagerTest.java | 49 ++++++++-------- .../connect/s3/source/AwsIntegrationTest.java | 2 - .../connect/s3/source/IntegrationTest.java | 2 - .../s3/source/utils/S3OffsetManagerEntry.java | 2 - .../s3/source/utils/S3SourceRecord.java | 27 ++------- .../s3/source/utils/SourceRecordIterator.java | 17 +++--- .../connect/s3/source/S3SourceTaskTest.java | 2 - .../s3/source/config/S3SourceConfigTest.java | 3 - .../utils/S3OffsetManagerEntryTest.java | 2 - .../s3/source/utils/S3SourceRecordTest.java | 57 +++++++++++++++++++ 15 files changed, 127 insertions(+), 131 deletions(-) diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java index 467ea2cb..a15fe0e7 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java @@ -72,8 +72,7 @@ public static ConfigDef update(final ConfigDef configDef) { + "Only some combinations of variables are valid, which currently are:\n" + "- `topic`, `partition`, `start_offset`." + "There is also `key` only variable {{key}} for grouping by keys", - GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment - ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG); + GROUP_FILE, fileGroupCounter++, ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG); final String supportedCompressionTypes = CompressionType.names() .stream() @@ -84,8 +83,7 @@ public static ConfigDef update(final ConfigDef configDef) { ConfigDef.Importance.MEDIUM, "The compression type used for files put on S3. " + "The supported values are: " + supportedCompressionTypes + ".", - GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment - ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG, + GROUP_FILE, fileGroupCounter++, ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG, FixedSetRecommender.ofSupportedValues(CompressionType.names())); configDef.define(FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, new ConfigDef.Validator() { @@ -99,8 +97,7 @@ public void ensureValid(final String name, final Object value) { }, ConfigDef.Importance.MEDIUM, "The maximum number of records to put in a single file. " + "Must be a non-negative integer number. " + "0 is interpreted as \"unlimited\", which is the default.", - GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment - ConfigDef.Width.SHORT, FILE_MAX_RECORDS); + GROUP_FILE, fileGroupCounter++, ConfigDef.Width.SHORT, FILE_MAX_RECORDS); configDef.define(FILE_NAME_TIMESTAMP_TIMEZONE, ConfigDef.Type.STRING, ZoneOffset.UTC.toString(), new TimeZoneValidator(), ConfigDef.Importance.LOW, @@ -114,16 +111,6 @@ public void ensureValid(final String name, final Object value) { "Specifies the the timestamp variable source. 
Default is wall-clock.", GROUP_FILE, fileGroupCounter++, // NOPMD ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE); - configDef.define(FILE_PATH_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PATH_PREFIX_TEMPLATE, - new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, - "The template for file prefix on S3. " - + "Supports `{{ variable }}` placeholders for substituting variables. " - + "Currently supported variables are `topic` and `partition` " - + "and are mandatory to have these in the directory structure." - + "Example prefix : topics/{{topic}}/partition/{{partition}}/", - GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment - ConfigDef.Width.LONG, FILE_PATH_PREFIX_TEMPLATE_CONFIG); - return configDef; } @@ -197,8 +184,4 @@ public int getMaxRecordsPerFile() { return cfg.getInt(FILE_MAX_RECORDS); } - public String getFilePathPrefixTemplateConfig() { - return cfg.getString(FILE_PATH_PREFIX_TEMPLATE_CONFIG); - } - } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java index 68036bd6..d527db41 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java @@ -62,9 +62,6 @@ public String getSchemaRegistryUrl() { public String getTargetTopics() { return sourceConfigFragment.getTargetTopics(); } - public String getTargetTopicPartitions() { - return sourceConfigFragment.getTargetTopicPartitions(); - } public ErrorsTolerance getErrorsTolerance() { return sourceConfigFragment.getErrorsTolerance(); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java index 7f5d6276..927a065c 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java @@ -34,7 +34,6 @@ public final class SourceConfigFragment extends ConfigFragment { public static final String MAX_POLL_RECORDS = "max.poll.records"; public static final String EXPECTED_MAX_MESSAGE_BYTES = "expected.max.message.bytes"; private static final String GROUP_OFFSET_TOPIC = "OFFSET_TOPIC"; - public static final String TARGET_TOPIC_PARTITIONS = "topic.partitions"; public static final String TARGET_TOPICS = "topics"; public static final String ERRORS_TOLERANCE = "errors.tolerance"; @@ -70,9 +69,6 @@ public static ConfigDef update(final ConfigDef configDef) { // Offset Storage config group includes target topics int offsetStorageGroupCounter = 0; - configDef.define(TARGET_TOPIC_PARTITIONS, ConfigDef.Type.STRING, "0", new ConfigDef.NonEmptyString(), - ConfigDef.Importance.MEDIUM, "eg : 0,1", GROUP_OFFSET_TOPIC, offsetStorageGroupCounter++, - ConfigDef.Width.NONE, TARGET_TOPIC_PARTITIONS); configDef.define(TARGET_TOPICS, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "eg : connect-storage-offsets", GROUP_OFFSET_TOPIC, offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS); @@ -93,10 +89,6 @@ public String getTargetTopics() { return cfg.getString(TARGET_TOPICS); } - public String getTargetTopicPartitions() { - return cfg.getString(TARGET_TOPIC_PARTITIONS); - } - public int getMaxPollRecords() { return cfg.getInt(MAX_POLL_RECORDS); } diff --git 
a/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java index 70db39ff..13e71b0e 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java @@ -17,12 +17,12 @@ package io.aiven.kafka.connect.common.source; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTaskContext; @@ -94,33 +94,19 @@ public Optional getEntry(final OffsetManagerKey key, final Function { - if (v == null) { - return new HashMap<>(entry.getProperties()); - } else { - v.putAll(entry.getProperties()); - return v; - } - }); - } - /** * Gets any offset information stored in the offsetStorageReader and adds to the local offsets Map. This provides a * performance improvement over when checking if offsets exists individually. * - * @param partitionMaps - * A Collection of partition maps which identify individual offset entries + * @param offsetManagerKeys + * A Collection of OffsetManagerKey which identify individual offset entries */ - public void hydrateOffsetManager(final Collection> partitionMaps) { - offsets.putAll(context.offsetStorageReader().offsets(partitionMaps)); + public void populateOffsetManager(final Collection offsetManagerKeys) { + + offsets.putAll(context.offsetStorageReader() + .offsets(offsetManagerKeys.stream() + .map(OffsetManagerKey::getPartitionMap) + .collect(Collectors.toList()))); } /** diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java index 265ade6d..ebbba42d 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java @@ -37,35 +37,48 @@ public Context(final K storageKey) { this.storageKey = storageKey; } - public Optional getTopic() { + /** + * Creates a defensive copy of the Context for use internally by the S3SourceRecord + * + * @param anotherContext + * The Context which needs to be copied + */ + protected Context(final Context anotherContext) { + this.storageKey = anotherContext.storageKey; + this.partition = anotherContext.partition; + this.topic = anotherContext.topic; + this.offset = anotherContext.offset; + } + + public final Optional getTopic() { return Optional.ofNullable(topic); } - public void setTopic(final String topic) { + public final void setTopic(final String topic) { this.topic = topic; } - public Optional getPartition() { + public final Optional getPartition() { return Optional.ofNullable(partition); } - public void setPartition(final Integer partition) { + public final void setPartition(final Integer partition) { this.partition = partition; } - public Optional getStorageKey() { + public final Optional getStorageKey() { return Optional.ofNullable(storageKey); } - public void setStorageKey(final K storageKey) { + public final void setStorageKey(final K storageKey) { this.storageKey = storageKey; } - public Optional getOffset() { + public final Optional getOffset() { return Optional.ofNullable(offset); } - public void 
setOffset(final Integer offset) { + public final void setOffset(final Integer offset) { this.offset = offset; } } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/OffsetManagerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/OffsetManagerTest.java index c8d1cafc..2d3890e2 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/OffsetManagerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/OffsetManagerTest.java @@ -17,6 +17,8 @@ package io.aiven.kafka.connect.common.source; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -80,52 +82,49 @@ void testNewEntryWithoutDataFromContext() { } @Test - void testHydrateOffsetManagerWithPartitionMapsNoExistingEntries() { - final List> partitionMaps = new ArrayList<>(); + void testPopulateOffsetManagerWithPartitionMapsNoExistingEntries() { + final List partitionMaps = new ArrayList<>(); for (int i = 0; i < 10; i++) { - final Map partitionKey = new HashMap<>(); // NOPMD avoid instantiating objects in loops. - partitionKey.put("segment1", "topic" + i); - partitionKey.put("segment2", String.valueOf(i)); - partitionKey.put("segment3", "something else" + i); - partitionMaps.add(partitionKey); + partitionMaps.add(new TestingOffsetManagerEntry("topic" + i, String.valueOf(i), "something else " + i) + .getManagerKey());// NOPMD avoid instantiating objects in loops. } - when(offsetStorageReader.offsets(eq(partitionMaps))).thenReturn(Map.of()); + when(offsetStorageReader.offsets(anyCollection())).thenReturn(Map.of()); - offsetManager.hydrateOffsetManager(partitionMaps); - verify(offsetStorageReader, times(1)).offsets(eq(partitionMaps)); + offsetManager.populateOffsetManager(partitionMaps); + verify(offsetStorageReader, times(1)).offsets(anyList()); // No Existing entries so we expect nothing to exist and for it to try check the offsets again. - final Optional result = offsetManager.getEntry(() -> partitionMaps.get(0), - TestingOffsetManagerEntry::new); + final Optional result = offsetManager + .getEntry(() -> partitionMaps.get(0).getPartitionMap(), TestingOffsetManagerEntry::new); assertThat(result).isEmpty(); - verify(offsetStorageReader, times(1)).offset(eq(partitionMaps.get(0))); + verify(offsetStorageReader, times(1)).offset(eq(partitionMaps.get(0).getPartitionMap())); } @Test - void testHydrateOffsetManagerWithPartitionMapsAllEntriesExist() { - final List> partitionMaps = new ArrayList<>(); + void testPopulateOffsetManagerWithPartitionMapsAllEntriesExist() { + final List partitionMaps = new ArrayList<>(); final Map, Map> offsetReaderMap = new HashMap<>(); for (int i = 0; i < 10; i++) { - final Map partitionKey = Map.of("segment1", "topic" + 1, "segment2", String.valueOf(i), - "segment3", "somethingelse" + i); // NOPMD avoid instantiating objects in loops. - partitionMaps.add(partitionKey); - offsetReaderMap.put(partitionKey, Map.of("recordCount", (long) i)); + OffsetManager.OffsetManagerKey key = new TestingOffsetManagerEntry("topic" + i, String.valueOf(i), + "something else " + i).getManagerKey(); + partitionMaps.add(key);// NOPMD avoid instantiating objects in loops. 
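+            // Seed the map returned by the mocked OffsetStorageReader so that
+            // populateOffsetManager can load every entry in a single offsets() call.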
+ offsetReaderMap.put(key.getPartitionMap(), Map.of("recordCount", (long) i)); } - when(offsetStorageReader.offsets(eq(partitionMaps))).thenReturn(offsetReaderMap); + when(offsetStorageReader.offsets(anyList())).thenReturn(offsetReaderMap); - offsetManager.hydrateOffsetManager(partitionMaps); - verify(offsetStorageReader, times(1)).offsets(eq(partitionMaps)); + offsetManager.populateOffsetManager(partitionMaps); + verify(offsetStorageReader, times(1)).offsets(anyList()); // No Existing entries so we expect nothing to exist and for it to try check the offsets again. - final Optional result = offsetManager.getEntry(() -> partitionMaps.get(0), - TestingOffsetManagerEntry::new); + final Optional result = offsetManager + .getEntry(() -> partitionMaps.get(0).getPartitionMap(), TestingOffsetManagerEntry::new); assertThat(result).isPresent(); - verify(offsetStorageReader, times(0)).offset(eq(partitionMaps.get(0))); + verify(offsetStorageReader, times(0)).offset(eq(partitionMaps.get(0).getPartitionMap())); } diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java index b548ed19..b57fe6b7 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java @@ -22,7 +22,6 @@ import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; -import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; @@ -123,7 +122,6 @@ private Map getConfig(final String topics, final int maxTasks) { config.put(AWS_S3_ENDPOINT_CONFIG, s3Endpoint); config.put(AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET_NAME); config.put(AWS_S3_PREFIX_CONFIG, getS3Prefix()); - config.put(TARGET_TOPIC_PARTITIONS, "0,1"); config.put(TARGET_TOPICS, topics); config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter"); diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index dba0f781..fa7fb621 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -25,7 +25,6 @@ import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.DISTRIBUTION_TYPE; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; -import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; import static 
io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; @@ -378,7 +377,6 @@ private static Map basicS3ConnectorConfig(final boolean addPrefi if (addPrefix) { config.put(AWS_S3_PREFIX_CONFIG, s3Prefix); } - config.put(TARGET_TOPIC_PARTITIONS, "0,1"); return config; } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java index 6a96c54e..a12c49cb 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java @@ -30,8 +30,6 @@ public final class S3OffsetManagerEntry implements OffsetManager.OffsetManagerEn // TODO make this package private after values in S3SourceTask are no longer needed public static final String BUCKET = "bucket"; public static final String OBJECT_KEY = "objectKey"; - public static final String TOPIC = "topic"; - public static final String PARTITION = "partition"; public static final String RECORD_COUNT = "recordCount"; /** diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java index 4a714a85..6fd037b1 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java @@ -24,7 +24,6 @@ import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance; import io.aiven.kafka.connect.common.source.task.Context; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.model.S3Object; @@ -50,9 +49,9 @@ public S3SourceRecord(final S3SourceRecord s3SourceRecord) { this.valueData = s3SourceRecord.valueData; this.context = s3SourceRecord.context; } - @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "offsetManagerEntry is essentially read only") + public void setOffsetManagerEntry(final S3OffsetManagerEntry offsetManagerEntry) { - this.offsetManagerEntry = offsetManagerEntry; + this.offsetManagerEntry = offsetManagerEntry.fromProperties(offsetManagerEntry.getProperties()); } public long getRecordCount() { @@ -96,11 +95,13 @@ public S3OffsetManagerEntry getOffsetManagerEntry() { } public Context getContext() { - return defensiveContextCopy(context); + return new Context<>(context) { + }; } public void setContext(final Context context) { - this.context = defensiveContextCopy(context); + this.context = new Context<>(context) { + }; } /** @@ -125,20 +126,4 @@ public SourceRecord getSourceRecord(final ErrorsTolerance tolerance) { } } - /** - * Creates a defensive copy of the Context for use internally by the S3SourceRecord - * - * @param context - * The Context which needs to be copied - * @return The new Context object which has been created from the original context - */ - private Context defensiveContextCopy(final Context context) { - final Context returnValue = new Context<>( - context.getStorageKey().isPresent() ? 
context.getStorageKey().get() : null); - context.getTopic().ifPresent(returnValue::setTopic); - context.getPartition().ifPresent(returnValue::setPartition); - context.getOffset().ifPresent(returnValue::setOffset); - return returnValue; - } - } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index 7429621a..35056125 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -136,9 +136,7 @@ private Stream convert(final S3SourceRecord s3SourceRecord) { s3SourceRecord.setKeyData( transformer.getKeyData(s3SourceRecord.getObjectKey(), s3SourceRecord.getTopic(), s3SourceConfig)); - if (!s3SourceRecord.getObjectKey().equals(lastSeenObjectKey)) { - lastSeenObjectKey = s3SourceRecord.getObjectKey(); - } + lastSeenObjectKey = s3SourceRecord.getObjectKey(); return transformer .getRecords(sourceClient.getObject(s3SourceRecord.getObjectKey()), s3SourceRecord.getTopic(), @@ -193,9 +191,8 @@ public boolean test(final Optional s3SourceRecord) { if (s3SourceRecord.isPresent()) { final S3SourceRecord record = s3SourceRecord.get(); final Context context = record.getContext(); - if (context != null) { - return taskId == distributionStrategy.getTaskFor(context); - } + return taskId == distributionStrategy.getTaskFor(context); + } return false; } @@ -203,7 +200,7 @@ public boolean test(final Optional s3SourceRecord) { } class FileMatching implements Function> { - private Context context; + final FilePatternUtils utils; FileMatching(final FilePatternUtils utils) { this.utils = utils; @@ -215,8 +212,8 @@ public Optional apply(final S3Object s3Object) { final Optional> optionalContext = utils.process(s3Object.key()); if (optionalContext.isPresent()) { final S3SourceRecord s3SourceRecord = new S3SourceRecord(s3Object); - context = optionalContext.get(); - overrideContextTopic(); + final Context context = optionalContext.get(); + overrideContextTopic(context); s3SourceRecord.setContext(context); S3OffsetManagerEntry offsetManagerEntry = new S3OffsetManagerEntry(bucket, s3Object.key()); offsetManagerEntry = offsetManager @@ -228,7 +225,7 @@ public Optional apply(final S3Object s3Object) { return Optional.empty(); } - private void overrideContextTopic() { + private void overrideContextTopic(final Context context) { // Set the target topic in the context if it has been set from configuration. 
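+        // A topic set in configuration always takes precedence over one parsed
+        // from the object key by FilePatternUtils.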
if (targetTopics.isPresent()) { if (context.getTopic().isPresent()) { diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index 8483d204..78f8d00d 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -20,7 +20,6 @@ import static io.aiven.kafka.connect.common.config.CommonConfig.TASK_ID; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; -import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.BUCKET; import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.OBJECT_KEY; import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.RECORD_COUNT; @@ -186,7 +185,6 @@ private void setBasicProperties() { properties.putIfAbsent(MAX_TASKS, "1"); properties.put(TASK_ID, "1"); properties.putIfAbsent("connector.class", AivenKafkaConnectS3SourceConnector.class.getName()); - properties.putIfAbsent(TARGET_TOPIC_PARTITIONS, "0,1"); properties.putIfAbsent(TARGET_TOPICS, "testtopic"); } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java index 10939c51..3d02a30c 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java @@ -19,7 +19,6 @@ import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; -import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; import static org.assertj.core.api.Assertions.assertThat; import java.util.HashMap; @@ -45,7 +44,6 @@ void correctFullConfig() { // record, topic specific props props.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue()); - props.put(TARGET_TOPIC_PARTITIONS, "0,1"); props.put(TARGET_TOPICS, "testtopic"); props.put(SCHEMA_REGISTRY_URL, "localhost:8081"); @@ -60,7 +58,6 @@ void correctFullConfig() { assertThat(conf.getInputFormat()).isEqualTo(InputFormat.AVRO); assertThat(conf.getTargetTopics()).isEqualTo("testtopic"); - assertThat(conf.getTargetTopicPartitions()).isEqualTo("0,1"); assertThat(conf.getSchemaRegistryUrl()).isEqualTo("localhost:8081"); assertThat(conf.getS3RetryBackoffDelayMs()).isEqualTo(S3ConfigFragment.AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java index 398ee359..4e019135 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java @@ -61,8 +61,6 @@ public void 
setUp() { private Map createPartitionMap() { final Map partitionKey = new HashMap<>(); - partitionKey.put(S3OffsetManagerEntry.TOPIC, TOPIC); - partitionKey.put(S3OffsetManagerEntry.PARTITION, PARTITION); partitionKey.put(S3OffsetManagerEntry.BUCKET, TEST_BUCKET); partitionKey.put(S3OffsetManagerEntry.OBJECT_KEY, OBJECT_KEY); return partitionKey; diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecordTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecordTest.java index 426d8f25..6561c51c 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecordTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecordTest.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -74,6 +75,7 @@ void testCreateSourceRecordWithDataError() { context = mock(Context.class); final S3OffsetManagerEntry offsetManagerEntry = mock(S3OffsetManagerEntry.class); when(offsetManagerEntry.getManagerKey()).thenThrow(new DataException("Test Exception")); + when(offsetManagerEntry.fromProperties(any())).thenReturn(offsetManagerEntry); final S3SourceRecord s3Record = new S3SourceRecord(s3Object); s3Record.setOffsetManagerEntry(offsetManagerEntry); s3Record.setContext(context); @@ -84,4 +86,59 @@ void testCreateSourceRecordWithDataError() { assertThat(result).isNull(); } + @Test + void testModifyingInitialContextDoesNotAlterTheSourceRecordsContext() { + final String topic = "test-topic"; + final S3OffsetManagerEntry entry = new S3OffsetManagerEntry(BUCKET_ONE, TEST_OBJECT_KEY_TXT); + context = new Context<>(TEST_OBJECT_KEY_TXT); + context.setPartition(null); + context.setTopic(topic); + final S3SourceRecord s3Record = new S3SourceRecord(s3Object); + s3Record.setOffsetManagerEntry(entry); + s3Record.setContext(context); + s3Record.setValueData(new SchemaAndValue(null, "")); + s3Record.setKeyData(new SchemaAndValue(null, "")); + + // alter context and it should have no impact on the source record. 
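+        // setContext stored a defensive copy, so these writes only touch the local object.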
+        context.setPartition(14);
+        context.setTopic("a-diff-topic");
+        SourceRecord result = s3Record.getSourceRecord(ErrorsTolerance.NONE);
+        assertThat(result.topic()).isEqualTo(topic);
+        assertThat(result.kafkaPartition()).isEqualTo(null);
+
+        // We should return a defensive copy so altering here should not affect the S3SourceRecord
+        context = s3Record.getContext();
+        context.setPartition(99);
+        context.setTopic("another-diff-topic");
+        result = s3Record.getSourceRecord(ErrorsTolerance.NONE);
+        assertThat(result.topic()).isEqualTo(topic);
+        assertThat(result.kafkaPartition()).isEqualTo(null);
+
+    }
+
+    @Test
+    void testModifyingInitialOffsetManagerEntryDoesNotAlterTheSourceRecordsOffsetManagerEntry() {
+        final String topic = "test-topic";
+        S3OffsetManagerEntry entry = new S3OffsetManagerEntry(BUCKET_ONE, TEST_OBJECT_KEY_TXT);
+        context = new Context<>(TEST_OBJECT_KEY_TXT);
+        context.setPartition(null);
+        context.setTopic(topic);
+        final S3SourceRecord s3Record = new S3SourceRecord(s3Object);
+        s3Record.setOffsetManagerEntry(entry);
+        s3Record.setContext(context);
+        s3Record.setValueData(new SchemaAndValue(null, ""));
+        s3Record.setKeyData(new SchemaAndValue(null, ""));
+        final long currentRecordCount = entry.getRecordCount();
+        // alter entry record count and it should have no impact on the source record.
+        entry.incrementRecordCount();
+        assertThat(s3Record.getRecordCount()).isEqualTo(currentRecordCount);
+
+        // We should return a defensive copy so altering here should not affect the S3SourceRecord
+        entry = s3Record.getOffsetManagerEntry();
+        entry.incrementRecordCount();
+        entry.incrementRecordCount();
+        assertThat(s3Record.getRecordCount()).isEqualTo(currentRecordCount);
+
+    }
+
 }
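
Note on the new offset bootstrap API: populateOffsetManager replaces the removed
hydrateOffsetManager and takes OffsetManagerKey instances rather than raw partition
maps, mapping them back to partition maps internally. Below is a minimal sketch of the
call pattern, assuming an OffsetManager<S3OffsetManagerEntry>; the helper class, its
method name, and the bucket/objectKeys parameters are illustrative assumptions, not
part of this patch:

    import java.util.List;
    import java.util.stream.Collectors;

    import io.aiven.kafka.connect.common.source.OffsetManager;
    import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry;

    // Hypothetical helper, not part of this patch: prime the in-memory offset map
    // before iterating S3 objects.
    final class OffsetPrimer {
        private OffsetPrimer() {
        }

        static void primeOffsets(final OffsetManager<S3OffsetManagerEntry> offsetManager,
                final String bucket, final List<String> objectKeys) {
            // Build one OffsetManagerKey per object we are about to read.
            final List<OffsetManager.OffsetManagerKey> keys = objectKeys.stream()
                    .map(objectKey -> new S3OffsetManagerEntry(bucket, objectKey).getManagerKey())
                    .collect(Collectors.toList());
            // One offsets() round trip against the OffsetStorageReader instead of one
            // offset() lookup per key; later getEntry() calls are served from the local map.
            offsetManager.populateOffsetManager(keys);
        }
    }

The defensive-copy changes work the same way in both directions: setContext and
setOffsetManagerEntry copy on the way in (via the new protected Context copy
constructor, reached through an anonymous subclass, and via
fromProperties(getProperties())), and the corresponding getters hand back copies,
which is exactly what the two new tests assert.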