diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FileExtractionPatterns.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FileExtractionPatterns.java deleted file mode 100644 index 499da563..00000000 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FileExtractionPatterns.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2025 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.common.source.input.utils; -public class FileExtractionPatterns { - public static final String PATTERN_PARTITION_KEY = "partition"; - public static final String PATTERN_TOPIC_KEY = "topic"; - public static final String START_OFFSET_PATTERN = "{{start_offset}}"; - public static final String TIMESTAMP_PATTERN = "{{timestamp}}"; - public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}"; - public static final String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}"; - - // Use a named group to return the partition in a complex string to always get the correct information for the - // partition number. - public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)"; - public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)"; - public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)"; - public static final String ANY_FILENAME_PATTERN = ".*$"; -} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java index da9dc6ae..5b127c4f 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java @@ -16,14 +16,6 @@ package io.aiven.kafka.connect.common.source.input.utils; -import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.NUMBER_REGEX_PATTERN; -import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.PARTITION_NAMED_GROUP_REGEX_PATTERN; -import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.PARTITION_PATTERN; -import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.START_OFFSET_PATTERN; -import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.TIMESTAMP_PATTERN; -import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.TOPIC_NAMED_GROUP_REGEX_PATTERN; -import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.TOPIC_PATTERN; - import java.util.regex.Pattern; import org.apache.kafka.common.config.ConfigException; @@ -32,6 +24,20 @@ public final class FilePatternUtils { + public static final String PATTERN_PARTITION_KEY = "partition"; + public static final String 
PATTERN_TOPIC_KEY = "topic"; + public static final String START_OFFSET_PATTERN = "{{start_offset}}"; + public static final String TIMESTAMP_PATTERN = "{{timestamp}}"; + public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}"; + public static final String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}"; + + // Use a named group to return the partition in a complex string to always get the correct information for the + // partition number. + public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)"; + public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)"; + public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)"; + public static final String ANY_FILENAME_PATTERN = ".*$"; + private FilePatternUtils() { // hidden } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java index 283f77d0..2ebeb2be 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java @@ -16,7 +16,7 @@ package io.aiven.kafka.connect.common.source.task; -import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.PATTERN_PARTITION_KEY; +import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY; import java.util.regex.Matcher; import java.util.regex.Pattern; diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java index 275d4d2d..0cb0bbc1 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java @@ -16,7 +16,7 @@ package io.aiven.kafka.connect.common.source.task; -import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.PATTERN_PARTITION_KEY; +import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY; import java.util.regex.Matcher; import java.util.regex.Pattern; diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java index 42d10aad..84d5681b 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java @@ -23,7 +23,6 @@ import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; -import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG; import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY; import static 
io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR; @@ -34,8 +33,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -49,6 +46,8 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.TransformerFactory; +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; +import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; @@ -58,7 +57,6 @@ import org.apache.avro.Schema; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -76,28 +74,16 @@ class AwsIntegrationTest implements IntegrationBase { @Container public static final LocalStackContainer LOCALSTACK = IntegrationBase.createS3Container(); - private static String s3Prefix; - private S3Client s3Client; private String s3Endpoint; private BucketAccessor testBucketAccessor; - @Override - public String getS3Prefix() { - return s3Prefix; - } - @Override public S3Client getS3Client() { return s3Client; } - @BeforeAll - static void setUpAll() { - s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/"; - } - @BeforeEach void setupAWS() { s3Client = IntegrationBase.createS3Client(LOCALSTACK); @@ -118,7 +104,6 @@ private Map getConfig(final String topics, final int maxTasks) { config.put(AWS_SECRET_ACCESS_KEY_CONFIG, S3_SECRET_ACCESS_KEY); config.put(AWS_S3_ENDPOINT_CONFIG, s3Endpoint); config.put(AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET_NAME); - config.put(AWS_S3_PREFIX_CONFIG, getS3Prefix()); config.put(TARGET_TOPIC_PARTITIONS, "0,1"); config.put(TARGET_TOPICS, topics); config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); @@ -146,14 +131,14 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) { final List offsetKeys = new ArrayList<>(); final List expectedKeys = new ArrayList<>(); // write 2 objects to s3 - expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000")); - expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000")); - expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00001")); - expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00001")); + expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0")); + expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0")); + expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1")); + expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1")); // we don't expect the empty one.
offsetKeys.addAll(expectedKeys); - offsetKeys.add(writeToS3(topicName, new byte[0], "00003")); + offsetKeys.add(writeToS3(topicName, new byte[0], "3")); assertThat(testBucketAccessor.listObjects()).hasSize(5); @@ -168,7 +153,8 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) { final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>()); final Iterator sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, - TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient); + TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient, new HashDistributionStrategy(1), + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0); final HashSet seenKeys = new HashSet<>(); while (sourceRecordIterator.hasNext()) { @@ -183,8 +169,10 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) { @Test void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { final var topicName = IntegrationBase.topicName(testInfo); + final int maxTasks = 1; + final int taskId = 0; - final Map configData = getConfig(topicName, 1); + final Map configData = getConfig(topicName, maxTasks); configData.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue()); configData.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter"); @@ -211,12 +199,12 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { final Set offsetKeys = new HashSet<>(); - offsetKeys.add(writeToS3(topicName, outputStream1, "00001")); - offsetKeys.add(writeToS3(topicName, outputStream2, "00001")); + offsetKeys.add(writeToS3(topicName, outputStream1, "1")); + offsetKeys.add(writeToS3(topicName, outputStream2, "1")); - offsetKeys.add(writeToS3(topicName, outputStream3, "00002")); - offsetKeys.add(writeToS3(topicName, outputStream4, "00002")); - offsetKeys.add(writeToS3(topicName, outputStream5, "00002")); + offsetKeys.add(writeToS3(topicName, outputStream3, "2")); + offsetKeys.add(writeToS3(topicName, outputStream4, "2")); + offsetKeys.add(writeToS3(topicName, outputStream5, "2")); assertThat(testBucketAccessor.listObjects()).hasSize(5); @@ -231,7 +219,9 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>()); final Iterator sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, - TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient); + TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient, + new HashDistributionStrategy(maxTasks), + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), taskId); final HashSet seenKeys = new HashSet<>(); final Map> seenRecords = new HashMap<>(); @@ -275,9 +265,9 @@ void verifyIteratorRehydration(final TestInfo testInfo) { final List actualKeys = new ArrayList<>(); // write 2 objects to s3 - expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000") + expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0") .substring((OBJECT_KEY + SEPARATOR).length())); - expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000") + expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0") .substring((OBJECT_KEY + SEPARATOR).length())); assertThat(testBucketAccessor.listObjects()).hasSize(2); @@ -296,7 +286,7 @@ void verifyIteratorRehydration(final TestInfo testInfo) { 
assertThat(actualKeys).containsAll(expectedKeys); // write 3rd object to s3 - expectedKeys.add(writeToS3(topicName, testData3.getBytes(StandardCharsets.UTF_8), "00000") + expectedKeys.add(writeToS3(topicName, testData3.getBytes(StandardCharsets.UTF_8), "0") .substring((OBJECT_KEY + SEPARATOR).length())); assertThat(testBucketAccessor.listObjects()).hasSize(3); diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index a8b91a19..fa4f60b7 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -102,8 +102,6 @@ static byte[] generateNextAvroMessagesStartingFromId(final int messageId, final S3Client getS3Client(); - String getS3Prefix(); - /** * Write file to s3 with the specified key and data. * @@ -134,8 +132,7 @@ default void writeToS3WithKey(final String objectKey, final byte[] testDataBytes * {@link io.aiven.kafka.connect.s3.source.utils.OffsetManager#SEPARATOR} */ default String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) { - final String objectKey = org.apache.commons.lang3.StringUtils.defaultIfBlank(getS3Prefix(), "") + topicName - + "-" + partitionId + "-" + System.currentTimeMillis() + ".txt"; + final String objectKey = topicName + "-" + partitionId + "-" + System.currentTimeMillis() + ".txt"; writeToS3WithKey(objectKey, testDataBytes); return OBJECT_KEY + SEPARATOR + objectKey; } diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 083d8627..4e7c409d 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -16,10 +16,13 @@ package io.aiven.kafka.connect.s3.source; +import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG; +import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_PATH_PREFIX_TEMPLATE_CONFIG; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.config.SourceConfigFragment.OBJECT_DISTRIBUTION_STRATEGY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; @@ -27,6 +30,8 @@ import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG; +import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY; +import static 
io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR; import static java.util.Map.entry; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -36,8 +41,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -54,6 +57,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy; import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.testutils.ContentUtils; @@ -61,11 +65,14 @@ import com.fasterxml.jackson.databind.JsonNode; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.localstack.LocalStackContainer; @@ -80,7 +87,6 @@ final class IntegrationTest implements IntegrationBase { private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTest.class); private static final String CONNECTOR_NAME = "aiven-s3-source-connector"; - private static final String COMMON_PREFIX = "s3-source-connector-for-apache-kafka-test-"; private static final int OFFSET_FLUSH_INTERVAL_MS = 500; private static String s3Endpoint; @@ -95,22 +101,16 @@ final class IntegrationTest implements IntegrationBase { private ConnectRunner connectRunner; private static S3Client s3Client; + private TestInfo testInfo; @Override public S3Client getS3Client() { return s3Client; } - @Override - public String getS3Prefix() { - return s3Prefix; - } - public @BeforeAll static void setUpAll() throws IOException, InterruptedException { - s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/"; - s3Client = IntegrationBase.createS3Client(LOCALSTACK); s3Endpoint = LOCALSTACK.getEndpoint().toString(); testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET_NAME); @@ -122,6 +122,7 @@ public String getS3Prefix() { @BeforeEach void setUp(final TestInfo testInfo) throws Exception { testBucketAccessor.createBucket(); + this.testInfo = testInfo; connectRunner = new ConnectRunner(OFFSET_FLUSH_INTERVAL_MS); final List ports = IntegrationBase.getKafkaListenerPorts(); @@ -151,10 +152,25 @@ void tearDown() { testBucketAccessor.removeBucket(); } - @Test - void bytesTest(final TestInfo testInfo) { + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void bytesTest(final boolean addPrefix) { final var topicName = IntegrationBase.topicName(testInfo); - final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1); + final ObjectDistributionStrategy objectDistributionStrategy; + final int partitionId = 0; + final String prefixPattern = "topics/{{topic}}/partition={{partition}}/"; + String s3Prefix = ""; + if (addPrefix) { + objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILEPATH; + 
s3Prefix = "topics/" + topicName + "/partition=" + partitionId + "/"; + } else { + objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME; + } + + final String fileNamePatternSeparator = "_"; + + final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1, objectDistributionStrategy, + addPrefix, s3Prefix, prefixPattern, fileNamePatternSeparator); connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue()); connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); @@ -165,11 +181,15 @@ void bytesTest(final TestInfo testInfo) { final List offsetKeys = new ArrayList<>(); // write 2 objects to s3 - offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000")); - offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000")); - offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00001")); - offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00001")); - offsetKeys.add(writeToS3(topicName, new byte[0], "00003")); + offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0", s3Prefix, + fileNamePatternSeparator)); + offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0", s3Prefix, + fileNamePatternSeparator)); + offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1", s3Prefix, + fileNamePatternSeparator)); + offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1", s3Prefix, + fileNamePatternSeparator)); + offsetKeys.add(writeToS3(topicName, new byte[0], "3", s3Prefix, "-")); assertThat(testBucketAccessor.listObjects()).hasSize(5); @@ -190,7 +210,9 @@ void bytesTest(final TestInfo testInfo) { @Test void avroTest(final TestInfo testInfo) throws IOException { final var topicName = IntegrationBase.topicName(testInfo); - final Map connectorConfig = getAvroConfig(topicName, InputFormat.AVRO); + final boolean addPrefix = false; + final Map connectorConfig = getAvroConfig(topicName, InputFormat.AVRO, addPrefix, "", "", + ObjectDistributionStrategy.OBJECT_HASH); connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); @@ -215,12 +237,14 @@ void avroTest(final TestInfo testInfo) throws IOException { final Set offsetKeys = new HashSet<>(); - offsetKeys.add(writeToS3(topicName, outputStream1, "00001")); - offsetKeys.add(writeToS3(topicName, outputStream2, "00001")); + final String s3Prefix = ""; + + offsetKeys.add(writeToS3(topicName, outputStream1, "1", s3Prefix, "-")); + offsetKeys.add(writeToS3(topicName, outputStream2, "1", s3Prefix, "-")); - offsetKeys.add(writeToS3(topicName, outputStream3, "00002")); - offsetKeys.add(writeToS3(topicName, outputStream4, "00002")); - offsetKeys.add(writeToS3(topicName, outputStream5, "00002")); + offsetKeys.add(writeToS3(topicName, outputStream3, "2", s3Prefix, "-")); + offsetKeys.add(writeToS3(topicName, outputStream4, "2", s3Prefix, "-")); + offsetKeys.add(writeToS3(topicName, outputStream5, "2", s3Prefix, "-")); assertThat(testBucketAccessor.listObjects()).hasSize(5); @@ -244,16 +268,28 @@ void avroTest(final TestInfo testInfo) throws IOException { connectRunner.getBootstrapServers()); } - @Test - void parquetTest(final TestInfo testInfo) throws IOException { + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void parquetTest(final boolean addPrefix) throws IOException { final var topicName = IntegrationBase.topicName(testInfo); - final String partition = "00000"; - final 
String fileName = org.apache.commons.lang3.StringUtils.defaultIfBlank(getS3Prefix(), "") + topicName + "-" - + partition + "-" + System.currentTimeMillis() + ".txt"; + final String partition = "0"; + final ObjectDistributionStrategy objectDistributionStrategy; + final String prefixPattern = "bucket/topics/{{topic}}/partition/{{partition}}/"; + String s3Prefix = ""; + if (addPrefix) { + objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILEPATH; + s3Prefix = "bucket/topics/" + topicName + "/partition/" + partition + "/"; + } else { + objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME; + } + + final String fileName = (StringUtils.isNotBlank(s3Prefix) ? s3Prefix : "") + topicName + "-" + partition + "-" + + System.currentTimeMillis() + ".txt"; final String name = "testuser"; - final Map connectorConfig = getAvroConfig(topicName, InputFormat.PARQUET); + final Map connectorConfig = getAvroConfig(topicName, InputFormat.PARQUET, addPrefix, s3Prefix, + prefixPattern, objectDistributionStrategy); connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); final Path path = ContentUtils.getTmpFilePath(name); @@ -275,8 +311,11 @@ void parquetTest(final TestInfo testInfo) throws IOException { .containsExactlyInAnyOrderElementsOf(expectedRecordNames); } - private Map getAvroConfig(final String topicName, final InputFormat inputFormat) { - final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 4); + private Map getAvroConfig(final String topicName, final InputFormat inputFormat, + final boolean addPrefix, final String s3Prefix, final String prefixPattern, + final ObjectDistributionStrategy objectDistributionStrategy) { + final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 4, objectDistributionStrategy, + addPrefix, s3Prefix, prefixPattern, "-"); connectorConfig.put(INPUT_FORMAT_KEY, inputFormat.getValue()); connectorConfig.put(SCHEMA_REGISTRY_URL, schemaRegistry.getSchemaRegistryUrl()); connectorConfig.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter"); @@ -288,7 +327,8 @@ private Map getAvroConfig(final String topicName, final InputFor @Test void jsonTest(final TestInfo testInfo) { final var topicName = IntegrationBase.topicName(testInfo); - final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1); + final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1, + ObjectDistributionStrategy.PARTITION_IN_FILENAME, false, "", "", "-"); connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.JSONL.getValue()); connectorConfig.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.json.JsonConverter"); @@ -301,7 +341,7 @@ void jsonTest(final TestInfo testInfo) { } final byte[] jsonBytes = jsonBuilder.toString().getBytes(StandardCharsets.UTF_8); - final String offsetKey = writeToS3(topicName, jsonBytes, "00001"); + final String offsetKey = writeToS3(topicName, jsonBytes, "1", "", "-"); // Poll Json messages from the Kafka topic and deserialize them final List records = IntegrationBase.consumeJsonMessages(topicName, 500, @@ -316,25 +356,36 @@ void jsonTest(final TestInfo testInfo) { verifyOffsetPositions(Map.of(offsetKey, 500), connectRunner.getBootstrapServers()); } - private Map getConfig(final String connectorName, final String topics, final int maxTasks) { - final Map config = new HashMap<>(basicS3ConnectorConfig()); + private Map getConfig(final String connectorName, final String topics, final int maxTasks, + final ObjectDistributionStrategy taskDistributionConfig, final boolean addPrefix, final String 
s3Prefix, + final String prefixPattern, final String fileNameSeparator) { + final Map config = new HashMap<>(basicS3ConnectorConfig(addPrefix, s3Prefix)); config.put("name", connectorName); config.put(TARGET_TOPICS, topics); config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter"); config.put("tasks.max", String.valueOf(maxTasks)); + config.put(OBJECT_DISTRIBUTION_STRATEGY, taskDistributionConfig.value()); + config.put(FILE_NAME_TEMPLATE_CONFIG, + "{{topic}}" + fileNameSeparator + "{{partition}}" + fileNameSeparator + "{{start_offset}}"); + if (addPrefix) { + config.put(FILE_PATH_PREFIX_TEMPLATE_CONFIG, prefixPattern); + } return config; } - private static Map basicS3ConnectorConfig() { + private static Map basicS3ConnectorConfig(final boolean addPrefix, final String s3Prefix) { final Map config = new HashMap<>(); config.put("connector.class", AivenKafkaConnectS3SourceConnector.class.getName()); config.put(AWS_ACCESS_KEY_ID_CONFIG, S3_ACCESS_KEY_ID); config.put(AWS_SECRET_ACCESS_KEY_CONFIG, S3_SECRET_ACCESS_KEY); config.put(AWS_S3_ENDPOINT_CONFIG, s3Endpoint); config.put(AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET_NAME); - config.put(AWS_S3_PREFIX_CONFIG, s3Prefix); + if (addPrefix) { + config.put(AWS_S3_PREFIX_CONFIG, s3Prefix); + } config.put(TARGET_TOPIC_PARTITIONS, "0,1"); + return config; } @@ -351,4 +402,12 @@ static void verifyOffsetPositions(final Map expectedRecords, fin }); } } + + String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId, + final String s3Prefix, final String separator) { + final String objectKey = (StringUtils.isNotBlank(s3Prefix) ? s3Prefix : "") + topicName + separator + + partitionId + separator + System.currentTimeMillis() + ".txt"; + writeToS3WithKey(objectKey, testDataBytes); + return OBJECT_KEY + SEPARATOR + objectKey; + } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index 1bfc5558..9cdc6792 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -16,17 +16,26 @@ package io.aiven.kafka.connect.s3.source; +import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.ANY_FILENAME_PATTERN; + import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.regex.Pattern; import org.apache.kafka.connect.source.SourceRecord; import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy; import io.aiven.kafka.connect.common.source.AbstractSourceTask; import io.aiven.kafka.connect.common.source.input.Transformer; +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; +import io.aiven.kafka.connect.common.source.task.DistributionStrategy; +import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; +import io.aiven.kafka.connect.common.source.task.PartitionInFilenameDistributionStrategy; +import io.aiven.kafka.connect.common.source.task.PartitionInPathDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; import 
io.aiven.kafka.connect.s3.source.utils.OffsetManager; @@ -69,6 +78,9 @@ public class S3SourceTask extends AbstractSourceTask { private OffsetManager offsetManager; private S3SourceConfig s3SourceConfig; + private int taskId; + private Pattern filePattern; + public S3SourceTask() { super(LOGGER); } @@ -131,8 +143,8 @@ protected SourceCommonConfig configure(final Map props) { this.transformer = s3SourceConfig.getTransformer(); offsetManager = new OffsetManager(context, s3SourceConfig); awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys); - setS3SourceRecordIterator( - new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, awsv2SourceClient)); + setS3SourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, + awsv2SourceClient, initializeObjectDistributionStrategy(), filePattern, taskId)); return s3SourceConfig; } @@ -173,4 +185,32 @@ protected void closeResources() { public Transformer getTransformer() { return transformer; } + + private DistributionStrategy initializeObjectDistributionStrategy() { + final ObjectDistributionStrategy objectDistributionStrategy = s3SourceConfig.getObjectDistributionStrategy(); + final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString()); + this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks; + DistributionStrategy distributionStrategy; + + switch (objectDistributionStrategy) { + case PARTITION_IN_FILENAME : + this.filePattern = FilePatternUtils + .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()); + distributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks); + break; + case PARTITION_IN_FILEPATH : + this.filePattern = FilePatternUtils + .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilePathPrefixTemplateConfig() + + ANY_FILENAME_PATTERN); + distributionStrategy = new PartitionInPathDistributionStrategy(maxTasks); + break; + default : + this.filePattern = FilePatternUtils + .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()); + distributionStrategy = new HashDistributionStrategy(maxTasks); + break; + } + + return distributionStrategy; + } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java index ed460a50..903d50d8 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -29,8 +29,6 @@ import org.apache.commons.io.function.IOSupplier; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -44,7 +42,6 @@ */ public class AWSV2SourceClient { - private static final Logger LOGGER = LoggerFactory.getLogger(AWSV2SourceClient.class); public static final int PAGE_SIZE_FACTOR = 2; private final S3SourceConfig s3SourceConfig; private final S3Client s3Client; @@ -53,9 +50,6 @@ public class AWSV2SourceClient { private Predicate filterPredicate = s3Object -> s3Object.size() > 0; private final Set failedObjectKeys; - private final int taskId; - private final int maxTasks; - /** * @param 
s3SourceConfig * configuration for Source connector @@ -82,38 +76,6 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set this.s3Client = s3Client; this.bucketName = s3SourceConfig.getAwsS3BucketName(); this.failedObjectKeys = new HashSet<>(failedObjectKeys); - - // TODO the code below should be configured in some sort of taks assignement method/process/call. - int maxTasks; - try { - final Object value = s3SourceConfig.originals().get("tasks.max"); - if (value == null) { - LOGGER.info("Setting tasks.max to 1"); - maxTasks = 1; - } else { - maxTasks = Integer.parseInt(value.toString()); - } - } catch (NumberFormatException e) { // NOPMD catch null pointer - LOGGER.warn("Invalid tasks.max: {}", e.getMessage()); - LOGGER.info("Setting tasks.max to 1"); - maxTasks = 1; - } - this.maxTasks = maxTasks; - int taskId; - try { - final Object value = s3SourceConfig.originals().get("task.id"); - if (value == null) { - LOGGER.info("Setting task.id to 0"); - taskId = 0; - } else { - taskId = Integer.parseInt(value.toString()) % maxTasks; - } - } catch (NumberFormatException e) { // NOPMD catch null pointer - LOGGER.warn("Invalid task.id: {}", e.getMessage()); - LOGGER.info("Setting task.id to 0"); - taskId = 0; - } - this.taskId = taskId; } /** @@ -130,6 +92,7 @@ private Stream getS3ObjectStream(final String startToken) { .prefix(StringUtils.defaultIfBlank(s3SourceConfig.getAwsS3Prefix(), null)) .startAfter(StringUtils.defaultIfBlank(startToken, null)) .build(); + setFilterPredicate(filterPredicate); return Stream.iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> { // This is called every time next() is called on the iterator. @@ -142,12 +105,7 @@ private Stream getS3ObjectStream(final String startToken) { return null; } - }) - .flatMap(response -> response.contents() - .stream() - .filter(filterPredicate) - .filter(objectSummary -> assignObjectToTask(objectSummary.key())) - .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.key()))); + }).flatMap(response -> response.contents().stream().filter(filterPredicate)); } /** @@ -184,13 +142,8 @@ public void addFailedObjectKeys(final String objectKey) { this.failedObjectKeys.add(objectKey); } - public void setFilterPredicate(final Predicate predicate) { - filterPredicate = predicate; - } - - private boolean assignObjectToTask(final String objectKey) { - final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks); - return taskAssignment == taskId; + public void setFilterPredicate(final Predicate basePredicate) { + this.filterPredicate = basePredicate.and(objectSummary -> !failedObjectKeys.contains(objectSummary.key())); } public void shutdown() { diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index bded51d1..1e15ac6a 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -16,6 +16,9 @@ package io.aiven.kafka.connect.s3.source.utils; +import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY; +import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_TOPIC_KEY; + import java.util.Collections; import java.util.Iterator; import java.util.Map; @@ -29,6 +32,7 @@ import 
io.aiven.kafka.connect.common.source.input.ByteArrayTransformer; import io.aiven.kafka.connect.common.source.input.Transformer; +import io.aiven.kafka.connect.common.source.task.DistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import org.apache.commons.collections4.IteratorUtils; @@ -39,11 +43,6 @@ * Parquet). */ public final class SourceRecordIterator implements Iterator { - public static final String PATTERN_TOPIC_KEY = "topicName"; - public static final String PATTERN_PARTITION_KEY = "partitionId"; - - public static final Pattern FILE_DEFAULT_PATTERN = Pattern.compile("(?[^/]+?)-" - + "(?\\d{5})-" + "(?[a-zA-Z0-9]+)" + "\\.(?[^.]+)$"); // topic-00001.txt public static final long BYTES_TRANSFORMATION_NUM_OF_RECS = 1L; private final OffsetManager offsetManager; @@ -59,25 +58,19 @@ public final class SourceRecordIterator implements Iterator { private String topic; private int partitionId; + private final DistributionStrategy distributionStrategy; + private final int taskId; + private final Iterator inner; private Iterator outer; + private final Pattern filePattern; - private final Predicate fileNamePredicate = s3Object -> { - - final Matcher fileMatcher = FILE_DEFAULT_PATTERN.matcher(s3Object.key()); - - if (fileMatcher.find()) { - // TODO move this from the SourceRecordIterator so that we can decouple it from S3 and make it API agnostic - topic = fileMatcher.group(PATTERN_TOPIC_KEY); - partitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); - return true; - } - return false; - }; + private final Predicate fileNamePredicate; public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager, - final Transformer transformer, final AWSV2SourceClient sourceClient) { + final Transformer transformer, final AWSV2SourceClient sourceClient, + final DistributionStrategy distributionStrategy, final Pattern filePattern, final int taskId) { super(); this.s3SourceConfig = s3SourceConfig; this.offsetManager = offsetManager; @@ -85,13 +78,61 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetMan this.bucketName = s3SourceConfig.getAwsS3BucketName(); this.transformer = transformer; this.sourceClient = sourceClient; + this.filePattern = filePattern; + this.distributionStrategy = distributionStrategy; + this.taskId = taskId; + + this.fileNamePredicate = predicateForFileAndTaskAssignment(); // call filters out bad file names and extracts topic/partition inner = IteratorUtils.filteredIterator(sourceClient.getS3ObjectIterator(null), - s3Object -> this.fileNamePredicate.test(s3Object)); + s3Object -> this.fileNamePredicate.test(s3Object.key())); outer = Collections.emptyIterator(); } + // public Predicate predicateForFileAndTaskAssignment() { + // return objectKey -> { + // if (!isPartOfTask(objectKey)) { + // return false; + // } + // + // return extractTopicAndPartition(objectKey); + // }; + // } + // + // private boolean isPartOfTask(final String objectKey) { + // return distributionStrategy.isPartOfTask(taskId, objectKey, filePattern); + // } + // + // private boolean extractTopicAndPartition(final String objectKey) { + // final Matcher fileMatcher = filePattern.matcher(objectKey); + // + // if (fileMatcher.find()) { + // topic = fileMatcher.group(PATTERN_TOPIC_KEY); + // partitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); + // return true; + // } + // return false; + // } + + public Predicate predicateForFileAndTaskAssignment() { + return objectKey -> { + if 
(!distributionStrategy.isPartOfTask(taskId, objectKey, filePattern)) { + return false; + } + + final Matcher fileMatcher = filePattern.matcher(objectKey); + + if (fileMatcher.find()) { + // TODO: Decouple topic and partition extraction from S3 specifics + topic = fileMatcher.group(PATTERN_TOPIC_KEY); + partitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); + return true; + } + return false; + }; + } + @Override public boolean hasNext() { while (!outer.hasNext() && inner.hasNext()) { diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index 944ccbfd..c915376c 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -174,6 +174,7 @@ private void setBasicProperties() { properties.putIfAbsent("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); properties.putIfAbsent("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); properties.putIfAbsent("tasks.max", "1"); + properties.put("task.id", "1"); properties.putIfAbsent("connector.class", AivenKafkaConnectS3SourceConnector.class.getName()); properties.putIfAbsent(TARGET_TOPIC_PARTITIONS, "0,1"); properties.putIfAbsent(TARGET_TOPICS, "testtopic"); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index af9b679f..fb9af7bd 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -16,6 +16,8 @@ package io.aiven.kafka.connect.s3.source.utils; +import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY; +import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_TOPIC_KEY; import static io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator.BYTES_TRANSFORMATION_NUM_OF_RECS; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyMap; @@ -35,6 +37,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Stream; import org.apache.kafka.connect.data.SchemaAndValue; @@ -44,10 +49,14 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.TransformerFactory; +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; +import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import software.amazon.awssdk.services.s3.model.S3Object; final class SourceRecordIteratorTest { @@ -78,35 +87,38 @@ void testIteratorProcessesS3Objects() throws Exception { mockTransformer = 
TransformerFactory.getTransformer(InputFormat.BYTES); when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); + final Pattern filePattern = mock(Pattern.class); when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Collections.emptyIterator()); Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, - mockSourceApiClient); + mockSourceApiClient, new HashDistributionStrategy(1), + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0); assertThat(iterator.hasNext()).isFalse(); + mockPatternMatcher(filePattern); final S3Object obj = S3Object.builder().key(key).build(); final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais); - iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient); + iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient, + new HashDistributionStrategy(1), filePattern, 0); - assertThat(iterator).hasNext(); + assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isNotNull(); - assertThat(iterator).isExhausted(); } } @Test void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { - final String key = "topic-00001-abc123.txt"; final S3Object s3Object = S3Object.builder().key(key).build(); // With ByteArrayTransformer try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) { when(mockSourceApiClient.getObject(key)).thenReturn(() -> inputStream); + final Pattern filePattern = mock(Pattern.class); when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator()); @@ -120,10 +132,11 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { .thenReturn(Collections.singletonList(key).listIterator()); when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString())) .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); + mockPatternMatcher(filePattern); // should skip if any records were produced by source record iterator. 
final Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, - mockTransformer, mockSourceApiClient); + mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0); assertThat(iterator.hasNext()).isFalse(); verify(mockSourceApiClient, never()).getObject(any()); verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong()); @@ -132,6 +145,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { // With AvroTransformer try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) { when(mockSourceApiClient.getObject(key)).thenReturn(() -> inputStream); + final Pattern filePattern = mock(Pattern.class); when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator()); mockTransformer = mock(AvroTransformer.class); when(mockSourceApiClient.getListOfObjectKeys(any())) @@ -139,13 +153,14 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString())) .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); + mockPatternMatcher(filePattern); when(mockTransformer.getKeyData(anyString(), anyString(), any())).thenReturn(SchemaAndValue.NULL); when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) .thenReturn(Arrays.asList(SchemaAndValue.NULL).stream()); final Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, - mockTransformer, mockSourceApiClient); + mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0); assertThat(iterator.hasNext()).isTrue(); iterator.next(); @@ -153,4 +168,58 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { } } + @ParameterizedTest + @CsvSource({ "4, 2, key1", "4, 3, key2", "4, 0, key3", "4, 1, key4" }) + void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final int maxTasks, final int taskId, + final String objectKey) { + + mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); + when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); + final Pattern filePattern = mock(Pattern.class); + + mockPatternMatcher(filePattern); + + final S3Object obj = S3Object.builder().key(objectKey).build(); + + final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); + when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); + when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais); + final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, + mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId); + final Predicate stringPredicate = iterator.predicateForFileAndTaskAssignment(); + // Assert + assertThat(stringPredicate).accepts(objectKey); + } + + @ParameterizedTest + @CsvSource({ "4, 1, topic1-2-0", "4, 3, key1", "4, 0, key1", "4, 1, key2", "4, 2, key2", "4, 0, key2", "4, 1, key3", + "4, 2, key3", "4, 3, key3", "4, 0, key4", "4, 2, key4", "4, 3, key4" }) + void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId, + final String objectKey) { + mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); + when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); + final Pattern filePattern = 
mock(Pattern.class); + + mockPatternMatcher(filePattern); + + final S3Object obj = S3Object.builder().key(objectKey).build(); + + final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); + when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); + when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais); + final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, + mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId); + final Predicate stringPredicate = iterator.predicateForFileAndTaskAssignment(); + // Assert + assertThat(stringPredicate.test(objectKey)).as("Predicate should not accept the objectKey: " + objectKey).isFalse(); + } + + private static void mockPatternMatcher(final Pattern filePattern) { + final Matcher fileMatcher = mock(Matcher.class); + when(filePattern.matcher(any())).thenReturn(fileMatcher); + when(fileMatcher.find()).thenReturn(true); + when(fileMatcher.group(PATTERN_TOPIC_KEY)).thenReturn("testtopic"); + when(fileMatcher.group(PATTERN_PARTITION_KEY)).thenReturn("0"); + } + }
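
For reference (not part of the patch): a minimal sketch of how the pieces introduced above fit together. The file pattern is built from the connector's filename template via FilePatternUtils.configurePattern, and a DistributionStrategy decides whether a given object key belongs to a task, which is the same check the new SourceRecordIterator predicate performs before extracting topic and partition from the named groups. The template string, task count, and object key below are illustrative assumptions.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;

public class DistributionSketch {
    public static void main(final String[] args) {
        // Pattern derived from the configured file.name.template, e.g. "{{topic}}-{{partition}}-{{start_offset}}"
        final Pattern filePattern = FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}");
        // Assumed tasks.max = 4; the connector derives this from the task configuration
        final HashDistributionStrategy strategy = new HashDistributionStrategy(4);
        // Illustrative object key matching the template plus a file extension
        final String objectKey = "testtopic-0-1700000000000.txt";

        for (int taskId = 0; taskId < 4; taskId++) {
            // Same ownership check the iterator's predicate applies before reading an object
            final boolean ownedByTask = strategy.isPartOfTask(taskId, objectKey, filePattern);
            System.out.printf("task %d owns %s: %b%n", taskId, objectKey, ownedByTask);
        }

        // Topic and partition are then read from the named groups defined in FilePatternUtils
        final Matcher fileMatcher = filePattern.matcher(objectKey);
        if (fileMatcher.find()) {
            System.out.println("topic=" + fileMatcher.group(FilePatternUtils.PATTERN_TOPIC_KEY)
                    + ", partition=" + fileMatcher.group(FilePatternUtils.PATTERN_PARTITION_KEY));
        }
    }
}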