diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
index 546c0c4c..e9592079 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
@@ -22,10 +22,11 @@
 import org.apache.kafka.common.config.ConfigException;
 
-import org.apache.commons.lang3.StringUtils;
+import io.aiven.kafka.connect.common.source.task.Context;
 
-public final class FilePatternUtils {
+import org.apache.commons.lang3.StringUtils;
 
+public final class FilePatternUtils<K> {
     public static final String PATTERN_PARTITION_KEY = "partition";
     public static final String PATTERN_TOPIC_KEY = "topic";
     public static final String START_OFFSET_PATTERN = "{{start_offset}}";
@@ -39,14 +40,25 @@ public final class FilePatternUtils {
     public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
     public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
 
-    private FilePatternUtils() {
-        // hidden
+    final Pattern pattern;
+    final Optional<String> targetTopic;
+
+    public FilePatternUtils(final String pattern, final String targetTopic) {
+        this.pattern = configurePattern(pattern);
+        this.targetTopic = Optional.ofNullable(targetTopic);
     }
-    public static Pattern configurePattern(final String expectedSourceNameFormat) {
-        if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
-            throw new ConfigException(String.format(
-                    "Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
-                    expectedSourceNameFormat));
+
+    /**
+     *
+     * @param expectedSourceNameFormat
+     *            a string in the expected, compatible format, which allows object names or keys to carry unique
+     *            information such as the partition number, topic name, offset, and timestamp.
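+     *
+     *            For example (an illustrative sketch; the format and object name below are taken from this patch's
+     *            tests, and the extracted values follow from the named groups above):
+     *
+     *            <pre>{@code
+     *            FilePatternUtils<String> utils = new FilePatternUtils<>("{{topic}}-{{partition}}-{{start_offset}}", null);
+     *            Optional<Context<String>> ctx = utils.process("logs-1-00112.gz");
+     *            // ctx carries topic "logs" and partition 1
+     *            }</pre>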
+     * @return a pattern configured to allow extraction of the key information from object names and keys.
+     */
+    private Pattern configurePattern(final String expectedSourceNameFormat) {
+        if (expectedSourceNameFormat == null) {
+            throw new ConfigException(
+                    "Source name format is missing. Please configure the expected source name format.");
         }
         // Build REGEX Matcher
         String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN);
@@ -62,26 +74,55 @@ public static Pattern configurePattern(final String expectedSourceNameFormat) {
         }
     }
 
-    public static Optional<String> getTopic(final Pattern filePattern, final String sourceName) {
-        return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY));
-    }
+    public Optional<Context<K>> process(final K sourceName) {
+        if (fileMatches(sourceName.toString())) {
+            final Optional<String> topic = getTopic(sourceName.toString());
+            final Optional<Integer> partition = getPartitionId(sourceName.toString());
+            return Optional.of(new Context<>(topic.orElse(null), partition.orElse(null), sourceName));
+        }
+        return Optional.empty();
+    }
 
-    public static Optional<Integer> getPartitionId(final Pattern filePattern, final String sourceName) {
-        return matchPattern(filePattern, sourceName).flatMap(matcher -> {
+    private boolean fileMatches(final String sourceName) {
+        return matchPattern(sourceName).isPresent();
+    }
+
+    private Optional<String> getTopic(final String sourceName) {
+        if (targetTopic.isPresent()) {
+            return targetTopic;
+        }
+
+        return matchPattern(sourceName).flatMap(matcher -> {
+            try {
+                // TODO check why this worked before without the try catch
+                return Optional.of(matcher.group(PATTERN_TOPIC_KEY));
+            } catch (IllegalArgumentException ex) {
+                // matcher.group throws an IllegalArgumentException when the pattern defines no topic group
+                return Optional.empty();
+            }
+        });
+    }
+
+    private Optional<Integer> getPartitionId(final String sourceName) {
+        return matchPattern(sourceName).flatMap(matcher -> {
             try {
                 return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
-            } catch (NumberFormatException e) {
+            } catch (IllegalArgumentException e) {
+                // matcher.group throws an IllegalArgumentException when the pattern defines no partition group;
+                // NumberFormatException is a subclass of IllegalArgumentException, so it is also covered here.
                 return Optional.empty();
             }
         });
+    }
 
-    private static Optional<Matcher> matchPattern(final Pattern filePattern, final String sourceName) {
-        if (filePattern == null || sourceName == null) {
-            throw new IllegalArgumentException("filePattern and sourceName must not be null");
+    private Optional<Matcher> matchPattern(final String sourceName) {
+        if (sourceName == null) {
+            throw new IllegalArgumentException("sourceName must not be null");
         }
-
-        final Matcher matcher = filePattern.matcher(sourceName);
+        final Matcher matcher = pattern.matcher(sourceName);
         return matcher.find() ? Optional.of(matcher) : Optional.empty();
     }
 
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java
new file mode 100644
index 00000000..7e6db8ef
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.task;
+
+import java.util.Optional;
+
+public class Context<K> {
+
+    private String topic;
+    private Integer partition;
+    private K storageKey;
+
+    public Context(final String topic, final Integer partition, final K storageKey) {
+        this.topic = topic;
+        this.partition = partition;
+        this.storageKey = storageKey;
+    }
+
+    public Optional<String> getTopic() {
+        return Optional.ofNullable(topic);
+    }
+
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    public Optional<Integer> getPartition() {
+        return Optional.ofNullable(partition);
+    }
+
+    public void setPartition(final Integer partition) {
+        this.partition = partition;
+    }
+
+    public Optional<K> getStorageKey() {
+        return Optional.ofNullable(storageKey);
+    }
+
+    public void setStorageKey(final K storageKey) {
+        this.storageKey = storageKey;
+    }
+
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
index 8d370c68..ec72ef6b 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
@@ -16,7 +16,7 @@
 package io.aiven.kafka.connect.common.source.task;
 
-import java.util.regex.Pattern;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 /**
  * An {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files)
@@ -27,18 +27,30 @@
  * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
  * workers than tasks, and they will be assigned the remaining tasks as work completes.
  */
-public interface DistributionStrategy {
+public abstract class DistributionStrategy {
+    protected int maxTasks;
+    protected final static int UNDEFINED = -1;
+
+    @SuppressFBWarnings(value = "CT_CONSTRUCTOR_THROW", justification = "constructor throws if maxTasks is less than 1")
+    public DistributionStrategy(final int maxTasks) {
+        isValidMaxTask(maxTasks);
+        this.maxTasks = maxTasks;
+    }
+
+    private static void isValidMaxTask(final int maxTasks) {
+        if (maxTasks <= 0) {
+            throw new IllegalArgumentException("tasks.max must be set to a positive integer (at least 1).");
+        }
+    }
+
     /**
-     * Check if the object should be processed by the task with the given {@code taskId}. Any single object should be
-     * assigned deterministically to a single taskId.
+     * Determine the task that should process the object described by the given context. Any single object must be
+     * assigned deterministically to a single taskId.
      *
-     * @param taskId
-     *            a task ID, usually for the currently running task
-     * @param valueToBeEvaluated
-     *            The value to be evaluated to determine if it should be processed by the task.
-     * @return true if the task should process the object, false if it should not.
+     * @param ctx
+     *            the context, which contains the storage key and optional values for the partition and topic
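+     *
+     *            For example (an illustrative sketch using the partition strategy in this patch; the values come
+     *            from this patch's tests):
+     *
+     *            <pre>{@code
+     *            DistributionStrategy strategy = new PartitionDistributionStrategy(4);
+     *            // a context extracted from "bucket/topics/topic-1/5/logs+5+0002.txt" carries partition 5,
+     *            // so the object is assigned taskId 5 % 4 = 1
+     *            int taskId = strategy.getTaskFor(ctx);
+     *            }</pre>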
+     * @return the taskId to which the object in this context should be assigned.
      */
-    boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);
+    public abstract int getTaskFor(Context<?> ctx);
 
     /**
      * When a connector receives a reconfigure event this method should be called to ensure that the distribution
+     * strategy is updated with the new maximum number of tasks.
      *
      * @param maxTasks
      *            The maximum number of tasks created for the Connector
      */
-    void configureDistributionStrategy(int maxTasks);
+    public void configureDistributionStrategy(final int maxTasks) {
+        isValidMaxTask(maxTasks);
+        this.maxTasks = maxTasks;
+    }
 }
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java
index 4928f30d..06dbc18a 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java
@@ -16,8 +16,6 @@
 package io.aiven.kafka.connect.common.source.task;
 
-import java.util.regex.Pattern;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,31 +26,28 @@
  * This is well-suited to use cases where the order of events between records from objects is not important, especially
  * when ingesting files into Kafka that were not previously created by a supported cloud storage Sink.
  */
-public final class HashDistributionStrategy implements DistributionStrategy {
+public final class HashDistributionStrategy extends DistributionStrategy {
    private final static Logger LOG = LoggerFactory.getLogger(HashDistributionStrategy.class);
-    private int maxTasks;
    public HashDistributionStrategy(final int maxTasks) {
-        configureDistributionStrategy(maxTasks);
+        super(maxTasks);
    }
 
+    /**
+     *
+     * @param ctx
+     *            the Context, which contains the storage key and optional values for the partition and topic
+     * @return the task id this context should be assigned to, or -1 if it cannot be determined
+     */
    @Override
-    public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated, final Pattern filePattern) {
-        if (filenameToBeEvaluated == null) {
+    public int getTaskFor(final Context<?> ctx) {
+        if (ctx.getStorageKey().isEmpty()) {
            LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
-            return false;
+            return UNDEFINED;
        }
-        final int taskAssignment = Math.floorMod(filenameToBeEvaluated.hashCode(), maxTasks);
+        // floorMod returns a non-negative remainder, so assignment starts at 0 and counts up;
+        // task ids also start at 0, so there should be no issue.
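+        // e.g. a storage key hashing to -7 with maxTasks = 10 maps to floorMod(-7, 10) = 3,
+        // whereas -7 % 10 would yield -3, which is not a valid task id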
-        return taskAssignment == taskId;
-    }
-
-    @Override
-    public void configureDistributionStrategy(final int maxTasks) {
-        this.maxTasks = maxTasks;
+        return Math.floorMod(ctx.getStorageKey().hashCode(), maxTasks);
    }
 
-    public void setMaxTasks(final int maxTasks) {
-        this.maxTasks = maxTasks;
-    }
 }
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
index 25f22dfc..89262445 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
@@ -17,9 +17,6 @@
 package io.aiven.kafka.connect.common.source.task;
 
 import java.util.Optional;
-import java.util.regex.Pattern;
-
-import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,53 +29,26 @@
  * {@code topicname-{{partition}}-{{start_offset}}}, and we want all objects with the same partition to be processed
  * within a single task.
  */
-public final class PartitionDistributionStrategy implements DistributionStrategy {
+public final class PartitionDistributionStrategy extends DistributionStrategy {
    private final static Logger LOG = LoggerFactory.getLogger(PartitionDistributionStrategy.class);
-    private int maxTasks;
 
    public PartitionDistributionStrategy(final int maxTasks) {
-        this.maxTasks = maxTasks;
+        super(maxTasks);
    }
 
    /**
     *
-     * @param sourceNameToBeEvaluated
-     *            is the filename/table name of the source for the connector.
-     * @return Predicate to confirm if the given source name matches
+     * @param ctx
+     *            the Context, which contains the storage key and optional values for the partition and topic
+     * @return the task id this context should be assigned to, or -1 if it cannot be determined
     */
    @Override
-    public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluated, final Pattern filePattern) {
-        if (sourceNameToBeEvaluated == null) {
-            LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
-            return false;
-        }
-        final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern,
-                sourceNameToBeEvaluated);
-
-        if (optionalPartitionId.isPresent()) {
-            return optionalPartitionId.get() < maxTasks
-                    ? taskMatchesPartition(taskId, optionalPartitionId.get())
-                    : taskMatchesPartition(taskId, optionalPartitionId.get() % maxTasks);
+    public int getTaskFor(final Context<?> ctx) {
+        final Optional<Integer> partitionId = ctx.getPartition();
+        if (partitionId.isPresent()) {
+            return partitionId.get() % maxTasks;
        }
-        LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated);
-        return false;
-    }
-
-    boolean taskMatchesPartition(final int taskId, final int partitionId) {
-        // The partition id and task id are both expected to start at 0 but if the task id is changed to start at 1
-        // this will break.
-        return taskId == partitionId;
-    }
-
-    /**
-     * When a connector reconfiguration event is received this method should be called to ensure the correct strategy
-     * is being implemented by the connector.
-     *
-     * @param maxTasks
-     *            maximum number of configured tasks for this connector
-     */
-    @Override
-    public void configureDistributionStrategy(final int maxTasks) {
-        this.maxTasks = maxTasks;
+        LOG.warn("Unable to find the partition from this file name {}", ctx.getStorageKey());
+        return UNDEFINED;
    }
 }
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtilsTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtilsTest.java
new file mode 100644
index 00000000..c399f0ff
--- /dev/null
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtilsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input.utils;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.util.Optional;
+
+import io.aiven.kafka.connect.common.source.task.Context;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+class FilePatternUtilsTest {
+
+    @ParameterizedTest
+    @CsvSource({ "{{topic}}-1.txt,'topic', logs-1.txt, topic", "{{topic}}-{{partition}}.txt,, logs-1.txt, logs",
+            "{{topic}}-{{partition}}.txt,, logs2-1.txt, logs2",
+            "{{topic}}-{{partition}}.txt,anomaly, logs2-1.txt, anomaly" })
+    void checkTopicDistribution(final String expectedSourceFormat, final String configuredTopic,
+            final String sourceName, final String expectedTopic) {
+
+        final FilePatternUtils<String> utils = new FilePatternUtils<>(expectedSourceFormat, configuredTopic);
+        final Optional<Context<String>> ctx = utils.process(sourceName);
+        assertThat(ctx.isPresent()).isTrue();
+        assertThat(ctx.get().getTopic().isPresent()).isTrue();
+        assertThat(ctx.get().getTopic().get()).isEqualTo(expectedTopic);
+    }
+
+}
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java
index 50ef7396..3c8adb3d 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java
@@ -20,6 +20,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
 
@@ -30,46 +31,71 @@ final class HashDistributionStrategyTest {
    @ParameterizedTest
    @CsvSource({ "logs-0-0002.txt", "logs-1-0002.txt", "logs-2-0002.txt", "logs-3-0002.txt", "logs-4-0002.txt",
-            "logs-5-0002.txt", "logs-6-0002.txt", "logs-7-0002.txt", "logs-8-0002.txt", "logs-9-0002.txt", "key-0.txt",
-            "logs-1-0002.txt", "key-0002.txt", "logs-3-0002.txt", "key-0002.txt", "logs-5-0002.txt", "value-6-0002.txt",
-            "logs-7-0002.txt", "anImage8-0002.png",
-            "reallylongfilenamecreatedonS3tohisdesomedata and alsohassome spaces.txt" })
"logs-5-0002.txt", "logs-6-0002.txt", "logs-7-0002.txt", "logs-8-0002.txt", "logs-9-0002.txt", + "logs-1-0002.txt", "logs-3-0002.txt", "logs-5-0002.txt", "value-6-0002.txt", "logs-7-0002.txt" }) void hashDistributionExactlyOnce(final String path) { final int maxTaskId = 10; final DistributionStrategy taskDistribution = new HashDistributionStrategy(maxTaskId); - final List results = new ArrayList<>(); + final Context ctx = getContext("{{topic}}-{{partition}}-{{start_offset}}", path, null); + + final List results = new ArrayList<>(); for (int taskId = 0; taskId < maxTaskId; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path, - FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"))); + results.add(taskDistribution.getTaskFor(ctx)); } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); + assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx)); } @ParameterizedTest @CsvSource({ "logs-0-0002.txt", "logs-1-0002.txt", "logs-2-0002.txt", "logs-3-0002.txt", "logs-4-0002.txt", - "logs-5-0002.txt", "logs-6-0002.txt", "logs-7-0002.txt", "logs-8-0002.txt", "logs-9-0002.txt", "key-0.txt", - "logs-1-0002.txt", "key-0002.txt", "logs-3-0002.txt", "key-0002.txt", "logs-5-0002.txt", "value-6-0002.txt", - "logs-7-0002.txt", "anImage8-0002.png", - "reallylongfilenamecreatedonS3tohisdesomedata and alsohassome spaces.txt" }) + "logs-5-0002.txt", "logs-6-0002.txt", "logs-7-0002.txt", "logs-8-0002.txt", "logs-9-0002.txt", + "logs-1-0002.txt", "logs-3-0002.txt", "logs-5-0002.txt", "value-6-0002.txt", "logs-7-0002.txt" }) void hashDistributionExactlyOnceWithReconfigureEvent(final String path) { int maxTasks = 10; final DistributionStrategy taskDistribution = new HashDistributionStrategy(maxTasks); - final List results = new ArrayList<>(); + final Context ctx = getContext("{{topic}}-{{partition}}-{{start_offset}}", path, null); + + final List results = new ArrayList<>(); + for (int taskId = 0; taskId < maxTasks; taskId++) { + results.add(taskDistribution.getTaskFor(ctx)); + } + assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx)); + results.clear(); + maxTasks = 5; + taskDistribution.configureDistributionStrategy(maxTasks); + for (int taskId = 0; taskId < maxTasks; taskId++) { + results.add(taskDistribution.getTaskFor(ctx)); + } + assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx)); + } + + @ParameterizedTest + @CsvSource({ "key-0.txt", "key-0002.txt", "key-0002.txt", "anImage8-0002.png", + "reallylongfilenamecreatedonS3tohisdesomedata and alsohassome spaces.txt" }) + void hashDistributionExactlyOnceWithReconfigureEventAndMatchAllExpectedSource(final String path) { + int maxTasks = 10; + final DistributionStrategy taskDistribution = new HashDistributionStrategy(maxTasks); + final Context ctx = getContext(".*", path, "topic1"); + + final List results = new ArrayList<>(); for (int taskId = 0; taskId < maxTasks; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path, - FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"))); + results.add(taskDistribution.getTaskFor(ctx)); } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); + assertThat(results).allMatch(i -> i == 
+        results.clear();
+        maxTasks = 5;
+        taskDistribution.configureDistributionStrategy(maxTasks);
        for (int taskId = 0; taskId < maxTasks; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}")));
+            results.add(taskDistribution.getTaskFor(ctx));
        }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE);
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
+    }
+
+    private Context<String> getContext(final String expectedSourceName, final String filename,
+            final String targetTopic) {
+        final FilePatternUtils<String> utils = new FilePatternUtils<>(expectedSourceName, targetTopic);
+        final Optional<Context<String>> ctx = utils.process(filename);
+        assertThat(ctx.isPresent()).isTrue();
+        // for hash distribution the context may contain only the storage key
+        return ctx.get();
    }
 }
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java
index c62fbb9b..85ad8e11 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java
@@ -17,12 +17,10 @@
 package io.aiven.kafka.connect.common.source.task;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 import java.util.ArrayList;
 import java.util.List;
-
-import org.apache.kafka.common.config.ConfigException;
+import java.util.Optional;
 
 import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
 
@@ -35,18 +33,8 @@ final class PartitionDistributionStrategyTest {
    @Test
    void partitionInFileNameDefaultAivenS3Sink() {
        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(2);
-        assertThat(taskDistribution.isPartOfTask(1, "logs-1-00112.gz",
-                FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"))).isTrue();
-    }
-
-    @Test
-    void partitionLocationNotSetExpectException() {
-        assertThatThrownBy(() -> new PartitionDistributionStrategy(2).isPartOfTask(1, "",
-                FilePatternUtils.configurePattern("logs-23--")))
-                .isInstanceOf(ConfigException.class)
-                .hasMessage(
-                        "Source name format logs-23-- missing partition pattern {{partition}} please configure the expected source to include the partition pattern.");
-
+        final Context<String> ctx = getContext("{{topic}}-{{partition}}-{{start_offset}}", "logs-1-00112.gz");
+        assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(1);
    }
 
    @ParameterizedTest(name = "[{index}] Pattern: {0}, Filename: {1}")
    void testPartitionFileNamesAndExpectedOutcomes(final String configuredFilenamePattern, final String filename) {
        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(1);
        // This test is testing the filename matching not the task allocation.
-        assertThat(taskDistribution.isPartOfTask(0, filename,
-                FilePatternUtils.configurePattern(configuredFilenamePattern))).isTrue();
+        final Context<String> ctx = getContext(configuredFilenamePattern, filename);
+        assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(0);
    }
 
    @ParameterizedTest(name = "[{index}] Pattern: {0}, Filename: {1}")
    @CsvSource({
            "no-seperator-in-date-partition-offset-{{timestamp}}-{{partition}}-{{start_offset}},no-seperator-in-date-partition-offset-202420220201100112.gz",
            "logs-2024-{{timestamp}}-{{partition}}-{{start_offset}},logs-20201-1-00112.gz",
            "logs-2024-{{timestamp}}{{partition}}-{{start_offset}},logs-202011-00112.gz",
-            "logs-2024-{{timestamp}}{{partition}}-{{start_offset}}, ",
            "logs-2023-{{partition}}-{{start_offset}},logs-2023-one-00112.gz" })
    void expectFalseOnMalformedFilenames(final String configuredFilenamePattern, final String filename) {
        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(1);
        // This test is testing the filename matching not the task allocation.
-        assertThat(taskDistribution.isPartOfTask(0, filename,
-                FilePatternUtils.configurePattern(configuredFilenamePattern))).isFalse();
+        final Optional<Context<String>> ctx = getOptionalContext(configuredFilenamePattern, filename);
+        assertThat(ctx).isEmpty();
    }
 
    @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1}, Filename: {1}")
    void checkCorrectDistributionAcrossTasksOnFileName(final int taskId, final int maxTasks, final String path) {
        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
-
-        assertThat(taskDistribution.isPartOfTask(taskId, path,
-                FilePatternUtils.configurePattern("logs-{{partition}}-{{start_offset}}"))).isTrue();
+        final Context<String> ctx = getContext("logs-{{partition}}-{{start_offset}}", path);
+        assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(taskId);
    }
 
    @ParameterizedTest(name = "[{index}] MaxTasks: {0}, Filename: {1}")
    void filenameDistributionExactlyOnceDistribution(final int maxTasks, final String path) {
        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
 
-        final List<Boolean> results = new ArrayList<>();
+        final List<Integer> results = new ArrayList<>();
+        final Context<String> ctx = getContext("logs-{{partition}}.txt", path);
        for (int taskId = 0; taskId < maxTasks; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern("logs-{{partition}}.txt")));
+            results.add(taskDistribution.getTaskFor(ctx));
        }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE);
+        // TODO Double check this, they should all match the first task.
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
    }
 
    @ParameterizedTest(name = "[{index}] MaxTasks: {0}, TaskId: {1}, Filename: {2}")
    void filenameDistributionExactlyOnceDistributionWithTaskReconfiguration(final int maxTasks,
            final int maxTaskAfterReConfig, final String path) {
 
-        final String expectedSourceNameFormat = "logs-{{partition}}.txt";
        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
-        final List<Boolean> results = new ArrayList<>();
+        final Context<String> ctx = getContext("logs-{{partition}}.txt", path);
+
+        final List<Integer> results = new ArrayList<>();
        for (int taskId = 0; taskId < maxTasks; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern(expectedSourceNameFormat)));
+            results.add(taskDistribution.getTaskFor(ctx));
        }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE);
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
        taskDistribution.configureDistributionStrategy(maxTaskAfterReConfig);
        results.clear();
-        for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern(expectedSourceNameFormat)));
+        for (int taskId = 0; taskId < maxTasks; taskId++) {
+            results.add(taskDistribution.getTaskFor(ctx));
        }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE);
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
    }
 
    @ParameterizedTest
-    @CsvSource({
-            "{topic}}-1.txt,'Source name format {topic}}-1.txt missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'",
-            " ,'Source name format null missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'",
-            "empty-pattern,'Source name format empty-pattern missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'" })
-    void malformedFilenameSetup(final String expectedSourceFormat, final String expectedErrorMessage) {
-        final int maxTaskId = 1;
-        assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, "",
-                FilePatternUtils.configurePattern(expectedSourceFormat))).isInstanceOf(ConfigException.class)
-                .hasMessage(expectedErrorMessage);
-    }
-
-    @Test
-    void errorExpectedNullGivenForSourceNameFormat() {
-        final int maxTaskId = 1;
-        assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, "",
-                FilePatternUtils.configurePattern(null))).isInstanceOf(ConfigException.class)
-                .hasMessage("Source name format null missing partition pattern {{partition}} please configure"
-                        + " the expected source to include the partition pattern.");
-    }
-
-    @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}")
-    @CsvSource({ "0,1,topics/logs/partition=5/logs+5+0002.txt,true",
-            "0,4,topics/logs/partition=5/logs+5+0002.txt,false", "1,4,topics/logs/partition=5/logs+5+0002.txt,true",
-            "0,3,topics/logs/partition=5/logs+5+0002.txt,false", "0,5,topics/logs/partition=5/logs+5+0002.txt,true",
-            "2,3,topics/logs/partition=5/logs+5+0002.txt,true" })
-    void withLeadingStringPartitionNamingConvention(final int taskId, final int maxTasks, final String path,
-            final boolean expectedResult) {
+    @CsvSource({ "10,5,topics/logs/0/logs-0002.txt", "10,5,topics/logs/1/logs-001.txt",
+            "10,5,topics/logs/2/logs-0002.txt", "10,5,topics/logs/3/logs-0002.txt", "10,5,topics/logs/4/logs-0002.txt",
+            "10,5,topics/logs/5/logs-0002.txt", "10,5,topics/logs/6/logs-0002.txt", "10,5,topics/logs/7/logs-0002.txt",
+            "10,5,topics/logs/8/logs-0002.txt", "10,5,topics/logs/9/logs-0002.txt" })
+    void partitionPathDistributionExactlyOnceDistributionWithTaskReconfiguration(final int maxTasks,
+            final int maxTaskAfterReConfig, final String path) {
 
-        final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
+        final String expectedSourceNameFormat = "topics/{{topic}}/{{partition}}/.*$";
+        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
+        final Context<String> ctx = getContext(expectedSourceNameFormat, path);
+        final List<Integer> results = new ArrayList<>();
+        for (int taskId = 0; taskId < maxTasks; taskId++) {
+            results.add(taskDistribution.getTaskFor(ctx));
+        }
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
        taskDistribution.configureDistributionStrategy(maxTaskAfterReConfig);
 
-        assertThat(taskDistribution.isPartOfTask(taskId, path,
-                FilePatternUtils.configurePattern("topics/{{topic}}/partition={{partition}}/.*$")))
-                .isEqualTo(expectedResult);
+        results.clear();
+        for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) {
+            results.add(taskDistribution.getTaskFor(ctx));
+        }
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
    }
 
    @ParameterizedTest
+    @CsvSource({ "10,topics/logs/0/logs-0002.txt", "10,topics/logs/1/logs-001.log", "10,topics/logs/2/logs-0002.txt",
+            "10,topics/logs/3/logs-0002.txt", "10,topics/logs/4/logs-0002.txt", "10,topics/logs/5/logs-0002.txt",
+            "10,topics/logs/6/logs-0002.txt", "10,topics/logs/7/logs-0002.txt", "10,topics/logs/8/logs-0002.txt",
+            "10,topics/logs/9/logs-0002.txt" })
+    void partitionPathDistributionExactlyOnceDistribution(final int maxTasks, final String path) {
+        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
+        final List<Integer> results = new ArrayList<>();
+        final Context<String> ctx = getContext("topics/{{topic}}/{{partition}}/.*$", path);
+        for (int taskId = 0; taskId < maxTasks; taskId++) {
+            results.add(taskDistribution.getTaskFor(ctx));
+        }
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
+    }
 
+    @Test
+    void expectEmptyContextOnNonIntPartitionSuppliedAsNoMatchOccurs() {
+        final int maxTaskId = 1;
+        final String path = "topics/logs/one/test-001.txt";
+        final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId);
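+        // the partition segment "one" is not numeric, so the {{partition}} pattern (digits only) cannot match
+        // and no Context is produced for this path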
+        final Optional<Context<String>> ctx = getOptionalContext("topics/{{topic}}/{{partition}}/.*$", path);
+        assertThat(ctx).isEmpty();
+    }
 
+    @ParameterizedTest(name = "[{index}] Filename: {0}")
+    @CsvSource({ "topcs/logs/0/logs-0002.txt", "topics/logs/1", "S3/logs/2/logs-0002.txt",
+            "topicss/log/3/logs-0002.txt", "prod/logs/4/logs-0002.txt", "misspelt/logs/5/logs-0002.txt",
+            "test/logs/6/logs-0002.txt", "random/logs/7/logs-0002.txt", "DEV/logs/8/logs-0002.txt",
+            "poll/logs/9/logs-0002.txt" })
+    void expectNoMatchOnUnconfiguredPaths(final String path) {
+        final Optional<Context<String>> ctx = getOptionalContext("topics/{{topic}}/{{partition}}/.*$", path);
+        assertThat(ctx).isEmpty();
+    }
 
    @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}")
    @CsvSource({ "0,10,topics/logs/0/logs-0002.txt", "1,10,topics/logs/1/logs-0002.txt",
            "2,10,topics/logs/2/logs-0002.txt", "3,10,topics/logs/3/logs-0002.txt", "4,10,topics/logs/4/logs-0002.txt",
            "5,10,topics/logs/5/logs-0002.txt", "6,10,topics/logs/6/logs-0002.txt", "7,10,topics/logs/7/logs-0002.txt",
            "8,10,topics/logs/8/logs-0002.txt", "9,10,topics/logs/9/logs-0002.txt" })
    void checkCorrectDistributionAcrossTasks(final int taskId, final int maxTaskId, final String path) {
        final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId);
-
-        assertThat(taskDistribution.isPartOfTask(taskId, path,
-                FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))).isTrue();
+        final Context<String> ctx = getContext("topics/{{topic}}/{{partition}}/.*$", path);
+        assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(taskId);
    }
 
-    @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}")
-    @CsvSource({ "1,10,topcs/logs/0/logs-0002.txt", "2,10,topics/logs/1", "3,10,S3/logs/2/logs-0002.txt",
-            "4,10,topics/log/3/logs-0002.txt", "5,10,prod/logs/4/logs-0002.txt", "6,10,misspelt/logs/5/logs-0002.txt",
-            "7,10,test/logs/6/logs-0002.txt", "8,10,random/logs/7/logs-0002.txt", "9,10,DEV/logs/8/logs-0002.txt",
-            "10,10,poll/logs/9/logs-0002.txt" })
-    void expectNoMatchOnUnconfiguredPaths(final int taskId, final int maxTaskId, final String path) {
+    @ParameterizedTest(name = "[{index}] MaxTasks: {0} Filename: {1}")
+    @CsvSource({ "1,bucket/topics/topic-1/5/logs+5+0002.txt,0", "4,bucket/topics/topic-1/5/logs+5+0002.txt,1",
+            "4,bucket/topics/topic-1/5/logs+5+0002.txt,1", "3,bucket/topics/topic-1/5/logs+5+0002.txt,2",
+            "5,bucket/topics/topic-1/5/logs+5+0002.txt,0", "3,bucket/topics/topic-1/5/logs+5+0002.txt,2" })
+    void partitionInPathConvention(final int maxTaskId, final String path, final int expectedResult) {
        final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId);
-
-        assertThat(taskDistribution.isPartOfTask(taskId, path,
-                FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))).isFalse();
+        final Context<String> ctx = getContext("bucket/topics/{{topic}}/{{partition}}/.*$", path);
+        assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(expectedResult);
    }
 
-    @Test
-    void expectExceptionOnNonIntPartitionSupplied() {
-        final int taskId = 1;
-        final int maxTaskId = 1;
-        final String path = "topics/logs/one/test-001.txt";
-
-        final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId);
-        assertThat(taskDistribution.isPartOfTask(taskId, path,
-                FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))).isFalse();
-    }
+    @ParameterizedTest(name = "[{index}] MaxTasks: {0} Filename: {1}")
"1,topics/logs/partition=5/logs+5+0002.txt,0", "4,topics/logs/partition=5/logs+5+0002.txt,1", + "4,topics/logs/partition=5/logs+5+0002.txt,1", "3,topics/logs/partition=5/logs+5+0002.txt,2", + "5,topics/logs/partition=5/logs+5+0002.txt,0", "3,topics/logs/partition=5/logs+5+0002.txt,2" }) + void withLeadingStringPartitionNamingConvention(final int maxTasks, final String path, final int expectedResult) { - @Test - void malformedRegexSetup() { - final int maxTaskId = 1; - - assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, "", - FilePatternUtils.configurePattern("topics/{{topic}}/"))).isInstanceOf(ConfigException.class) - .hasMessage( - "Source name format topics/{{topic}}/ missing partition pattern {{partition}} please configure the expected source to include the partition pattern."); - } - - @ParameterizedTest - @CsvSource({ - ",Source name format null missing partition pattern {{partition}} please configure the expected source to include the partition pattern.", - "@adsfs,Source name format @adsfs missing partition pattern {{partition}} please configure the expected source to include the partition pattern.", - "empty-path,Source name format empty-path missing partition pattern {{partition}} please configure the expected source to include the partition pattern." }) - void malformedPathSetup(final String expectedPathFormat, final String expectedErrorMessage) { - final int maxTaskId = 1; + final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); + final Context ctx = getContext("topics/{{topic}}/partition={{partition}}/.*$", path); - assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, expectedPathFormat, - FilePatternUtils.configurePattern(expectedPathFormat))).isInstanceOf(ConfigException.class) - .hasMessage(expectedErrorMessage); + assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(expectedResult); } - @ParameterizedTest - @CsvSource({ "10,topics/logs/0/logs-0002.txt", "10,topics/logs/1/logs-001.log", "10,topics/logs/2/logs-0002.txt", - "10,topics/logs/3/logs-0002.txt", "10,topics/logs/4/logs-0002.txt", "10,topics/logs/5/logs-0002.txt", - "10,topics/logs/6/logs-0002.txt", "10,topics/logs/7/logs-0002.txt", "10,topics/logs/8/logs-0002.txt", - "10,topics/logs/9/logs-0002.txt" }) - void partitionPathDistributionExactlyOnceDistribution(final int maxTasks, final String path) { - final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); - final List results = new ArrayList<>(); - for (int taskId = 0; taskId < maxTasks; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path, - FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))); - } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); + public static Context getContext(final String configuredFilenamePattern, final String filename) { + final Optional> ctx = getOptionalContext(configuredFilenamePattern, filename); + assertThat(ctx.isPresent()).isTrue(); + return ctx.get(); } - @ParameterizedTest - @CsvSource({ "10,5,topics/logs/0/logs-0002.txt", "10,5,topics/logs/1/logs-001.txt", - "10,5,topics/logs/2/logs-0002.txt", "10,5,topics/logs/3/logs-0002.txt", "10,5,topics/logs/4/logs-0002.txt", - "10,5,topics/logs/5/logs-0002.txt", "10,5,topics/logs/6/logs-0002.txt", "10,5,topics/logs/7/logs-0002.txt", - 
"10,5,topics/logs/8/logs-0002.txt", "10,5,topics/logs/9/logs-0002.txt" }) - void partitionPathDistributionExactlyOnceDistributionWithTaskReconfiguration(final int maxTasks, - final int maxTaskAfterReConfig, final String path) { - - final String expectedSourceNameFormat = "topics/{{topic}}/{{partition}}/.*$"; - final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); - final List results = new ArrayList<>(); - for (int taskId = 0; taskId < maxTasks; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path, - FilePatternUtils.configurePattern(expectedSourceNameFormat))); - } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); - taskDistribution.configureDistributionStrategy(maxTaskAfterReConfig); - - results.clear(); - for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path, - FilePatternUtils.configurePattern(expectedSourceNameFormat))); - } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE); + public static Optional> getOptionalContext(final String configuredFilenamePattern, + final String filename) { + final FilePatternUtils utils = new FilePatternUtils<>(configuredFilenamePattern, null); + return utils.process(filename); } } diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java index 5d95d6eb..140b1910 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java @@ -16,6 +16,7 @@ package io.aiven.kafka.connect.s3.source; +import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; @@ -46,8 +47,6 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.TransformerFactory; -import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; -import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; @@ -121,10 +120,14 @@ private Map getConfig(final String topics, final int maxTasks) { @Test void sourceRecordIteratorBytesTest(final TestInfo testInfo) { final var topicName = IntegrationBase.topicName(testInfo); - final Map configData = getConfig(topicName, 1); + final int maxTasks = 1; + final int taskId = 0; + final Map configData = getConfig(topicName, maxTasks); configData.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue()); - + configData.put(FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}"); + configData.put("task.id", String.valueOf(taskId)); + configData.put("tasks.max", String.valueOf(maxTasks)); final String testData1 = "Hello, Kafka 
        final String testData1 = "Hello, Kafka Connect S3 Source! object 1";
        final String testData2 = "Hello, Kafka Connect S3 Source! object 2";
 
@@ -153,8 +156,7 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) {
        final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig);
 
        final Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
-                TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient, new HashDistributionStrategy(1),
-                FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0);
+                TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient);
 
        final HashSet<String> seenKeys = new HashSet<>();
        while (sourceRecordIterator.hasNext()) {
@@ -177,6 +179,9 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException {
        configData.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue());
        configData.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter");
        configData.put(AVRO_VALUE_SERIALIZER, "io.confluent.kafka.serializers.KafkaAvroSerializer");
+        configData.put(FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");
+        configData.put("task.id", String.valueOf(taskId));
+        configData.put("tasks.max", String.valueOf(maxTasks));
 
        // Define Avro schema
        final String schemaJson = "{\n" + "  \"type\": \"record\",\n" + "  \"name\": \"TestRecord\",\n"
@@ -219,9 +224,7 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException {
        final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig);
 
        final Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
-                TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient,
-                new HashDistributionStrategy(maxTasks),
-                FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), taskId);
+                TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient);
 
        final HashSet<String> seenKeys = new HashSet<>();
        final Map<String, List<Long>> seenRecords = new HashMap<>();
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
index 3ed3fdaf..5466435a 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
@@ -19,18 +19,12 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
-import java.util.regex.Pattern;
 
 import org.apache.kafka.connect.source.SourceRecord;
 
 import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 import io.aiven.kafka.connect.common.source.AbstractSourceTask;
 import io.aiven.kafka.connect.common.source.input.Transformer;
-import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
-import io.aiven.kafka.connect.common.source.task.DistributionStrategy;
-import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;
-import io.aiven.kafka.connect.common.source.task.PartitionDistributionStrategy;
-import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
 import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
@@ -71,9 +65,6 @@ public class S3SourceTask extends AbstractSourceTask {
    private OffsetManager offsetManager;
    private S3SourceConfig s3SourceConfig;
 
-    private int taskId;
-    private Pattern filePattern;
-
    public S3SourceTask() {
        super(LOGGER);
    }
 
@@ -136,8 +127,8 @@ protected SourceCommonConfig configure(final Map<String, String> props) {
        this.transformer = s3SourceConfig.getTransformer();
        offsetManager = new OffsetManager(context, s3SourceConfig);
        awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig);
-        setS3SourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
-                awsv2SourceClient, initializeObjectDistributionStrategy(), filePattern, taskId));
+        setS3SourceRecordIterator(
+                new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, awsv2SourceClient));
        return s3SourceConfig;
    }
 
@@ -179,22 +170,4 @@ public Transformer getTransformer() {
        return transformer;
    }
 
-    private DistributionStrategy initializeObjectDistributionStrategy() {
-        final ObjectDistributionStrategy objectDistributionStrategy = s3SourceConfig.getObjectDistributionStrategy();
-        final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString());
-        this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
-        DistributionStrategy distributionStrategy;
-
-        if (objectDistributionStrategy == ObjectDistributionStrategy.PARTITION_IN_FILENAME) {
-            this.filePattern = FilePatternUtils
-                    .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
-            distributionStrategy = new PartitionDistributionStrategy(maxTasks);
-        } else {
-            this.filePattern = FilePatternUtils
-                    .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
-            distributionStrategy = new HashDistributionStrategy(maxTasks);
-        }
-
-        return distributionStrategy;
-    }
 }
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3Key.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3Key.java
new file mode 100644
index 00000000..e63e954d
--- /dev/null
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3Key.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.s3.source.utils;
+
+import java.util.Objects;
+
+public final class S3Key {
+    String storageKey;
+
+    public S3Key(final String storageKey) {
+        this.storageKey = storageKey;
+    }
+
+    /**
+     * Returns a string representation of the storage key.
+     *
+     * @return String representation of the storage key
+     */
+    @Override
+    public String toString() {
+        return storageKey;
+    }
+
+    /**
+     * Returns the hash code for the S3 storage key.
+     *
+     * @return the storage key hash code
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(storageKey);
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+        final S3Key s3Key = (S3Key) other;
+        return Objects.equals(storageKey, s3Key.storageKey);
+    }
+}
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
index 820be20a..657b582d 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
@@ -19,9 +19,9 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
-import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import org.apache.kafka.connect.data.SchemaAndValue;
 
@@ -29,7 +29,11 @@
 import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer;
 import io.aiven.kafka.connect.common.source.input.Transformer;
 import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
+import io.aiven.kafka.connect.common.source.task.Context;
 import io.aiven.kafka.connect.common.source.task.DistributionStrategy;
+import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;
+import io.aiven.kafka.connect.common.source.task.PartitionDistributionStrategy;
+import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 
 import software.amazon.awssdk.services.s3.model.S3Object;
@@ -51,20 +55,18 @@ public final class SourceRecordIterator implements Iterator<S3SourceRecord> {
    // At which point it will work for al our integrations.
    private final AWSV2SourceClient sourceClient;
 
-    private String topic;
-    private int partitionId;
+    private Context<S3Key> context;
 
    private final DistributionStrategy distributionStrategy;
-    private final int taskId;
+    private int taskId;
 
    private final Iterator<S3Object> inner;
 
    private Iterator<S3SourceRecord> outer;
-    private final Pattern filePattern;
+    private FilePatternUtils<S3Key> filePattern;
 
    public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager,
-            final Transformer transformer, final AWSV2SourceClient sourceClient,
-            final DistributionStrategy distributionStrategy, final Pattern filePattern, final int taskId) {
+            final Transformer transformer, final AWSV2SourceClient sourceClient) {
        super();
        this.s3SourceConfig = s3SourceConfig;
        this.offsetManager = offsetManager;
@@ -72,13 +74,12 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetMan
        this.bucketName = s3SourceConfig.getAwsS3BucketName();
        this.transformer = transformer;
        this.sourceClient = sourceClient;
-        this.filePattern = filePattern;
-        this.distributionStrategy = distributionStrategy;
-        this.taskId = taskId;
+
+        this.distributionStrategy = initializeObjectDistributionStrategy();
 
        // Initialize predicates
        sourceClient.addPredicate(this::isFileMatchingPattern);
-        sourceClient.addPredicate(this::isFileAssignedToTask);
+        sourceClient.addPredicate(s3Object -> isFileAssignedToTask(context, taskId));
 
        // call filters out bad file names and extracts topic/partition
        inner = sourceClient.getS3ObjectIterator(null);
@@ -86,19 +87,16 @@
    }
 
    public boolean isFileMatchingPattern(final S3Object s3Object) {
-        final Optional<String> optionalTopic = FilePatternUtils.getTopic(filePattern, s3Object.key());
-        final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern, s3Object.key());
-
-        if (optionalTopic.isPresent() && optionalPartitionId.isPresent()) {
-            topic = optionalTopic.get();
-            partitionId = optionalPartitionId.get();
+        final Optional<Context<S3Key>> optionalCtx = filePattern.process(new S3Key(s3Object.key()));
+        if (optionalCtx.isPresent()) {
+            context = optionalCtx.get();
            return true;
        }
        return false;
    }
 
-    public boolean isFileAssignedToTask(final S3Object s3Object) {
-        return distributionStrategy.isPartOfTask(taskId, s3Object.key(), filePattern);
+    public boolean isFileAssignedToTask(final Context<S3Key> ctx, final int taskId) {
+        return taskId == distributionStrategy.getTaskFor(ctx);
    }
 
    @Override
@@ -128,7 +126,8 @@ public void remove() {
     */
    private Stream<S3SourceRecord> convert(final S3Object s3Object) {
 
-        final Map<String, Object> partitionMap = ConnectUtils.getPartitionMap(topic, partitionId, bucketName);
+        final Map<String, Object> partitionMap = ConnectUtils.getPartitionMap(context.getTopic().get(),
+                context.getPartition().get(), bucketName);
        final long recordCount = offsetManager.recordsProcessedForObjectKey(partitionMap, s3Object.key());
 
        // Optimizing without reading stream again.
@@ -136,13 +135,31 @@ private Stream<S3SourceRecord> convert(final S3Object s3Object) {
             return Stream.empty();
         }
 
-        final SchemaAndValue keyData = transformer.getKeyData(s3Object.key(), topic, s3SourceConfig);
+        final SchemaAndValue keyData = transformer.getKeyData(s3Object.key(), context.getTopic().get(), s3SourceConfig);
 
         return transformer
-                .getRecords(sourceClient.getObject(s3Object.key()), topic, partitionId, s3SourceConfig, recordCount)
+                .getRecords(sourceClient.getObject(s3Object.key()), context.getTopic().get(),
+                        context.getPartition().get(), s3SourceConfig, recordCount)
                 .map(new Mapper(partitionMap, recordCount, keyData, s3Object.key()));
     }
 
+    private DistributionStrategy initializeObjectDistributionStrategy() {
+        final ObjectDistributionStrategy objectDistributionStrategy = s3SourceConfig.getObjectDistributionStrategy();
+        final int maxTasks = Integer.parseInt(s3SourceConfig.originalsStrings().get("tasks.max"));
+        this.taskId = Integer.parseInt(s3SourceConfig.originalsStrings().get("task.id")) % maxTasks;
+        // The file pattern is required by both strategies, so build it once here.
+        this.filePattern = new FilePatternUtils<>(
+                s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString(),
+                s3SourceConfig.getTargetTopics());
+
+        if (Objects.requireNonNull(objectDistributionStrategy) == ObjectDistributionStrategy.PARTITION_IN_FILENAME) {
+            return new PartitionDistributionStrategy(maxTasks);
+        }
+        return new HashDistributionStrategy(maxTasks);
+    }
+
     /**
      * Maps the data from the {@link Transformer} stream to an S3SourceRecord given all the additional data required.
      */
@@ -175,7 +192,8 @@ public Mapper(final Map<String, Object> partitionMap, final long recordCount, fi
     @Override
     public S3SourceRecord apply(final SchemaAndValue valueData) {
         recordCount++;
-        return new S3SourceRecord(partitionMap, recordCount, topic, partitionId, objectKey, keyData, valueData);
+        return new S3SourceRecord(partitionMap, recordCount, context.getTopic().get(), context.getPartition().get(),
+                objectKey, keyData, valueData);
     }
         }
     }
 }
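Note: the iterator now derives the task id, the distribution strategy, and the file pattern from the configuration itself, so construction shrinks from seven arguments to four. A minimal caller sketch, assuming s3SourceConfig, offsetManager, and sourceClient are already constructed, and that "tasks.max" and "task.id" are present in the config's original strings (initializeObjectDistributionStrategy parses both):

    // Hypothetical caller of the narrowed constructor.
    final Transformer transformer = TransformerFactory.getTransformer(InputFormat.BYTES);
    final Iterator<S3SourceRecord> records =
            new SourceRecordIterator(s3SourceConfig, offsetManager, transformer, sourceClient);
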
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
index f7559ddf..d6816da1 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
@@ -16,8 +16,7 @@
 package io.aiven.kafka.connect.s3.source.utils;
 
-import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY;
-import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_TOPIC_KEY;
+import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
 import static io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator.BYTES_TRANSFORMATION_NUM_OF_RECS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyMap;
@@ -31,140 +30,165 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import org.apache.kafka.connect.data.SchemaAndValue;
 
+import io.aiven.kafka.connect.common.config.FileNameFragment;
 import io.aiven.kafka.connect.common.source.input.AvroTransformer;
 import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer;
 import io.aiven.kafka.connect.common.source.input.InputFormat;
 import io.aiven.kafka.connect.common.source.input.Transformer;
 import io.aiven.kafka.connect.common.source.input.TransformerFactory;
 import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
-import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;
+import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;
+import io.aiven.kafka.connect.common.templating.Template;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 final class SourceRecordIteratorTest {
 
+    public static final String TASK_ID = "task.id";
+    public static final String MAX_TASKS = "tasks.max";
     private S3SourceConfig mockConfig;
     private OffsetManager mockOffsetManager;
     private Transformer mockTransformer;
+    private FileNameFragment mockFileNameFrag;
 
-    private AWSV2SourceClient mockSourceApiClient;
+    private AWSV2SourceClient sourceApiClient;
 
     @BeforeEach
     public void setUp() {
         mockConfig = mock(S3SourceConfig.class);
         mockOffsetManager = mock(OffsetManager.class);
         mockTransformer = mock(Transformer.class);
-        mockSourceApiClient = mock(AWSV2SourceClient.class);
+        mockFileNameFrag = mock(FileNameFragment.class);
     }
 
+    private S3SourceConfig getConfig(final Map<String, String> data) {
+        final Map<String, String> defaults = new HashMap<>();
+        defaults.put(AWS_S3_BUCKET_NAME_CONFIG, "bucket-name");
+        defaults.putAll(data);
+        return new S3SourceConfig(defaults);
+    }
+
+    private void mockSourceConfig(final S3SourceConfig s3SourceConfig, final String filePattern, final int taskId,
+            final int maxTasks, final String targetTopic) {
+        when(s3SourceConfig.getObjectDistributionStrategy()).thenReturn(ObjectDistributionStrategy.OBJECT_HASH);
+        final Map<String, String> map = new HashMap<>();
+        map.put(TASK_ID, String.valueOf(taskId));
+        map.put(MAX_TASKS, String.valueOf(maxTasks));
+        when(s3SourceConfig.originalsStrings()).thenReturn(map);
+        when(s3SourceConfig.getS3FileNameFragment()).thenReturn(mockFileNameFrag);
+        when(mockFileNameFrag.getFilenameTemplate()).thenReturn(Template.of(filePattern));
+        when(s3SourceConfig.getTargetTopics()).thenReturn(targetTopic);
+    }
 
     @Test
     void testIteratorProcessesS3Objects() throws Exception {
 
         final String key = "topic-00001-abc123.txt";
+        final String filePattern = "{{topic}}-{{partition}}";
+        final S3SourceConfig config = getConfig(Collections.emptyMap());
+        final S3ClientBuilder builder = new S3ClientBuilder();
+        sourceApiClient = new AWSV2SourceClient(builder.build(), config);
-        // Mock InputStream
-        try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) {
-            when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream);
-
-            mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);
+        mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);
 
-            when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
-            final Pattern filePattern = mock(Pattern.class);
+        when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
 
-            when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Collections.emptyIterator());
-            Iterator<S3SourceRecord> iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer,
-                    mockSourceApiClient, new HashDistributionStrategy(1),
-                    FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0);
+        mockSourceConfig(mockConfig, filePattern, 0, 1, null);
 
-            assertThat(iterator.hasNext()).isFalse();
-            mockPatternMatcher(filePattern);
+        final Iterator<S3SourceRecord> iterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
+                mockTransformer, sourceApiClient);
+        assertThat(iterator).isExhausted();
 
-            final S3Object obj = S3Object.builder().key(key).build();
+        builder.reset().addObject(key, "Hello World").endOfBlock();
+        sourceApiClient = new AWSV2SourceClient(builder.build(), config);
+        final Iterator<S3SourceRecord> s3ObjectIterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
+                mockTransformer, sourceApiClient);
 
-            final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8));
-            when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator());
-            when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais);
-            iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient,
-                    new HashDistributionStrategy(1), filePattern, 0);
+        assertThat(s3ObjectIterator).hasNext();
+        assertThat(s3ObjectIterator.next()).isNotNull();
+        assertThat(s3ObjectIterator).isExhausted();
 
-            assertThat(iterator.hasNext()).isTrue();
-            assertThat(iterator.next()).isNotNull();
-        }
     }
 
     @Test
     void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception {
 
         final String key = "topic-00001-abc123.txt";
-        final S3Object s3Object = S3Object.builder().key(key).build();
+        final String filePattern = "{{topic}}-{{partition}}";
+
+        final S3SourceConfig config = getConfig(Collections.emptyMap());
+        final S3ClientBuilder builder = new S3ClientBuilder();
+
+        builder.reset().addObject(key, "Hello World").endOfBlock();
+        sourceApiClient = new AWSV2SourceClient(builder.build(), config);
+
+        mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);
+
+        when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
+
+        mockSourceConfig(mockConfig, filePattern, 0, 1, null);
 
         // With ByteArrayTransformer
-        try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) {
-            when(mockSourceApiClient.getObject(key)).thenReturn(() -> inputStream);
-            final Pattern filePattern = mock(Pattern.class);
-
-            when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator());
-
-            mockTransformer = mock(ByteArrayTransformer.class);
-            when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
-                    .thenReturn(Stream.of(SchemaAndValue.NULL));
-
-            when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
-
-            when(mockSourceApiClient.getListOfObjectKeys(any()))
-                    .thenReturn(Collections.singletonList(key).listIterator());
-            when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString()))
-                    .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS);
-            mockPatternMatcher(filePattern);
-
-            // should skip if any records were produced by source record iterator.
-            final Iterator<S3SourceRecord> iterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
-                    mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0);
-            assertThat(iterator.hasNext()).isFalse();
-            verify(mockSourceApiClient, never()).getObject(any());
-            verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong());
-        }
+
+        mockTransformer = mock(ByteArrayTransformer.class);
+        when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
+                .thenReturn(Stream.of(SchemaAndValue.NULL));
+
+        when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
+
+        when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString()))
+                .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS);
+
+        // should skip if any records were produced by source record iterator.
+        final Iterator<S3SourceRecord> byteArrayIterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
+                mockTransformer, sourceApiClient);
+
+        assertThat(byteArrayIterator).isExhausted();
+
+        verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong());
 
         // With AvroTransformer
-        try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) {
-            when(mockSourceApiClient.getObject(key)).thenReturn(() -> inputStream);
-            final Pattern filePattern = mock(Pattern.class);
-            when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator());
-            mockTransformer = mock(AvroTransformer.class);
-            when(mockSourceApiClient.getListOfObjectKeys(any()))
-                    .thenReturn(Collections.singletonList(key).listIterator());
-
-            when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString()))
-                    .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS);
-            mockPatternMatcher(filePattern);
-
-            when(mockTransformer.getKeyData(anyString(), anyString(), any())).thenReturn(SchemaAndValue.NULL);
-            when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
-                    .thenReturn(Arrays.asList(SchemaAndValue.NULL).stream());
-
-            final Iterator<S3SourceRecord> iterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
-                    mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0);
-            assertThat(iterator.hasNext()).isFalse();
-
-            verify(mockTransformer, times(0)).getRecords(any(), anyString(), anyInt(), any(), anyLong());
-        }
+
+        mockTransformer = mock(AvroTransformer.class);
+
+        when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString()))
+                .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS);
+
+        when(mockTransformer.getKeyData(anyString(), anyString(), any())).thenReturn(SchemaAndValue.NULL);
+        when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
+                .thenReturn(Arrays.asList(SchemaAndValue.NULL).stream());
+
+        final Iterator<S3SourceRecord> avroIterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
+                mockTransformer, sourceApiClient);
+        assertThat(avroIterator).isExhausted();
+
+        verify(mockTransformer, times(0)).getRecords(any(), anyString(), anyInt(), any(), anyLong());
 
     }
 
     @ParameterizedTest
@@ -174,52 +198,123 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final in
         mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);
         when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
-        final Pattern filePattern = mock(Pattern.class);
-
-        mockPatternMatcher(filePattern);
+        final String key = "topic-00001-abc123.txt";
+        final String filePattern = "{{partition}}";
+        final String topic = "topic";
+        final FilePatternUtils<S3Key> filePatternUtils = new FilePatternUtils<>(filePattern, topic);
+        final S3SourceConfig config = getConfig(Collections.emptyMap());
+        final S3ClientBuilder builder = new S3ClientBuilder();
+        mockSourceConfig(mockConfig, filePattern, taskId, maxTasks, topic);
 
         final S3Object obj = S3Object.builder().key(objectKey).build();
 
-        final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8));
-        when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator());
-        when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais);
+        // Build the S3 client
+        builder.reset().addObject(key, "Hello World").endOfBlock();
+        sourceApiClient = new AWSV2SourceClient(builder.build(), config);
+
         final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer,
-                mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId);
-        final Predicate<S3Object> s3ObjectPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object)
-                && iterator.isFileAssignedToTask(s3Object);
+                sourceApiClient);
+        final Predicate<S3Object> s3ObjectPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object) && iterator
+                .isFileAssignedToTask(filePatternUtils.process(new S3Key(s3Object.key())).orElseThrow(), taskId);
         // Assert
         assertThat(s3ObjectPredicate).accepts(obj);
     }
 
     @ParameterizedTest
     @CsvSource({ "4, 1, topic1-2-0", "4, 3, key1", "4, 0, key1", "4, 1, key2", "4, 2, key2", "4, 0, key2", "4, 1, key3",
             "4, 2, key3", "4, 3, key3", "4, 0, key4", "4, 2, key4", "4, 3, key4" })
     void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId,
             final String objectKey) {
         mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);
         when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
-        final Pattern filePattern = mock(Pattern.class);
-
-        mockPatternMatcher(filePattern);
+        final String filePattern = "{{partition}}";
+        final String topic = "topic";
+        mockSourceConfig(mockConfig, filePattern, taskId, maxTasks, topic);
+        final S3ClientBuilder builder = new S3ClientBuilder();
+        final S3SourceConfig config = getConfig(Collections.emptyMap());
+        final FilePatternUtils<S3Key> filePatternUtils = new FilePatternUtils<>(filePattern, topic);
 
         final S3Object obj = S3Object.builder().key(objectKey).build();
 
-        final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8));
-        when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator());
-        when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais);
+        builder.reset().addObject(objectKey, "Hello World").endOfBlock();
+        sourceApiClient = new AWSV2SourceClient(builder.build(), config);
+
         final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer,
-                mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId);
-        final Predicate<S3Object> stringPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object)
-                && iterator.isFileAssignedToTask(s3Object);
+                sourceApiClient);
+
+        final Predicate<S3Object> stringPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object) && iterator
+                .isFileAssignedToTask(filePatternUtils.process(new S3Key(s3Object.key())).orElseThrow(), taskId);
         // Assert
         assertThat(stringPredicate.test(obj)).as("Predicate should accept the objectKey: " + objectKey).isFalse();
     }
 
-    private static void mockPatternMatcher(final Pattern filePattern) {
-        final Matcher fileMatcher = mock(Matcher.class);
-        when(filePattern.matcher(any())).thenReturn(fileMatcher);
-        when(fileMatcher.find()).thenReturn(true);
-        when(fileMatcher.group(PATTERN_TOPIC_KEY)).thenReturn("testtopic");
-        when(fileMatcher.group(PATTERN_PARTITION_KEY)).thenReturn("0");
+    @Test
+    void testS3ClientIteratorMock() {
+        final S3ClientBuilder builder = new S3ClientBuilder();
+        builder.addObject("Key", "value");
+        final S3Client client = builder.build(); // NOPMD the mock client is closed below
+        final ListObjectsV2Response response = client.listObjectsV2(ListObjectsV2Request.builder().build());
+        client.close();
+        assertThat(response.contents()).isNotEmpty();
+
+        sourceApiClient = new AWSV2SourceClient(builder.build(), getConfig(Collections.emptyMap()));
+        final Iterator<S3Object> iterator = sourceApiClient.getS3ObjectIterator(null);
+        assertThat(iterator.hasNext()).isTrue();
+
     }
+
+    /**
+     * Builds a mocked S3Client whose listObjectsV2 calls return the queued "blocks" of objects, one block per
+     * call, and whose getObjectAsBytes calls return the bytes registered for the requested key.
+     */
+    static class S3ClientBuilder {
+        Queue<Pair<List<S3Object>, Map<String, byte[]>>> blocks = new LinkedList<>();
+        List<S3Object> objects = new ArrayList<>();
+        Map<String, byte[]> data = new HashMap<>();
+
+        public S3ClientBuilder addObject(final String key, final byte[] data) {
+            objects.add(S3Object.builder().key(key).size((long) data.length).build());
+            this.data.put(key, data);
+            return this;
+        }
+
+        public S3ClientBuilder endOfBlock() {
+            blocks.add(Pair.of(objects, data));
+            return reset();
+        }
+
+        public S3ClientBuilder reset() {
+            objects = new ArrayList<>();
+            data = new HashMap<>();
+            return this;
+        }
+
+        public S3ClientBuilder addObject(final String key, final String data) {
+            return addObject(key, data.getBytes(StandardCharsets.UTF_8));
+        }
+
+        private ResponseBytes<byte[]> getResponse(final String key) {
+            return ResponseBytes.fromByteArray(new byte[0], data.get(key));
+        }
+
+        private ListObjectsV2Response dequeueData() {
+            if (blocks.isEmpty()) {
+                objects = Collections.emptyList();
+                data = Collections.emptyMap();
+            } else {
+                final Pair<List<S3Object>, Map<String, byte[]>> pair = blocks.remove();
+                objects = pair.getLeft();
+                data = pair.getRight();
+            }
+            return ListObjectsV2Response.builder().contents(objects).isTruncated(false).build();
+        }
+
+        public S3Client build() {
+            if (!objects.isEmpty()) {
+                endOfBlock();
+            }
+            final S3Client result = mock(S3Client.class);
+            when(result.listObjectsV2(any(ListObjectsV2Request.class))).thenAnswer(env -> dequeueData());
+            when(result.listObjectsV2(any(Consumer.class))).thenAnswer(env -> dequeueData());
+            when(result.getObjectAsBytes(any(GetObjectRequest.class)))
+                    .thenAnswer(env -> getResponse(env.getArgument(0, GetObjectRequest.class).key()));
+            return result;
+        }
+    }
 }
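Note: a hypothetical usage sketch of the S3ClientBuilder test helper above (not part of this change). Each endOfBlock() call seals one ListObjectsV2 "page"; build() seals any pending objects automatically, and the object key used here is illustrative only.

    @Test
    void exampleS3ClientBuilderUsage() { // hypothetical test, mirrors testS3ClientIteratorMock
        final S3ClientBuilder builder = new S3ClientBuilder();
        builder.addObject("topic-00001-key1.txt", "Hello World").endOfBlock();
        try (S3Client client = builder.build()) {
            // The first list call drains the queued block; later calls return empty pages.
            final ListObjectsV2Response page = client.listObjectsV2(ListObjectsV2Request.builder().build());
            assertThat(page.contents()).hasSize(1);
        }
    }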