From 87ccede70bbfb5983b333bc383892d9192cf20a4 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 8 Jan 2025 19:36:57 +0100 Subject: [PATCH] Integrate object dist strategies --- .../kafka/connect/s3/source/S3SourceTask.java | 5 +- .../s3/source/utils/AWSV2SourceClient.java | 25 +----- .../s3/source/utils/SourceRecordIterator.java | 24 +++++- .../source/utils/AWSV2SourceClientTest.java | 80 +++++-------------- .../utils/SourceRecordIteratorTest.java | 37 ++++++++- 5 files changed, 82 insertions(+), 89 deletions(-) diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index 5fff1b34..786f239d 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -105,15 +105,14 @@ public void start(final Map props) { s3SourceConfig = new S3SourceConfig(props); this.transformer = TransformerFactory.getTransformer(s3SourceConfig); offsetManager = new OffsetManager(context, s3SourceConfig); - awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys, - initializeObjectDistributionStrategy(), taskId, filePattern); + awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys); prepareReaderFromOffsetStorageReader(); this.taskInitialized = true; } private void prepareReaderFromOffsetStorageReader() { sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, - awsv2SourceClient, filePattern); + awsv2SourceClient, initializeObjectDistributionStrategy(), filePattern, taskId); } @Override diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java index 47684a71..c32e09b4 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -22,10 +22,8 @@ import java.util.Objects; import java.util.Set; import java.util.function.Predicate; -import java.util.regex.Pattern; import java.util.stream.Stream; -import io.aiven.kafka.connect.common.source.task.DistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3ClientFactory; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; @@ -52,28 +50,18 @@ public class AWSV2SourceClient { private Predicate filterPredicate = s3Object -> s3Object.size() > 0; private final Set failedObjectKeys; - private final DistributionStrategy distributionStrategy; - - private final int taskId; - - private final Pattern filePattern; - /** * @param s3SourceConfig * configuration for Source connector * @param failedObjectKeys * all objectKeys which have already been tried but have been unable to process. */ - public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set failedObjectKeys, - final DistributionStrategy distributionStrategy, final int taskId, final Pattern filePattern) { + public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set failedObjectKeys) { this.s3SourceConfig = s3SourceConfig; final S3ClientFactory s3ClientFactory = new S3ClientFactory(); this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig); this.bucketName = s3SourceConfig.getAwsS3BucketName(); this.failedObjectKeys = new HashSet<>(failedObjectKeys); - this.taskId = taskId; - this.distributionStrategy = distributionStrategy; - this.filePattern = filePattern; } /** @@ -86,15 +74,12 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set * @param failedObjectKeys * all objectKeys which have already been tried but have been unable to process. */ - AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig, final Set failedObjectKeys, - final DistributionStrategy distributionStrategy, final int taskId, final Pattern filePattern) { + AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig, + final Set failedObjectKeys) { this.s3SourceConfig = s3SourceConfig; this.s3Client = s3Client; this.bucketName = s3SourceConfig.getAwsS3BucketName(); this.failedObjectKeys = new HashSet<>(failedObjectKeys); - this.taskId = taskId; - this.distributionStrategy = distributionStrategy; - this.filePattern = filePattern; } public Iterator getListOfObjectKeys(final String startToken) { @@ -141,9 +126,7 @@ public void addFailedObjectKeys(final String objectKey) { } public void setFilterPredicate(final Predicate basePredicate) { - this.filterPredicate = basePredicate - .and(objectSummary -> distributionStrategy.isPartOfTask(taskId, objectSummary.key(), filePattern)) - .and(objectSummary -> !failedObjectKeys.contains(objectSummary.key())); + this.filterPredicate = basePredicate.and(objectSummary -> !failedObjectKeys.contains(objectSummary.key())); } public void shutdown() { diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index 3651f7fd..31a2cdb1 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -34,6 +35,7 @@ import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer; import io.aiven.kafka.connect.common.source.input.Transformer; +import io.aiven.kafka.connect.common.source.task.DistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import org.apache.commons.io.function.IOSupplier; @@ -48,6 +50,8 @@ public final class SourceRecordIterator implements Iterator { private static final Logger LOGGER = LoggerFactory.getLogger(SourceRecordIterator.class); private final Pattern filePattern; + private final DistributionStrategy distributionStrategy; + private final int taskId; public static final long BYTES_TRANSFORMATION_NUM_OF_RECS = 1L; private String currentObjectKey; @@ -65,7 +69,8 @@ public final class SourceRecordIterator implements Iterator { private final AWSV2SourceClient sourceClient; // NOPMD public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager, - final Transformer transformer, final AWSV2SourceClient sourceClient, final Pattern filePattern) { + final Transformer transformer, final AWSV2SourceClient sourceClient, + final DistributionStrategy distributionStrategy, final Pattern filePattern, final int taskId) { this.s3SourceConfig = s3SourceConfig; this.offsetManager = offsetManager; @@ -74,6 +79,8 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetMan this.sourceClient = sourceClient; objectListIterator = sourceClient.getListOfObjectKeys(null); this.filePattern = filePattern; + this.distributionStrategy = distributionStrategy; + this.taskId = taskId; } private void nextS3Object() { @@ -88,7 +95,11 @@ private void nextS3Object() { try { currentObjectKey = objectListIterator.next(); + if (currentObjectKey != null) { + if (validateTaskDistributionStrategy(currentObjectKey)) { + return; + } recordIterator = createIteratorForCurrentFile(); } } catch (IOException e) { @@ -96,6 +107,17 @@ private void nextS3Object() { } } + public boolean validateTaskDistributionStrategy(final String currentObjectKey) { + final Predicate isPartOfTaskPredicate = objectKey -> distributionStrategy.isPartOfTask(taskId, + objectKey, filePattern); + + if (!isPartOfTaskPredicate.test(currentObjectKey)) { + recordIterator = Collections.emptyIterator(); + return true; + } + return false; + } + private Iterator createIteratorForCurrentFile() throws IOException { final Matcher fileMatcher = filePattern.matcher(currentObjectKey); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java index 37aefd00..cb4f966d 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java @@ -16,7 +16,6 @@ package io.aiven.kafka.connect.s3.source.utils; -import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.ANY_FILENAME_PATTERN; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG; import static org.assertj.core.api.Assertions.assertThat; @@ -33,14 +32,9 @@ import java.util.List; import java.util.Map; -import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; -import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; -import io.aiven.kafka.connect.common.source.task.PartitionInPathDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import software.amazon.awssdk.services.s3.S3Client; @@ -57,19 +51,16 @@ class AWSV2SourceClientTest { @Captor ArgumentCaptor requestCaptor; - private static Map getConfigMap(final int maxTasks, final int taskId) { + private static Map getConfigMap() { final Map configMap = new HashMap<>(); - configMap.put("tasks.max", String.valueOf(maxTasks)); - configMap.put("task.id", String.valueOf(taskId)); configMap.put(AWS_S3_BUCKET_NAME_CONFIG, "test-bucket"); return configMap; } - @ParameterizedTest - @CsvSource({ "3, 1" }) - void testFetchObjectSummariesWithNoObjects(final int maxTasks, final int taskId) { - initializeWithTaskConfigs(maxTasks, taskId); + @Test + void testFetchObjectSummariesWithNoObjects() { + initializeWithTaskConfigs(); final ListObjectsV2Response listObjectsV2Response = createListObjectsV2Response(Collections.emptyList(), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); @@ -77,54 +68,31 @@ void testFetchObjectSummariesWithNoObjects(final int maxTasks, final int taskId) assertThat(summaries).isExhausted(); } - @ParameterizedTest - @CsvSource({ "1, 0" }) - void testFetchObjectSummariesWithOneObjectWithBasicConfig(final int maxTasks, final int taskId) { + @Test + void testFetchObjectSummariesWithOneObjectWithBasicConfig() { final String objectKey = "any-key"; - initializeWithTaskConfigs(maxTasks, taskId); - final Iterator summaries = getS3ObjectKeysIterator(objectKey); - assertThat(summaries).hasNext(); - } - - @ParameterizedTest - @CsvSource({ "4, 2, key1", "4, 3, key2", "4, 0, key3", "4, 1, key4" }) - void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final int maxTasks, final int taskId, - final String objectKey) { - initializeWithTaskConfigs(maxTasks, taskId); + initializeWithTaskConfigs(); final Iterator summaries = getS3ObjectKeysIterator(objectKey); assertThat(summaries).hasNext(); } - @ParameterizedTest - @CsvSource({ "4, 1, key1", "4, 3, key1", "4, 0, key1", "4, 1, key2", "4, 2, key2", "4, 0, key2", "4, 1, key3", - "4, 2, key3", "4, 3, key3", "4, 0, key4", "4, 2, key4", "4, 3, key4" }) - void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId, - final String objectKey) { - initializeWithTaskConfigs(maxTasks, taskId); - final Iterator summaries = getS3ObjectKeysIterator(objectKey); - - assertThat(summaries).isExhausted(); - } - - @ParameterizedTest - @CsvSource({ "4, 3", "4, 0" }) - void testFetchObjectSummariesWithZeroByteObject(final int maxTasks, final int taskId) { - initializeWithTaskConfigs(maxTasks, taskId); + @Test + void testFetchObjectSummariesWithZeroByteObject() { + initializeWithTaskConfigs(); final ListObjectsV2Response listObjectsV2Response = getListObjectsV2Response(); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); - // assigned 1 object to taskid - assertThat(summaries).hasNext(); + assertThat(summaries.next()).isNotBlank(); assertThat(summaries.next()).isNotBlank(); assertThat(summaries).isExhausted(); } @Test void testFetchObjectSummariesWithPagination() throws IOException { - initializeWithTaskConfigs(4, 3); + initializeWithTaskConfigs(); final S3Object object1 = createObjectSummary(1, "key1"); final S3Object object2 = createObjectSummary(2, "key2"); final List firstBatch = List.of(object1); @@ -138,19 +106,16 @@ void testFetchObjectSummariesWithPagination() throws IOException { final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); verify(s3Client, times(1)).listObjectsV2(any(ListObjectsV2Request.class)); assertThat(summaries.next()).isNotNull(); - assertThat(summaries).isExhausted(); + assertThat(summaries.next()).isNotNull(); } @Test void testFetchObjectWithPrefix() { - final int taskId = 0; - final Map configMap = getConfigMap(1, taskId); + final Map configMap = getConfigMap(); configMap.put(AWS_S3_PREFIX_CONFIG, "test/"); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); s3Client = mock(S3Client.class); - awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(), - new PartitionInPathDistributionStrategy(1), taskId, - FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/" + ANY_FILENAME_PATTERN)); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); final S3Object object1 = createObjectSummary(1, "topics/key1/1/key1.txt"); final S3Object object2 = createObjectSummary(1, "topics/key2/2/key2.txt"); @@ -178,14 +143,11 @@ void testFetchObjectWithPrefix() { @Test void testFetchObjectWithInitialStartAfter() { - final int taskId = 0; - final Map configMap = getConfigMap(1, taskId); + final Map configMap = getConfigMap(); final String startAfter = "file-option-1-12000.txt"; final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); s3Client = mock(S3Client.class); - awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(), - new HashDistributionStrategy(1), taskId, - FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}")); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); final S3Object object1 = createObjectSummary(1, "key1-1-10000"); final S3Object object2 = createObjectSummary(1, "key2-2-20000"); @@ -236,13 +198,11 @@ private Iterator getS3ObjectKeysIterator(final String objectKey) { return awsv2SourceClient.getListOfObjectKeys(null); } - public void initializeWithTaskConfigs(final int maxTasks, final int taskId) { - final Map configMap = getConfigMap(maxTasks, taskId); + private void initializeWithTaskConfigs() { + final Map configMap = getConfigMap(); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); s3Client = mock(S3Client.class); - awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(), - new HashDistributionStrategy(maxTasks), taskId, - FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}")); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); } private ListObjectsV2Response getListObjectsV2Response() { diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index 5d93f99e..e7b9efb1 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -41,10 +41,14 @@ import io.aiven.kafka.connect.common.source.input.AvroTransformer; import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer; import io.aiven.kafka.connect.common.source.input.Transformer; +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; +import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; final class SourceRecordIteratorTest { @@ -80,7 +84,7 @@ void testIteratorProcessesS3Objects() throws Exception { when(mockSourceApiClient.getListOfObjectKeys(any())).thenReturn(Collections.emptyIterator()); SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, - mockSourceApiClient, filePattern); + mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0); assertThat(iterator.hasNext()).isFalse(); assertThat(iterator.next()).isNull(); @@ -91,7 +95,7 @@ void testIteratorProcessesS3Objects() throws Exception { .thenReturn(Collections.singletonList(key).listIterator()); iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient, - filePattern); + new HashDistributionStrategy(1), filePattern, 0); assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isNotNull(); @@ -122,7 +126,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { mockPatternMatcher(filePattern); SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, - mockSourceApiClient, filePattern); + mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0); assertThat(iterator.hasNext()).isTrue(); iterator.next(); verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong()); @@ -135,7 +139,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient, - filePattern); + new HashDistributionStrategy(1), filePattern, 0); assertThat(iterator.hasNext()).isTrue(); iterator.next(); @@ -143,6 +147,31 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { } } + @ParameterizedTest + @CsvSource({ "4, 2, key1", "4, 3, key2", "4, 0, key3", "4, 1, key4" }) + void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final int maxTasks, final int taskId, + final String objectKey) { + final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, + mockSourceApiClient, new HashDistributionStrategy(maxTasks), + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), taskId); + + final boolean taskNotAssigned = iterator.validateTaskDistributionStrategy(objectKey); + assertThat(taskNotAssigned).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "4, 1, key1", "4, 3, key1", "4, 0, key1", "4, 1, key2", "4, 2, key2", "4, 0, key2", "4, 1, key3", + "4, 2, key3", "4, 3, key3", "4, 0, key4", "4, 2, key4", "4, 3, key4" }) + void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId, + final String objectKey) { + final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, + mockSourceApiClient, new HashDistributionStrategy(maxTasks), + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), taskId); + + final boolean taskNotAssigned = iterator.validateTaskDistributionStrategy(objectKey); + assertThat(taskNotAssigned).isTrue(); + } + private static void mockPatternMatcher(final Pattern filePattern) { final Matcher fileMatcher = mock(Matcher.class); when(filePattern.matcher(any())).thenReturn(fileMatcher);