From d082cb89c280ae4d554673f9e3843ac29eb9b9ed Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Tue, 7 Jan 2025 12:56:30 +0100 Subject: [PATCH] Fix review --- .../common/config/SourceConfigFragment.java | 4 +- .../common/source/input/JsonTransformer.java | 2 +- .../task/ObjectDistributionStrategy.java | 2 +- .../PartitionInPathDistributionStrategy.java | 18 +---- .../kafka/connect/s3/source/S3SourceTask.java | 38 +++++++++-- .../s3/source/utils/AWSV2SourceClient.java | 68 ++++++------------- .../source/utils/AWSV2SourceClientTest.java | 30 ++++---- 7 files changed, 79 insertions(+), 83 deletions(-) diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java index 0901477e..135c6091 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java @@ -26,7 +26,7 @@ import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance; import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies; -import org.codehaus.plexus.util.StringUtils; +import org.apache.commons.lang3.StringUtils; public final class SourceConfigFragment extends ConfigFragment { private static final String GROUP_OTHER = "OTHER_CFG"; @@ -80,7 +80,7 @@ public static ConfigDef update(final ConfigDef configDef) { "Based on tasks.max config and this strategy, objects are processed in distributed" + " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", " + PARTITION_IN_FILENAME + ", " + PARTITION_IN_FILEPATH, - GROUP_OTHER, sourcePollingConfigCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD + GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD return configDef; } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java index 4ff0f1a2..52a4728c 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java @@ -30,7 +30,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.function.IOSupplier; -import org.codehaus.plexus.util.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java index 904e3eca..29358e96 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.config.ConfigException; -import org.codehaus.plexus.util.StringUtils; +import org.apache.commons.lang3.StringUtils; /** * An {@link ObjectDistributionStrategy} provides a mechanism to share the work of processing records from objects (or diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java index 68f8dfcf..eb35cfd8 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java @@ -21,7 +21,6 @@ import org.apache.kafka.connect.errors.ConnectException; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +44,6 @@ */ public final class PartitionInPathDistributionStrategy implements ObjectDistributionStrategy { private final static Logger LOG = LoggerFactory.getLogger(PartitionInPathDistributionStrategy.class); - private String s3Prefix; private Pattern filePattern; private int maxTasks; @@ -58,21 +56,12 @@ public boolean isPartOfTask(final int taskId, final String pathToBeEvaluated) { try { final Matcher fileMatcher = filePattern.matcher(pathToBeEvaluated); if (fileMatcher.matches()) { - // TBD this block is not required. prefix verification is done in aws client - final String topicName = fileMatcher.group(PATTERN_TOPIC_KEY); - final String s3PrefixForObject = StringUtils.replace(s3Prefix, TOPIC_PATTERN, topicName); - if (!pathToBeEvaluated.startsWith(s3PrefixForObject)) { - LOG.warn("Ignoring path {}, does not contain the preconfigured prefix {} set up at startup", - pathToBeEvaluated, s3Prefix); - return false; - } - // -- end of block final String partitionId = fileMatcher.group(PATTERN_PARTITION_KEY); - return toBeProcessedByThisTask(taskId, maxTasks, Integer.parseInt(partitionId)); } else { - LOG.warn("Ignoring path {}, does not contain the preconfigured prefix {} set up at startup", - pathToBeEvaluated, s3Prefix); + LOG.warn( + "Ignoring path {}, does not contain the preconfigured prefix to extract partition set up at startup", + pathToBeEvaluated); return false; } } catch (NumberFormatException ex) { @@ -101,6 +90,5 @@ public Pattern getFilePattern() { private void configureDistributionStrategy(final int maxTasks, final String expectedPathFormat) { this.maxTasks = maxTasks; this.filePattern = configurePattern(expectedPathFormat); - s3Prefix = StringUtils.substringBefore(expectedPathFormat, PARTITION_PATTERN); } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index efda028d..adea379e 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -17,6 +17,8 @@ package io.aiven.kafka.connect.s3.source; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.MAX_POLL_RECORDS; +import static io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy.ANY_FILENAME_PATTERN; +import static io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy.DEFAULT_PREFIX_FILE_PATH_PATTERN; import java.util.ArrayList; import java.util.HashSet; @@ -30,8 +32,13 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.TransformerFactory; +import io.aiven.kafka.connect.common.source.task.HashObjectDistributionStrategy; +import io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy; +import io.aiven.kafka.connect.common.source.task.PartitionInFilenameDistributionStrategy; +import io.aiven.kafka.connect.common.source.task.PartitionInPathDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; import io.aiven.kafka.connect.s3.source.utils.OffsetManager; @@ -76,10 +83,12 @@ public class S3SourceTask extends SourceTask { private final Object pollLock = new Object(); private AWSV2SourceClient awsv2SourceClient; private final Set failedObjectKeys = new HashSet<>(); - private final Set inProcessObjectKeys = new HashSet<>(); + private ObjectDistributionStrategy objectDistributionStrategy; private OffsetManager offsetManager; + private int taskId; + @SuppressWarnings("PMD.UnnecessaryConstructor") public S3SourceTask() { super(); @@ -95,16 +104,16 @@ public void start(final Map props) { LOGGER.info("S3 Source task started."); s3SourceConfig = new S3SourceConfig(props); this.transformer = TransformerFactory.getTransformer(s3SourceConfig); + this.objectDistributionStrategy = initializeObjectDistributionStrategy(); offsetManager = new OffsetManager(context, s3SourceConfig); - awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys); - awsv2SourceClient.initializeObjectDistributionStrategy(); + awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys, objectDistributionStrategy, taskId); prepareReaderFromOffsetStorageReader(); this.taskInitialized = true; } private void prepareReaderFromOffsetStorageReader() { sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, - awsv2SourceClient, awsv2SourceClient.getObjectDistributionStrategy().getFilePattern()); + awsv2SourceClient, this.objectDistributionStrategy.getFilePattern()); } @Override @@ -188,4 +197,25 @@ public boolean isTaskInitialized() { public AtomicBoolean getConnectorStopped() { return new AtomicBoolean(connectorStopped.get()); } + + private ObjectDistributionStrategy initializeObjectDistributionStrategy() { + final ObjectDistributionStrategies objectDistributionStrategies = s3SourceConfig + .getObjectDistributionStrategy(); + final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString()); + this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks; + ObjectDistributionStrategy objectDistributionStrategy; + + if (objectDistributionStrategies == ObjectDistributionStrategies.PARTITION_IN_FILENAME) { + objectDistributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks, + s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate()); + } else if (objectDistributionStrategies == ObjectDistributionStrategies.PARTITION_IN_FILEPATH) { + objectDistributionStrategy = new PartitionInPathDistributionStrategy(maxTasks, + DEFAULT_PREFIX_FILE_PATH_PATTERN + ANY_FILENAME_PATTERN); + } else { + objectDistributionStrategy = new HashObjectDistributionStrategy(maxTasks, + s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()); + } + + return objectDistributionStrategy; + } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java index 8a7feeed..1304cb71 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -16,9 +16,6 @@ package io.aiven.kafka.connect.s3.source.utils; -import static io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy.ANY_FILENAME_PATTERN; -import static io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy.DEFAULT_PREFIX_FILE_PATH_PATTERN; - import java.io.InputStream; import java.util.HashSet; import java.util.Iterator; @@ -27,11 +24,7 @@ import java.util.function.Predicate; import java.util.stream.Stream; -import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies; -import io.aiven.kafka.connect.common.source.task.HashObjectDistributionStrategy; import io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy; -import io.aiven.kafka.connect.common.source.task.PartitionInFilenameDistributionStrategy; -import io.aiven.kafka.connect.common.source.task.PartitionInPathDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3ClientFactory; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; @@ -55,13 +48,12 @@ public class AWSV2SourceClient { private final S3Client s3Client; private final String bucketName; - private final Predicate filterPredicate = s3Object -> s3Object.size() > 0; + private Predicate filterPredicate = s3Object -> s3Object.size() > 0; private final Set failedObjectKeys; - private ObjectDistributionStrategy objectDistributionStrategy; + private final ObjectDistributionStrategy objectDistributionStrategy; - private int taskId; - private int maxTasks; + private final int taskId; /** * @param s3SourceConfig @@ -69,22 +61,15 @@ public class AWSV2SourceClient { * @param failedObjectKeys * all objectKeys which have already been tried but have been unable to process. */ - public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set failedObjectKeys) { + public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set failedObjectKeys, + final ObjectDistributionStrategy objectDistributionStrategy, final int taskId) { this.s3SourceConfig = s3SourceConfig; final S3ClientFactory s3ClientFactory = new S3ClientFactory(); this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig); this.bucketName = s3SourceConfig.getAwsS3BucketName(); this.failedObjectKeys = new HashSet<>(failedObjectKeys); - } - - public void initializeObjectDistributionStrategy() { - this.maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString()); - this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks; - this.objectDistributionStrategy = getObjectDistributionStrategy(s3SourceConfig.getObjectDistributionStrategy()); - } - - public ObjectDistributionStrategy getObjectDistributionStrategy() { - return this.objectDistributionStrategy; + this.taskId = taskId; + this.objectDistributionStrategy = objectDistributionStrategy; } /** @@ -97,12 +82,14 @@ public ObjectDistributionStrategy getObjectDistributionStrategy() { * @param failedObjectKeys * all objectKeys which have already been tried but have been unable to process. */ - AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig, - final Set failedObjectKeys) { + AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig, final Set failedObjectKeys, + final ObjectDistributionStrategy objectDistributionStrategy, final int taskId) { this.s3SourceConfig = s3SourceConfig; this.s3Client = s3Client; this.bucketName = s3SourceConfig.getAwsS3BucketName(); this.failedObjectKeys = new HashSet<>(failedObjectKeys); + this.taskId = taskId; + this.objectDistributionStrategy = objectDistributionStrategy; } public Iterator getListOfObjectKeys(final String startToken) { @@ -113,6 +100,8 @@ public Iterator getListOfObjectKeys(final String startToken) { .startAfter(optionalKey(startToken)) .build(); + setFilterPredicate(filterPredicate); + final Stream s3ObjectKeyStream = Stream .iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> { // This is called every time next() is called on the iterator. @@ -126,12 +115,7 @@ public Iterator getListOfObjectKeys(final String startToken) { } }) - .flatMap(response -> response.contents() - .stream() - .filter(filterPredicate) - .filter(objectSummary -> objectDistributionStrategy.isPartOfTask(taskId, objectSummary.key())) - .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.key()))) - .map(S3Object::key); + .flatMap(response -> response.contents().stream().filter(filterPredicate).map(S3Object::key)); return s3ObjectKeyStream.iterator(); } private String optionalKey(final String key) { @@ -151,25 +135,13 @@ public void addFailedObjectKeys(final String objectKey) { this.failedObjectKeys.add(objectKey); } - public void shutdown() { - s3Client.close(); + public void setFilterPredicate(final Predicate basePredicate) { + this.filterPredicate = basePredicate + .and(objectSummary -> objectDistributionStrategy.isPartOfTask(taskId, objectSummary.key())) + .and(objectSummary -> !failedObjectKeys.contains(objectSummary.key())); } - private ObjectDistributionStrategy getObjectDistributionStrategy( - final ObjectDistributionStrategies objectDistributionStrategies) { - ObjectDistributionStrategy objectDistributionStrategy; - - if (objectDistributionStrategies == ObjectDistributionStrategies.PARTITION_IN_FILENAME) { - objectDistributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks, - s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate()); - } else if (objectDistributionStrategies == ObjectDistributionStrategies.PARTITION_IN_FILEPATH) { - objectDistributionStrategy = new PartitionInPathDistributionStrategy(maxTasks, - DEFAULT_PREFIX_FILE_PATH_PATTERN + ANY_FILENAME_PATTERN); - } else { - objectDistributionStrategy = new HashObjectDistributionStrategy(maxTasks, - s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()); - } - - return objectDistributionStrategy; + public void shutdown() { + s3Client.close(); } } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java index 838a1eef..2db27b17 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java @@ -16,6 +16,7 @@ package io.aiven.kafka.connect.s3.source.utils; +import static io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy.ANY_FILENAME_PATTERN; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG; import static org.assertj.core.api.Assertions.assertThat; @@ -32,6 +33,8 @@ import java.util.List; import java.util.Map; +import io.aiven.kafka.connect.common.source.task.HashObjectDistributionStrategy; +import io.aiven.kafka.connect.common.source.task.PartitionInPathDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import org.junit.jupiter.api.Test; @@ -139,15 +142,17 @@ void testFetchObjectSummariesWithPagination() throws IOException { @Test void testFetchObjectWithPrefix() { - final Map configMap = getConfigMap(1, 0); + final int taskId = 0; + final Map configMap = getConfigMap(1, taskId); configMap.put(AWS_S3_PREFIX_CONFIG, "test/"); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); s3Client = mock(S3Client.class); - awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); - awsv2SourceClient.initializeObjectDistributionStrategy(); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(), + new PartitionInPathDistributionStrategy(1, "topics/{{topic}}/{{partition}}/" + ANY_FILENAME_PATTERN), + taskId); requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); - final S3Object object1 = createObjectSummary(1, "key1"); - final S3Object object2 = createObjectSummary(1, "key2"); + final S3Object object1 = createObjectSummary(1, "topics/key1/1/key1.txt"); + final S3Object object2 = createObjectSummary(1, "topics/key2/2/key2.txt"); final ListObjectsV2Response firstResult = createListObjectsV2Response(List.of(object1), "nextToken"); final ListObjectsV2Response secondResult = createListObjectsV2Response(List.of(object2), null); @@ -173,15 +178,16 @@ void testFetchObjectWithPrefix() { @Test void testFetchObjectWithInitialStartAfter() { - final Map configMap = getConfigMap(1, 0); + final int taskId = 0; + final Map configMap = getConfigMap(1, taskId); final String startAfter = "file-option-1-12000.txt"; final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); s3Client = mock(S3Client.class); - awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); - awsv2SourceClient.initializeObjectDistributionStrategy(); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(), + new HashObjectDistributionStrategy(1, "{{topic}}-{{partition}}-{{start_offset}}"), taskId); requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); - final S3Object object1 = createObjectSummary(1, "key1"); - final S3Object object2 = createObjectSummary(1, "key2"); + final S3Object object1 = createObjectSummary(1, "key1-1-10000"); + final S3Object object2 = createObjectSummary(1, "key2-2-20000"); final ListObjectsV2Response firstResult = createListObjectsV2Response(List.of(object1), "nextToken"); final ListObjectsV2Response secondResult = createListObjectsV2Response(List.of(object2), null); @@ -233,8 +239,8 @@ public void initializeWithTaskConfigs(final int maxTasks, final int taskId) { final Map configMap = getConfigMap(maxTasks, taskId); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); s3Client = mock(S3Client.class); - awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); - awsv2SourceClient.initializeObjectDistributionStrategy(); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(), + new HashObjectDistributionStrategy(maxTasks, "{{topic}}-{{partition}}-{{start_offset}}"), taskId); } private ListObjectsV2Response getListObjectsV2Response() {