diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
index 546c0c4c..b1d9dcc6 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
@@ -17,15 +17,20 @@
 package io.aiven.kafka.connect.common.source.input.utils;
 
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.kafka.common.config.ConfigException;
 
+import io.aiven.kafka.connect.common.source.task.Context;
+
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class FilePatternUtils {
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(FilePatternUtils.class);
     public static final String PATTERN_PARTITION_KEY = "partition";
     public static final String PATTERN_TOPIC_KEY = "topic";
     public static final String START_OFFSET_PATTERN = "{{start_offset}}";
@@ -39,10 +44,12 @@ public final class FilePatternUtils {
     public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
     public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
 
-    private FilePatternUtils() {
-        // hidden
+    final Pattern pattern;
+
+    public FilePatternUtils(final String pattern) {
+        this.pattern = configurePattern(pattern);
     }
 
-    public static Pattern configurePattern(final String expectedSourceNameFormat) {
+    private Pattern configurePattern(final String expectedSourceNameFormat) {
         if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
             throw new ConfigException(String.format(
                     "Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
@@ -62,26 +69,46 @@ public static Pattern configurePattern(final String expectedSourceNameFormat) {
         }
     }
 
-    public static Optional<String> getTopic(final Pattern filePattern, final String sourceName) {
-        return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY));
+    public Optional<Context> process(final String sourceName) {
+        final Optional<String> topic = getTopic(sourceName);
+        final OptionalInt partition = getPartitionId(sourceName);
+        return Optional.of(new Context(topic.orElse(null), partition, sourceName));
     }
 
-    public static Optional<Integer> getPartitionId(final Pattern filePattern, final String sourceName) {
-        return matchPattern(filePattern, sourceName).flatMap(matcher -> {
+    private Optional<String> getTopic(final String sourceName) {
+        return matchPattern(sourceName).flatMap(matcher -> {
             try {
-                return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
-            } catch (NumberFormatException e) {
+                // TODO check why this worked before without the try catch
+                return Optional.of(matcher.group(PATTERN_TOPIC_KEY));
+            } catch (IllegalArgumentException ex) {
+                // If the named group is absent from the pattern, group() throws an
+                // IllegalArgumentException
                 return Optional.empty();
             }
         });
     }
 
-    private static Optional<Matcher> matchPattern(final Pattern filePattern, final String sourceName) {
-        if (filePattern == null || sourceName == null) {
-            throw new IllegalArgumentException("filePattern and sourceName must not be null");
+    private OptionalInt getPartitionId(final String sourceName) {
+        final Optional<Matcher> parseIntMatcher = matchPattern(sourceName);
+        if (parseIntMatcher.isPresent()) {
+            try {
+                return OptionalInt.of(Integer.parseInt(parseIntMatcher.get().group(PATTERN_PARTITION_KEY)));
+            } catch (IllegalArgumentException e) {
+                // A missing named group throws an IllegalArgumentException; NumberFormatException is a
+                // subclass of IllegalArgumentException, so it is covered here as well.
+                LOGGER.debug("No partition found in this entry {}", sourceName);
+            }
+        }
+        return OptionalInt.empty();
+
+    }
 
-        final Matcher matcher = filePattern.matcher(sourceName);
+    private Optional<Matcher> matchPattern(final String sourceName) {
+        if (sourceName == null) {
+            throw new IllegalArgumentException("filePattern and sourceName must not be null");
+        }
+        final Matcher matcher = pattern.matcher(sourceName);
         return matcher.find() ? Optional.of(matcher) : Optional.empty();
     }
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java
new file mode 100644
index 00000000..20180760
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.task;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+
+public class Context {
+
+    private String topic;
+    private OptionalInt partition;
+    private String storageKey;
+
+    public Context(final String topic, final OptionalInt partition, final String storageKey) {
+        this.topic = topic;
+        this.partition = partition;
+        this.storageKey = storageKey;
+    }
+
+    public Optional<String> getTopic() {
+        return Optional.ofNullable(topic);
+    }
+
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    public OptionalInt getPartition() {
+        return partition;
+    }
+
+    public void setPartition(final OptionalInt partition) {
+        this.partition = partition;
+    }
+
+    public Optional<String> getStorageKey() {
+        return Optional.ofNullable(storageKey);
+    }
+
+    public void setStorageKey(final String storageKey) {
+        this.storageKey = storageKey;
+    }
+
+}
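Reviewer note: the two new pieces above are meant to be used together — FilePatternUtils compiles the template once and process() folds whatever it could extract into a Context. A minimal usage sketch (the class name ContextExtractionSketch is invented; the template and key mirror the unit tests below, not connector defaults):

import java.util.Optional;

import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
import io.aiven.kafka.connect.common.source.task.Context;

public class ContextExtractionSketch {
    public static void main(final String[] args) {
        // Compiling the template throws ConfigException when {{partition}} is missing.
        final FilePatternUtils utils = new FilePatternUtils("{{topic}}-{{partition}}-{{start_offset}}");
        // process() wraps whatever it could extract; absent groups simply leave Context fields unset.
        final Optional<Context> ctx = utils.process("logs-1-00112.gz");
        ctx.ifPresent(c -> {
            System.out.println("topic     = " + c.getTopic().orElse("<none>")); // logs
            System.out.println("partition = " + c.getPartition());              // OptionalInt[1]
            System.out.println("key       = " + c.getStorageKey().orElse(""));  // logs-1-00112.gz
        });
    }
}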
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
index 8d370c68..9c1769d2 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
@@ -16,7 +16,7 @@
 package io.aiven.kafka.connect.common.source.task;
 
-import java.util.regex.Pattern;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 /**
  * A {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files)
@@ -27,18 +27,24 @@
  * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
  * workers than tasks, and they will be assigned the remaining tasks as work completes.
 */
-public interface DistributionStrategy {
+public abstract class DistributionStrategy {
+    protected int maxTasks;
+
+    @SuppressFBWarnings(value = "CT_CONSTRUCTOR_THROW", justification = "constructor throws if max tasks is less than or equal to 0")
+    public DistributionStrategy(final int maxTasks) {
+        if (maxTasks <= 0) {
+            throw new IllegalArgumentException("tasks.max must be set to a positive number and at least 1.");
+        }
+        this.maxTasks = maxTasks;
+    }
+
     /**
      * Check if the object should be processed by the task with the given {@code taskId}. Any single object should be
      * assigned deterministically to a single taskId.
      *
-     * @param taskId
-     *            a task ID, usually for the currently running task
-     * @param valueToBeEvaluated
-     *            The value to be evaluated to determine if it should be processed by the task.
-     * @return true if the task should process the object, false if it should not.
+     * @param ctx
+     *            the context, which holds the storage key and optional values for the partition and topic
+     * @return the taskId of the task that should process this object.
      */
-    boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);
+    public abstract int getTaskFor(Context ctx);
 
     /**
      * When a connector receives a reconfigure event this method should be called to ensure that the distribution
@@ -47,5 +53,7 @@
      * @param maxTasks
      *            The maximum number of tasks created for the Connector
      */
-    void configureDistributionStrategy(int maxTasks);
+    public void configureDistributionStrategy(final int maxTasks) {
+        this.maxTasks = maxTasks;
+    }
 }
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java
index 4928f30d..1c60651a 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java
@@ -16,8 +16,6 @@
 package io.aiven.kafka.connect.common.source.task;
 
-import java.util.regex.Pattern;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,31 +26,28 @@
 * This is well-suited to use cases where the order of events between records from objects is not important, especially
 * when ingesting files into Kafka that were not previously created by a supported cloud storage Sink.
 */
-public final class HashDistributionStrategy implements DistributionStrategy {
+public final class HashDistributionStrategy extends DistributionStrategy {
     private final static Logger LOG = LoggerFactory.getLogger(HashDistributionStrategy.class);
-    private int maxTasks;
     public HashDistributionStrategy(final int maxTasks) {
-        configureDistributionStrategy(maxTasks);
+        super(maxTasks);
     }
 
+    /**
+     *
+     * @param ctx
+     *            is the Context which contains the storage key and optional values for the partition and topic
+     * @return the task id this context should be assigned to or -1 if it is indeterminable
+     */
     @Override
-    public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated, final Pattern filePattern) {
-        if (filenameToBeEvaluated == null) {
+    public int getTaskFor(final Context ctx) {
+        if (ctx.getStorageKey().isEmpty()) {
             LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
-            return false;
+            return -1;
         }
-        final int taskAssignment = Math.floorMod(filenameToBeEvaluated.hashCode(), maxTasks);
+        // floorMod returns a non-negative remainder, so the assignment always falls in [0, maxTasks);
         // tasks start at 0 so there should be no issue.
-        return taskAssignment == taskId;
-    }
-
-    @Override
-    public void configureDistributionStrategy(final int maxTasks) {
-        this.maxTasks = maxTasks;
+        return Math.floorMod(ctx.getStorageKey().hashCode(), maxTasks);
     }
-
-    public void setMaxTasks(final int maxTasks) {
-        this.maxTasks = maxTasks;
-    }
 }
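Reviewer note: the contract change in one picture — callers no longer poll isPartOfTask per task; the strategy names the owning task once and each task compares the answer with its own id. A small sketch under invented values (class name HashAssignmentSketch, key, and ids are made up):

import java.util.OptionalInt;

import io.aiven.kafka.connect.common.source.task.Context;
import io.aiven.kafka.connect.common.source.task.DistributionStrategy;
import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;

public class HashAssignmentSketch {
    public static void main(final String[] args) {
        final DistributionStrategy strategy = new HashDistributionStrategy(4);
        // The hash strategy only needs the storage key; topic and partition may be absent.
        final Context ctx = new Context("logs", OptionalInt.empty(), "topics/logs/logs-0001.txt");
        final int owner = strategy.getTaskFor(ctx); // deterministic value in [0, 4)
        final int myTaskId = 2; // a real task would read this from the task.id property
        System.out.println(owner == myTaskId ? "process here" : "skip, owned by task " + owner);
    }
}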
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
index 25f22dfc..3c1acb90 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
@@ -16,11 +16,6 @@
 package io.aiven.kafka.connect.common.source.task;
 
-import java.util.Optional;
-import java.util.regex.Pattern;
-
-import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,36 +27,26 @@
 * {@code topicname-{{partition}}-{{start_offset}}}, and we want all objects with the same partition to be processed
 * within a single task.
 */
-public final class PartitionDistributionStrategy implements DistributionStrategy {
+public final class PartitionDistributionStrategy extends DistributionStrategy {
     private final static Logger LOG = LoggerFactory.getLogger(PartitionDistributionStrategy.class);
-    private int maxTasks;
 
     public PartitionDistributionStrategy(final int maxTasks) {
-        this.maxTasks = maxTasks;
+        super(maxTasks);
    }
 
     /**
      *
-     * @param sourceNameToBeEvaluated
-     *            is the filename/table name of the source for the connector.
-     * @return Predicate to confirm if the given source name matches
+     * @param ctx
+     *            is the Context which contains the storage key and optional values for the partition and topic
+     * @return the task id this context should be assigned to or -1 if it is indeterminable
      */
     @Override
-    public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluated, final Pattern filePattern) {
-        if (sourceNameToBeEvaluated == null) {
-            LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
-            return false;
+    public int getTaskFor(final Context ctx) {
+        if (ctx.getPartition().isPresent()) {
+            return ctx.getPartition().getAsInt() % maxTasks;
         }
-        final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern,
-                sourceNameToBeEvaluated);
-
-        if (optionalPartitionId.isPresent()) {
-            return optionalPartitionId.get() < maxTasks
-                    ? taskMatchesPartition(taskId, optionalPartitionId.get())
-                    : taskMatchesPartition(taskId, optionalPartitionId.get() % maxTasks);
-        }
-        LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated);
-        return false;
+        LOG.warn("Unable to find the partition from this file name {}", ctx.getStorageKey());
+        return -1;
     }
 
     boolean taskMatchesPartition(final int taskId, final int partitionId) {
@@ -69,16 +54,4 @@ boolean taskMatchesPartition(final int taskId, final int partitionId) {
         // will break.
         return taskId == partitionId;
     }
-
-    /**
-     * When a connector reconfiguration event is received this method should be called to ensure the correct strategy
-     * is being implemented by the connector.
-     *
-     * @param maxTasks
-     *            maximum number of configured tasks for this connector
-     */
-    @Override
-    public void configureDistributionStrategy(final int maxTasks) {
-        this.maxTasks = maxTasks;
-    }
 }
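Reviewer note: once the partition id has been parsed into the Context, the assignment reduces to a plain modulo, and an absent partition now yields -1 instead of an exception. A hand-run with invented values (class name PartitionAssignmentSketch is illustrative):

import java.util.OptionalInt;

import io.aiven.kafka.connect.common.source.task.Context;
import io.aiven.kafka.connect.common.source.task.PartitionDistributionStrategy;

public class PartitionAssignmentSketch {
    public static void main(final String[] args) {
        final PartitionDistributionStrategy strategy = new PartitionDistributionStrategy(3);
        // partition 5 across 3 tasks: 5 % 3 = 2, so task 2 owns the object
        System.out.println(strategy.getTaskFor(new Context("logs", OptionalInt.of(5), "logs-5-00112.gz")));
        // no partition in the key: -1 signals "unassignable" to the caller (and logs a warning)
        System.out.println(strategy.getTaskFor(new Context("logs", OptionalInt.empty(), "logs.gz")));
    }
}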
+ */ + +package io.aiven.kafka.connect.common.source.input.utils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +import java.util.Optional; + +import org.apache.kafka.common.config.ConfigException; + +import io.aiven.kafka.connect.common.source.task.Context; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class FilePatternUtilsTest { + + @Test + void partitionLocationNotSetExpectException() { + assertThatThrownBy(() -> new FilePatternUtils("logs-23--")) + .isInstanceOf(ConfigException.class) + .hasMessage( + "Source name format logs-23-- missing partition pattern {{partition}} please configure the expected source to include the partition pattern."); + + } + + @Test + void expectFalseOnMalformedFilenames() { + // This test is testing the filename matching not the task allocation. + assertThatThrownBy(() -> getContext("logs-2024-{{timestamp}}{{partition}}-{{start_offset}}", null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("filePattern and sourceName must not be null"); + } + + @ParameterizedTest + @CsvSource({ + "{topic}}-1.txt,'Source name format {topic}}-1.txt missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'", + " ,'Source name format null missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'", + "empty-pattern,'Source name format empty-pattern missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'" }) + void malformedFilenameSetup(final String expectedSourceFormat, final String expectedErrorMessage) { + assertThatThrownBy(() -> new FilePatternUtils(expectedSourceFormat)).isInstanceOf(ConfigException.class) + .hasMessage(expectedErrorMessage); + } + + @Test + void errorExpectedNullGivenForSourceNameFormat() { + assertThatThrownBy(() -> new FilePatternUtils(null)).isInstanceOf(ConfigException.class) + .hasMessage("Source name format null missing partition pattern {{partition}} please configure" + + " the expected source to include the partition pattern."); + } + + @Test + void malformedRegexSetup() { + assertThatThrownBy(() -> new FilePatternUtils("topics/{{topic}}/")).isInstanceOf(ConfigException.class) + .hasMessage( + "Source name format topics/{{topic}}/ missing partition pattern {{partition}} please configure the expected source to include the partition pattern."); + } + + @ParameterizedTest + @CsvSource({ + ",Source name format null missing partition pattern {{partition}} please configure the expected source to include the partition pattern.", + "@adsfs,Source name format @adsfs missing partition pattern {{partition}} please configure the expected source to include the partition pattern.", + "empty-path,Source name format empty-path missing partition pattern {{partition}} please configure the expected source to include the partition pattern." 
+    void malformedPathSetup(final String expectedPathFormat, final String expectedErrorMessage) {
+        assertThatThrownBy(() -> new FilePatternUtils(expectedPathFormat)).isInstanceOf(ConfigException.class)
+                .hasMessage(expectedErrorMessage);
+    }
+
+    public static Context getContext(final String configuredFilenamePattern, final String filename) {
+        final FilePatternUtils utils = new FilePatternUtils(configuredFilenamePattern);
+        final Optional<Context> ctx = utils.process(filename);
+        assertThat(ctx.isPresent()).isTrue();
+        return ctx.get();
+    }
+}
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java
index 50ef7396..da451585 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java
@@ -20,6 +20,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
 
@@ -37,13 +38,13 @@ final class HashDistributionStrategyTest {
     void hashDistributionExactlyOnce(final String path) {
         final int maxTaskId = 10;
         final DistributionStrategy taskDistribution = new HashDistributionStrategy(maxTaskId);
-        final List<Boolean> results = new ArrayList<>();
+        final Context ctx = getContext(path);
+
+        final List<Integer> results = new ArrayList<>();
         for (int taskId = 0; taskId < maxTaskId; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}")));
+            results.add(taskDistribution.getTaskFor(ctx));
         }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE);
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
     }
 
     @ParameterizedTest
@@ -55,21 +56,27 @@ void hashDistributionExactlyOnce(final String path) {
     void hashDistributionExactlyOnceWithReconfigureEvent(final String path) {
         int maxTasks = 10;
         final DistributionStrategy taskDistribution = new HashDistributionStrategy(maxTasks);
-        final List<Boolean> results = new ArrayList<>();
+        final Context ctx = getContext(path);
+
+        final List<Integer> results = new ArrayList<>();
         for (int taskId = 0; taskId < maxTasks; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}")));
+            results.add(taskDistribution.getTaskFor(ctx));
         }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE);
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
         results.clear();
         maxTasks = 5;
         taskDistribution.configureDistributionStrategy(maxTasks);
         for (int taskId = 0; taskId < maxTasks; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}")));
+            results.add(taskDistribution.getTaskFor(ctx));
        }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE);
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
+    }
+
+    private Context getContext(final String filename) {
+        final FilePatternUtils utils = new FilePatternUtils("{{topic}}-{{partition}}-{{start_offset}}");
+        final Optional<Context> ctx = utils.process(filename);
+        assertThat(ctx.isPresent()).isTrue();
+        // Hash distribution can operate on a context even when topic and partition are absent.
+        return ctx.get();
     }
 }
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java
index c62fbb9b..2eb85768 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java
@@ -17,12 +17,10 @@
 package io.aiven.kafka.connect.common.source.task;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 import java.util.ArrayList;
 import java.util.List;
-
-import org.apache.kafka.common.config.ConfigException;
+import java.util.Optional;
 
 import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
 
@@ -35,18 +33,8 @@ final class PartitionDistributionStrategyTest {
     @Test
     void partitionInFileNameDefaultAivenS3Sink() {
         final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(2);
-        assertThat(taskDistribution.isPartOfTask(1, "logs-1-00112.gz",
-                FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"))).isTrue();
-    }
-
-    @Test
-    void partitionLocationNotSetExpectException() {
-        assertThatThrownBy(() -> new PartitionDistributionStrategy(2).isPartOfTask(1, "",
-                FilePatternUtils.configurePattern("logs-23--")))
-                .isInstanceOf(ConfigException.class)
-                .hasMessage(
-                        "Source name format logs-23-- missing partition pattern {{partition}} please configure the expected source to include the partition pattern.");
-
+        final Context ctx = getContext("{{topic}}-{{partition}}-{{start_offset}}", "logs-1-00112.gz");
+        assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(1);
     }
 
     @ParameterizedTest(name = "[{index}] Pattern: {0}, Filename: {1}")
@@ -64,8 +52,8 @@ void partitionLocationNotSetExpectException() {
     void testPartitionFileNamesAndExpectedOutcomes(final String configuredFilenamePattern, final String filename) {
         final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(1);
         // This test is testing the filename matching not the task allocation.
-        assertThat(taskDistribution.isPartOfTask(0, filename,
-                FilePatternUtils.configurePattern(configuredFilenamePattern))).isTrue();
+        final Context ctx = getContext(configuredFilenamePattern, filename);
+        assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(0);
     }
 
     @ParameterizedTest(name = "[{index}] Pattern: {0}, Filename: {1}")
@@ -73,13 +61,12 @@ void testPartitionFileNamesAndExpectedOutcomes(final String configuredFilenamePattern, final String filename) {
             "no-seperator-in-date-partition-offset-{{timestamp}}-{{partition}}-{{start_offset}},no-seperator-in-date-partition-offset-202420220201100112.gz",
             "logs-2024-{{timestamp}}-{{partition}}-{{start_offset}},logs-20201-1-00112.gz",
             "logs-2024-{{timestamp}}{{partition}}-{{start_offset}},logs-202011-00112.gz",
-            "logs-2024-{{timestamp}}{{partition}}-{{start_offset}}, ",
             "logs-2023-{{partition}}-{{start_offset}},logs-2023-one-00112.gz" })
     void expectFalseOnMalformedFilenames(final String configuredFilenamePattern, final String filename) {
         final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(1);
         // This test is testing the filename matching not the task allocation.
-        assertThat(taskDistribution.isPartOfTask(0, filename,
-                FilePatternUtils.configurePattern(configuredFilenamePattern))).isFalse();
+        final Context ctx = getContext(configuredFilenamePattern, filename);
+        assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(-1);
     }
 
     @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1}, Filename: {2}")
@@ -91,9 +78,8 @@ void expectFalseOnMalformedFilenames(final String configuredFilenamePattern, final String filename) {
     void checkCorrectDistributionAcrossTasksOnFileName(final int taskId, final int maxTasks, final String path) {
         final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
-
-        assertThat(taskDistribution.isPartOfTask(taskId, path,
-                FilePatternUtils.configurePattern("logs-{{partition}}-{{start_offset}}"))).isTrue();
+        final Context ctx = getContext("logs-{{partition}}-{{start_offset}}", path);
+        assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(taskId);
     }
 
     @ParameterizedTest(name = "[{index}] MaxTasks: {0}, Filename: {1}")
@@ -104,13 +90,13 @@ void checkCorrectDistributionAcrossTasksOnFileName(final int taskId, final int maxTasks, final String path) {
     void filenameDistributionExactlyOnceDistribution(final int maxTasks, final String path) {
         final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
-        final List<Boolean> results = new ArrayList<>();
+        final List<Integer> results = new ArrayList<>();
+        final Context ctx = getContext("logs-{{partition}}.txt", path);
         for (int taskId = 0; taskId < maxTasks; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern("logs-{{partition}}.txt")));
+            results.add(taskDistribution.getTaskFor(ctx));
         }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE);
+        // TODO Double check this, they should all match the first task.
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
     }
 
     @ParameterizedTest(name = "[{index}] MaxTasks: {0}, TaskId: {1}, Filename: {2}")
@@ -121,179 +107,122 @@ void filenameDistributionExactlyOnceDistribution(final int maxTasks, final String path) {
     void filenameDistributionExactlyOnceDistributionWithTaskReconfiguration(final int maxTasks,
             final int maxTaskAfterReConfig, final String path) {
-        final String expectedSourceNameFormat = "logs-{{partition}}.txt";
         final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
-        final List<Boolean> results = new ArrayList<>();
+        final Context ctx = getContext("logs-{{partition}}.txt", path);
+
+        final List<Integer> results = new ArrayList<>();
+        for (int taskId = 0; taskId < maxTasks; taskId++) {
+            results.add(taskDistribution.getTaskFor(ctx));
+        }
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
+        taskDistribution.configureDistributionStrategy(maxTaskAfterReConfig);
+
+        results.clear();
+        for (int taskId = 0; taskId < maxTasks; taskId++) {
+            results.add(taskDistribution.getTaskFor(ctx));
+        }
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
+    }
+
+    @ParameterizedTest
+    @CsvSource({ "10,5,topics/logs/0/logs-0002.txt", "10,5,topics/logs/1/logs-001.txt",
+            "10,5,topics/logs/2/logs-0002.txt", "10,5,topics/logs/3/logs-0002.txt", "10,5,topics/logs/4/logs-0002.txt",
+            "10,5,topics/logs/5/logs-0002.txt", "10,5,topics/logs/6/logs-0002.txt", "10,5,topics/logs/7/logs-0002.txt",
+            "10,5,topics/logs/8/logs-0002.txt", "10,5,topics/logs/9/logs-0002.txt" })
+    void partitionPathDistributionExactlyOnceDistributionWithTaskReconfiguration(final int maxTasks,
+            final int maxTaskAfterReConfig, final String path) {
+
+        final String expectedSourceNameFormat = "topics/{{topic}}/{{partition}}/.*$";
+        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
+        final Context ctx = getContext(expectedSourceNameFormat, path);
+        final List<Integer> results = new ArrayList<>();
         for (int taskId = 0; taskId < maxTasks; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern(expectedSourceNameFormat)));
+            results.add(taskDistribution.getTaskFor(ctx));
         }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE);
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
         taskDistribution.configureDistributionStrategy(maxTaskAfterReConfig);
 
         results.clear();
         for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern(expectedSourceNameFormat)));
+            results.add(taskDistribution.getTaskFor(ctx));
         }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE);
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
     }
 
     @ParameterizedTest
-    @CsvSource({
-            "{topic}}-1.txt,'Source name format {topic}}-1.txt missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'",
-            " ,'Source name format null missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'",
-            "empty-pattern,'Source name format empty-pattern missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'" })
-    void malformedFilenameSetup(final String expectedSourceFormat, final String expectedErrorMessage) {
-        final int maxTaskId = 1;
-        assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, "",
-                FilePatternUtils.configurePattern(expectedSourceFormat))).isInstanceOf(ConfigException.class)
-                .hasMessage(expectedErrorMessage);
+    @CsvSource({ "10,topics/logs/0/logs-0002.txt", "10,topics/logs/1/logs-001.log", "10,topics/logs/2/logs-0002.txt",
+            "10,topics/logs/3/logs-0002.txt", "10,topics/logs/4/logs-0002.txt", "10,topics/logs/5/logs-0002.txt",
+            "10,topics/logs/6/logs-0002.txt", "10,topics/logs/7/logs-0002.txt", "10,topics/logs/8/logs-0002.txt",
+            "10,topics/logs/9/logs-0002.txt" })
+    void partitionPathDistributionExactlyOnceDistribution(final int maxTasks, final String path) {
+        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
+        final List<Integer> results = new ArrayList<>();
+        final Context ctx = getContext("topics/{{topic}}/{{partition}}/.*$", path);
+        for (int taskId = 0; taskId < maxTasks; taskId++) {
+            results.add(taskDistribution.getTaskFor(ctx));
+        }
+        assertThat(results).allMatch(i -> i == taskDistribution.getTaskFor(ctx));
     }
 
     @Test
-    void errorExpectedNullGivenForSourceNameFormat() {
+    void expectExceptionOnNonIntPartitionSupplied() {
         final int maxTaskId = 1;
-        assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, "",
-                FilePatternUtils.configurePattern(null))).isInstanceOf(ConfigException.class)
-                .hasMessage("Source name format null missing partition pattern {{partition}} please configure"
-                        + " the expected source to include the partition pattern.");
-    }
-
-    @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}")
-    @CsvSource({ "0,1,topics/logs/partition=5/logs+5+0002.txt,true",
-            "0,4,topics/logs/partition=5/logs+5+0002.txt,false", "1,4,topics/logs/partition=5/logs+5+0002.txt,true",
-            "0,3,topics/logs/partition=5/logs+5+0002.txt,false", "0,5,topics/logs/partition=5/logs+5+0002.txt,true",
-            "2,3,topics/logs/partition=5/logs+5+0002.txt,true" })
-    void withLeadingStringPartitionNamingConvention(final int taskId, final int maxTasks, final String path,
-            final boolean expectedResult) {
-
-        final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
-
-        assertThat(taskDistribution.isPartOfTask(taskId, path,
-                FilePatternUtils.configurePattern("topics/{{topic}}/partition={{partition}}/.*$")))
-                .isEqualTo(expectedResult);
+        final String path = "topics/logs/one/test-001.txt";
+        final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId);
+        final Context ctx = getContext("topics/{{topic}}/{{partition}}/.*$", path);
+        assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(-1);
     }
 
     @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}")
-    @CsvSource({ "0,1,bucket/topics/topic-1/5/logs+5+0002.txt,true",
-            "0,4,bucket/topics/topic-1/5/logs+5+0002.txt,false", "1,4,bucket/topics/topic-1/5/logs+5+0002.txt,true",
-            "0,3,bucket/topics/topic-1/5/logs+5+0002.txt,false", "0,5,bucket/topics/topic-1/5/logs+5+0002.txt,true",
-            "2,3,bucket/topics/topic-1/5/logs+5+0002.txt,true" })
-    void partitionInPathConvention(final int taskId, final int maxTaskId, final String path,
-            final boolean expectedResult) {
+    @CsvSource({ "1,10,topcs/logs/0/logs-0002.txt", "2,10,topics/logs/1", "3,10,S3/logs/2/logs-0002.txt",
+            "4,10,topicss/log/3/logs-0002.txt", "5,10,prod/logs/4/logs-0002.txt", "6,10,misspelt/logs/5/logs-0002.txt",
"6,10,misspelt/logs/5/logs-0002.txt", + "7,10,test/logs/6/logs-0002.txt", "8,10,random/logs/7/logs-0002.txt", "9,10,DEV/logs/8/logs-0002.txt", + "10,10,poll/logs/9/logs-0002.txt" }) + void expectNoMatchOnUnconfiguredPaths(final int taskId, final int maxTaskId, final String path) { final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId); - - assertThat(taskDistribution.isPartOfTask(taskId, path, - FilePatternUtils.configurePattern("bucket/topics/{{topic}}/{{partition}}/.*$"))) - .isEqualTo(expectedResult); + final Context ctx = getContext("topics/{{topic}}/{{partition}}/.*$", path); + assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(-1); } - @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}") @CsvSource({ "0,10,topics/logs/0/logs-0002.txt", "1,10,topics/logs/1/logs-0002.txt", "2,10,topics/logs/2/logs-0002.txt", "3,10,topics/logs/3/logs-0002.txt", "4,10,topics/logs/4/logs-0002.txt", "5,10,topics/logs/5/logs-0002.txt", "6,10,topics/logs/6/logs-0002.txt", "7,10,topics/logs/7/logs-0002.txt", "8,10,topics/logs/8/logs-0002.txt", "9,10,topics/logs/9/logs-0002.txt" }) void checkCorrectDistributionAcrossTasks(final int taskId, final int maxTaskId, final String path) { - final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId); - - assertThat(taskDistribution.isPartOfTask(taskId, path, - FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))).isTrue(); + final Context ctx = getContext("topics/{{topic}}/{{partition}}/.*$", path); + assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(taskId); } - @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}") - @CsvSource({ "1,10,topcs/logs/0/logs-0002.txt", "2,10,topics/logs/1", "3,10,S3/logs/2/logs-0002.txt", - "4,10,topics/log/3/logs-0002.txt", "5,10,prod/logs/4/logs-0002.txt", "6,10,misspelt/logs/5/logs-0002.txt", - "7,10,test/logs/6/logs-0002.txt", "8,10,random/logs/7/logs-0002.txt", "9,10,DEV/logs/8/logs-0002.txt", - "10,10,poll/logs/9/logs-0002.txt" }) - void expectNoMatchOnUnconfiguredPaths(final int taskId, final int maxTaskId, final String path) { + @ParameterizedTest(name = "[{index}] MaxTasks: {1} Filename: {2}") + @CsvSource({ "1,bucket/topics/topic-1/5/logs+5+0002.txt,0", "4,bucket/topics/topic-1/5/logs+5+0002.txt,1", + "4,bucket/topics/topic-1/5/logs+5+0002.txt,1", "3,bucket/topics/topic-1/5/logs+5+0002.txt,2", + "5,bucket/topics/topic-1/5/logs+5+0002.txt,0", "3,bucket/topics/topic-1/5/logs+5+0002.txt,2" }) + void partitionInPathConvention(final int maxTaskId, final String path, final int expectedResult) { final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId); - - assertThat(taskDistribution.isPartOfTask(taskId, path, - FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))).isFalse(); + final Context ctx = getContext("bucket/topics/{{topic}}/{{partition}}/.*$", path); + assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(expectedResult); } - @Test - void expectExceptionOnNonIntPartitionSupplied() { - final int taskId = 1; - final int maxTaskId = 1; - final String path = "topics/logs/one/test-001.txt"; + @ParameterizedTest(name = "[{index}] MaxTasks: {1} Filename: {2}") + @CsvSource({ "1,topics/logs/partition=5/logs+5+0002.txt,0", "4,topics/logs/partition=5/logs+5+0002.txt,1", + "4,topics/logs/partition=5/logs+5+0002.txt,1", "3,topics/logs/partition=5/logs+5+0002.txt,2", + 
"5,topics/logs/partition=5/logs+5+0002.txt,0", "3,topics/logs/partition=5/logs+5+0002.txt,2" }) + void withLeadingStringPartitionNamingConvention(final int maxTasks, final String path, final int expectedResult) { - final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId); - assertThat(taskDistribution.isPartOfTask(taskId, path, - FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))).isFalse(); - } - - @Test - void malformedRegexSetup() { - final int maxTaskId = 1; - - assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, "", - FilePatternUtils.configurePattern("topics/{{topic}}/"))).isInstanceOf(ConfigException.class) - .hasMessage( - "Source name format topics/{{topic}}/ missing partition pattern {{partition}} please configure the expected source to include the partition pattern."); - } - - @ParameterizedTest - @CsvSource({ - ",Source name format null missing partition pattern {{partition}} please configure the expected source to include the partition pattern.", - "@adsfs,Source name format @adsfs missing partition pattern {{partition}} please configure the expected source to include the partition pattern.", - "empty-path,Source name format empty-path missing partition pattern {{partition}} please configure the expected source to include the partition pattern." }) - void malformedPathSetup(final String expectedPathFormat, final String expectedErrorMessage) { - final int maxTaskId = 1; - - assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, expectedPathFormat, - FilePatternUtils.configurePattern(expectedPathFormat))).isInstanceOf(ConfigException.class) - .hasMessage(expectedErrorMessage); - } + final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); + final Context ctx = getContext("topics/{{topic}}/partition={{partition}}/.*$", path); - @ParameterizedTest - @CsvSource({ "10,topics/logs/0/logs-0002.txt", "10,topics/logs/1/logs-001.log", "10,topics/logs/2/logs-0002.txt", - "10,topics/logs/3/logs-0002.txt", "10,topics/logs/4/logs-0002.txt", "10,topics/logs/5/logs-0002.txt", - "10,topics/logs/6/logs-0002.txt", "10,topics/logs/7/logs-0002.txt", "10,topics/logs/8/logs-0002.txt", - "10,topics/logs/9/logs-0002.txt" }) - void partitionPathDistributionExactlyOnceDistribution(final int maxTasks, final String path) { - final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); - final List results = new ArrayList<>(); - for (int taskId = 0; taskId < maxTasks; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path, - FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))); - } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); + assertThat(taskDistribution.getTaskFor(ctx)).isEqualTo(expectedResult); } - @ParameterizedTest - @CsvSource({ "10,5,topics/logs/0/logs-0002.txt", "10,5,topics/logs/1/logs-001.txt", - "10,5,topics/logs/2/logs-0002.txt", "10,5,topics/logs/3/logs-0002.txt", "10,5,topics/logs/4/logs-0002.txt", - "10,5,topics/logs/5/logs-0002.txt", "10,5,topics/logs/6/logs-0002.txt", "10,5,topics/logs/7/logs-0002.txt", - "10,5,topics/logs/8/logs-0002.txt", "10,5,topics/logs/9/logs-0002.txt" }) - void partitionPathDistributionExactlyOnceDistributionWithTaskReconfiguration(final int maxTasks, - final int 
-
-        final String expectedSourceNameFormat = "topics/{{topic}}/{{partition}}/.*$";
-        final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks);
-        final List<Boolean> results = new ArrayList<>();
-        for (int taskId = 0; taskId < maxTasks; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern(expectedSourceNameFormat)));
-        }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE);
-        taskDistribution.configureDistributionStrategy(maxTaskAfterReConfig);
-
-        results.clear();
-        for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) {
-            results.add(taskDistribution.isPartOfTask(taskId, path,
-                    FilePatternUtils.configurePattern(expectedSourceNameFormat)));
-        }
-        assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
-                Boolean.FALSE);
+    public static Context getContext(final String configuredFilenamePattern, final String filename) {
+        final FilePatternUtils utils = new FilePatternUtils(configuredFilenamePattern);
+        final Optional<Context> ctx = utils.process(filename);
+        assertThat(ctx.isPresent()).isTrue();
+        return ctx.get();
     }
 }
diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
index 5d95d6eb..140b1910 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
@@ -16,6 +16,7 @@
 package io.aiven.kafka.connect.s3.source;
 
+import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG;
 import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER;
 import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
 import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS;
@@ -46,8 +47,6 @@
 import io.aiven.kafka.connect.common.source.input.InputFormat;
 import io.aiven.kafka.connect.common.source.input.TransformerFactory;
-import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
-import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
 import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
@@ -121,10 +120,14 @@ private Map<String, String> getConfig(final String topics, final int maxTasks) {
     @Test
     void sourceRecordIteratorBytesTest(final TestInfo testInfo) {
         final var topicName = IntegrationBase.topicName(testInfo);
-        final Map<String, String> configData = getConfig(topicName, 1);
+        final int maxTasks = 1;
+        final int taskId = 0;
+        final Map<String, String> configData = getConfig(topicName, maxTasks);
 
         configData.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
-
+        configData.put(FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");
+        configData.put("task.id", String.valueOf(taskId));
+        configData.put("tasks.max", String.valueOf(maxTasks));
         final String testData1 = "Hello, Kafka Connect S3 Source! object 1";
         final String testData2 = "Hello, Kafka Connect S3 Source! object 2";
object 2"; @@ -153,8 +156,7 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) { final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig); final Iterator sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, - TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient, new HashDistributionStrategy(1), - FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0); + TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient); final HashSet seenKeys = new HashSet<>(); while (sourceRecordIterator.hasNext()) { @@ -177,6 +179,9 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { configData.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue()); configData.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter"); configData.put(AVRO_VALUE_SERIALIZER, "io.confluent.kafka.serializers.KafkaAvroSerializer"); + configData.put(FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}"); + configData.put("task.id", String.valueOf(taskId)); + configData.put("tasks.max", String.valueOf(maxTasks)); // Define Avro schema final String schemaJson = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestRecord\",\n" @@ -219,9 +224,7 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig); final Iterator sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, - TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient, - new HashDistributionStrategy(maxTasks), - FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), taskId); + TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient); final HashSet seenKeys = new HashSet<>(); final Map> seenRecords = new HashMap<>(); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index 3ed3fdaf..5466435a 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -19,18 +19,12 @@ import java.util.Iterator; import java.util.Map; import java.util.Objects; -import java.util.regex.Pattern; import org.apache.kafka.connect.source.SourceRecord; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.source.AbstractSourceTask; import io.aiven.kafka.connect.common.source.input.Transformer; -import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; -import io.aiven.kafka.connect.common.source.task.DistributionStrategy; -import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; -import io.aiven.kafka.connect.common.source.task.PartitionDistributionStrategy; -import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; import io.aiven.kafka.connect.s3.source.utils.OffsetManager; @@ -71,9 +65,6 @@ public class S3SourceTask extends AbstractSourceTask { private OffsetManager offsetManager; private S3SourceConfig s3SourceConfig; - private int taskId; - private Pattern filePattern; - public S3SourceTask() { super(LOGGER); } @@ -136,8 +127,8 @@ protected SourceCommonConfig configure(final Map props) 
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
index 3ed3fdaf..5466435a 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
@@ -19,18 +19,12 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
-import java.util.regex.Pattern;
 
 import org.apache.kafka.connect.source.SourceRecord;
 
 import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 import io.aiven.kafka.connect.common.source.AbstractSourceTask;
 import io.aiven.kafka.connect.common.source.input.Transformer;
-import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
-import io.aiven.kafka.connect.common.source.task.DistributionStrategy;
-import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;
-import io.aiven.kafka.connect.common.source.task.PartitionDistributionStrategy;
-import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
 import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
@@ -71,9 +65,6 @@ public class S3SourceTask extends AbstractSourceTask {
     private OffsetManager offsetManager;
     private S3SourceConfig s3SourceConfig;
 
-    private int taskId;
-    private Pattern filePattern;
-
     public S3SourceTask() {
         super(LOGGER);
     }
@@ -136,8 +127,8 @@ protected SourceCommonConfig configure(final Map<String, String> props) {
         this.transformer = s3SourceConfig.getTransformer();
         offsetManager = new OffsetManager(context, s3SourceConfig);
         awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig);
-        setS3SourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
-                awsv2SourceClient, initializeObjectDistributionStrategy(), filePattern, taskId));
+        setS3SourceRecordIterator(
+                new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, awsv2SourceClient));
         return s3SourceConfig;
     }
@@ -179,22 +170,4 @@ public Transformer getTransformer() {
         return transformer;
     }
 
-    private DistributionStrategy initializeObjectDistributionStrategy() {
-        final ObjectDistributionStrategy objectDistributionStrategy = s3SourceConfig.getObjectDistributionStrategy();
-        final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString());
-        this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
-        DistributionStrategy distributionStrategy;
-
-        if (objectDistributionStrategy == ObjectDistributionStrategy.PARTITION_IN_FILENAME) {
-            this.filePattern = FilePatternUtils
-                    .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
-            distributionStrategy = new PartitionDistributionStrategy(maxTasks);
-        } else {
-            this.filePattern = FilePatternUtils
-                    .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
-            distributionStrategy = new HashDistributionStrategy(maxTasks);
-        }
-
-        return distributionStrategy;
-    }
 }
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
index 820be20a..3b16fe17 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
@@ -19,9 +19,9 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
-import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -29,7 +29,11 @@
 import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer;
 import io.aiven.kafka.connect.common.source.input.Transformer;
 import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
+import io.aiven.kafka.connect.common.source.task.Context;
 import io.aiven.kafka.connect.common.source.task.DistributionStrategy;
+import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;
+import io.aiven.kafka.connect.common.source.task.PartitionDistributionStrategy;
+import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 
 import software.amazon.awssdk.services.s3.model.S3Object;
@@ -54,17 +58,18 @@ public final class SourceRecordIterator implements Iterator<S3SourceRecord> {
     private String topic;
     private int partitionId;
 
+    private Context ctx;
+
     private final DistributionStrategy distributionStrategy;
-    private final int taskId;
+    private int taskId;
 
     private final Iterator<S3Object> inner;
 
     private Iterator<S3SourceRecord> outer;
-    private final Pattern filePattern;
+    private FilePatternUtils filePattern;
 
     public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager,
-            final Transformer transformer, final AWSV2SourceClient sourceClient,
-            final DistributionStrategy distributionStrategy, final Pattern filePattern, final int taskId) {
+            final Transformer transformer, final AWSV2SourceClient sourceClient) {
         super();
         this.s3SourceConfig = s3SourceConfig;
         this.offsetManager = offsetManager;
@@ -72,13 +77,12 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager,
         this.bucketName = s3SourceConfig.getAwsS3BucketName();
         this.transformer = transformer;
         this.sourceClient = sourceClient;
-        this.filePattern = filePattern;
-        this.distributionStrategy = distributionStrategy;
-        this.taskId = taskId;
+
+        this.distributionStrategy = initializeObjectDistributionStrategy();
 
         // Initialize predicates
         sourceClient.addPredicate(this::isFileMatchingPattern);
-        sourceClient.addPredicate(this::isFileAssignedToTask);
+        sourceClient.addPredicate(ass -> isFileAssignedToTask(ctx, taskId));
 
         // call filters out bad file names and extracts topic/partition
         inner = sourceClient.getS3ObjectIterator(null);
@@ -86,19 +90,18 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager,
     }
 
     public boolean isFileMatchingPattern(final S3Object s3Object) {
-        final Optional<String> optionalTopic = FilePatternUtils.getTopic(filePattern, s3Object.key());
-        final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern, s3Object.key());
-
-        if (optionalTopic.isPresent() && optionalPartitionId.isPresent()) {
-            topic = optionalTopic.get();
-            partitionId = optionalPartitionId.get();
+        final Optional<Context> optionalCtx = filePattern.process(s3Object.key());
+        if (optionalCtx.isPresent()) {
+            ctx = optionalCtx.get();
+            topic = ctx.getTopic().isPresent() ? ctx.getTopic().get() : null; // NOPMD assigning to null
+            partitionId = ctx.getPartition().isPresent() ? ctx.getPartition().getAsInt() : -1;
             return true;
         }
         return false;
     }
 
-    public boolean isFileAssignedToTask(final S3Object s3Object) {
-        return distributionStrategy.isPartOfTask(taskId, s3Object.key(), filePattern);
+    public boolean isFileAssignedToTask(final Context ctx, final int taskId) {
+        return taskId == distributionStrategy.getTaskFor(ctx);
     }
 
     @Override
@@ -143,6 +146,21 @@ private Stream<S3SourceRecord> convert(final S3Object s3Object) {
                 .map(new Mapper(partitionMap, recordCount, keyData, s3Object.key()));
     }
 
+    private DistributionStrategy initializeObjectDistributionStrategy() {
+        final ObjectDistributionStrategy objectDistributionStrategy = s3SourceConfig.getObjectDistributionStrategy();
+        final int maxTasks = Integer.parseInt(s3SourceConfig.originalsStrings().get("tasks.max"));
+        this.taskId = Integer.parseInt(s3SourceConfig.originalsStrings().get("task.id")) % maxTasks;
+
+        if (Objects.requireNonNull(objectDistributionStrategy) == ObjectDistributionStrategy.PARTITION_IN_FILENAME) {
+            this.filePattern = new FilePatternUtils(
+                    s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
+            return new PartitionDistributionStrategy(maxTasks);
+        }
+        this.filePattern = new FilePatternUtils(
+                s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
+        return new HashDistributionStrategy(maxTasks);
+    }
+
     /**
      * maps the data from the {@link Transformer} stream to an S3SourceRecord given all the additional data required.
     */
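Reviewer note: the two predicates the constructor registers boil down to "does the key match the pattern, and does the strategy assign it to me". A condensed, self-contained equivalent (class and method names invented; not the actual iterator code):

import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
import io.aiven.kafka.connect.common.source.task.Context;
import io.aiven.kafka.connect.common.source.task.DistributionStrategy;

public class FilterChainSketch {
    // An object key is processed only when it yields a Context AND the strategy names this task.
    static boolean accept(final FilePatternUtils filePattern, final DistributionStrategy strategy,
            final int taskId, final String objectKey) {
        return filePattern.process(objectKey)
                .map((Context ctx) -> strategy.getTaskFor(ctx) == taskId)
                .orElse(false);
    }
}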
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
index f7559ddf..02833484 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
@@ -16,8 +16,7 @@
 package io.aiven.kafka.connect.s3.source.utils;
 
-import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY;
-import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_TOPIC_KEY;
+import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
 import static io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator.BYTES_TRANSFORMATION_NUM_OF_RECS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyMap;
@@ -31,140 +30,164 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import org.apache.kafka.connect.data.SchemaAndValue;
 
+import io.aiven.kafka.connect.common.config.FileNameFragment;
 import io.aiven.kafka.connect.common.source.input.AvroTransformer;
 import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer;
 import io.aiven.kafka.connect.common.source.input.InputFormat;
 import io.aiven.kafka.connect.common.source.input.Transformer;
 import io.aiven.kafka.connect.common.source.input.TransformerFactory;
 import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
-import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;
+import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;
+import io.aiven.kafka.connect.common.templating.Template;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 final class SourceRecordIteratorTest {
 
+    public static final String TASK_ID = "task.id";
+    public static final String MAX_TASKS = "tasks.max";
     private S3SourceConfig mockConfig;
     private OffsetManager mockOffsetManager;
     private Transformer mockTransformer;
+    private FileNameFragment mockFileNameFrag;
 
-    private AWSV2SourceClient mockSourceApiClient;
+    private AWSV2SourceClient sourceApiClient;
 
     @BeforeEach
     public void setUp() {
         mockConfig = mock(S3SourceConfig.class);
+    private S3SourceConfig getConfig(final Map<String, String> data) {
+        final Map<String, String> defaults = new HashMap<>();
+        defaults.put(AWS_S3_BUCKET_NAME_CONFIG, "bucket-name");
+        defaults.putAll(data);
+        return new S3SourceConfig(defaults);
+    }
+
+    /**
+     * Stubs the mocked S3SourceConfig with the distribution strategy, task assignment and filename
+     * template that the iterator now reads during construction.
+     */
+    private void mockSourceConfig(final S3SourceConfig s3SourceConfig, final String filePattern, final int taskId,
+            final int maxTasks) {
+        when(s3SourceConfig.getObjectDistributionStrategy()).thenReturn(ObjectDistributionStrategy.OBJECT_HASH);
+        final Map<String, String> map = new HashMap<>();
+        map.put(TASK_ID, String.valueOf(taskId));
+        map.put(MAX_TASKS, String.valueOf(maxTasks));
+        when(s3SourceConfig.originalsStrings()).thenReturn(map);
+        when(s3SourceConfig.getS3FileNameFragment()).thenReturn(mockFileNameFrag);
+        when(mockFileNameFrag.getFilenameTemplate()).thenReturn(Template.of(filePattern));
     }

     @Test
     void testIteratorProcessesS3Objects() throws Exception {

         final String key = "topic-00001-abc123.txt";
+        final String filePattern = "{{topic}}-{{partition}}";
+        final S3SourceConfig config = getConfig(Collections.emptyMap());
+        final S3ClientBuilder builder = new S3ClientBuilder();
+        sourceApiClient = new AWSV2SourceClient(builder.build(), config);

-        // Mock InputStream
-        try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) {
-            when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream);
-
-            mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);
+        mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);

-            when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
-            final Pattern filePattern = mock(Pattern.class);
+        when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());

-            when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Collections.emptyIterator());
-            Iterator<S3SourceRecord> iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer,
-                    mockSourceApiClient, new HashDistributionStrategy(1),
-                    FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0);
+        mockSourceConfig(mockConfig, filePattern, 0, 1);

-            assertThat(iterator.hasNext()).isFalse();
-            mockPatternMatcher(filePattern);
+        // An empty bucket yields an exhausted iterator.
+        final Iterator<S3SourceRecord> iterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
+                mockTransformer, sourceApiClient);
+        assertThat(iterator).isExhausted();

-            final S3Object obj = S3Object.builder().key(key).build();
+        builder.reset().addObject(key, "Hello World").endOfBlock();
+        sourceApiClient = new AWSV2SourceClient(builder.build(), config);
+        final Iterator<S3SourceRecord> s3ObjectIterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
+                mockTransformer, sourceApiClient);

-            final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8));
-            when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator());
-            when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais);
-            iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient,
-                    new HashDistributionStrategy(1), filePattern, 0);
+        assertThat(s3ObjectIterator).hasNext();
+        assertThat(s3ObjectIterator.next()).isNotNull();
+        assertThat(s3ObjectIterator).isExhausted();

-            assertThat(iterator.hasNext()).isTrue();
-            assertThat(iterator.next()).isNotNull();
-        }
     }

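+    // Both scenarios below pre-mark the object as fully processed (BYTES_TRANSFORMATION_NUM_OF_RECS),
+    // so the iterator is expected to skip it without ever invoking the transformer.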
     @Test
     void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception {

         final String key = "topic-00001-abc123.txt";
-        final S3Object s3Object = S3Object.builder().key(key).build();
+        final String filePattern = "{{partition}}";
+
+        final S3SourceConfig config = getConfig(Collections.emptyMap());
+        final S3ClientBuilder builder = new S3ClientBuilder();
+
+        builder.reset().addObject(key, "Hello World").endOfBlock();
+        sourceApiClient = new AWSV2SourceClient(builder.build(), config);
+
+        mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);
+
+        when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
+
+        mockSourceConfig(mockConfig, filePattern, 0, 1);

         // With ByteArrayTransformer
-        try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) {
-            when(mockSourceApiClient.getObject(key)).thenReturn(() -> inputStream);
-            final Pattern filePattern = mock(Pattern.class);
-
-            when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator());
-
-            mockTransformer = mock(ByteArrayTransformer.class);
-            when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
-                    .thenReturn(Stream.of(SchemaAndValue.NULL));
-
-            when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
-
-            when(mockSourceApiClient.getListOfObjectKeys(any()))
-                    .thenReturn(Collections.singletonList(key).listIterator());
-            when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString()))
-                    .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS);
-            mockPatternMatcher(filePattern);
-
-            // should skip if any records were produced by source record iterator.
-            final Iterator<S3SourceRecord> iterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
-                    mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0);
-            assertThat(iterator.hasNext()).isFalse();
-            verify(mockSourceApiClient, never()).getObject(any());
-            verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong());
-        }
+
+        mockTransformer = mock(ByteArrayTransformer.class);
+        when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
+                .thenReturn(Stream.of(SchemaAndValue.NULL));
+
+        when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
+
+        when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString()))
+                .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS);
+
+        // should skip if any records were produced by source record iterator.
+        final Iterator<S3SourceRecord> byteArrayIterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
+                mockTransformer, sourceApiClient);
+
+        assertThat(byteArrayIterator).isExhausted();
+
+        verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong());

         // With AvroTransformer
-        try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) {
-            when(mockSourceApiClient.getObject(key)).thenReturn(() -> inputStream);
-            final Pattern filePattern = mock(Pattern.class);
-            when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator());
-            mockTransformer = mock(AvroTransformer.class);
-            when(mockSourceApiClient.getListOfObjectKeys(any()))
-                    .thenReturn(Collections.singletonList(key).listIterator());
-
-            when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString()))
-                    .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS);
-            mockPatternMatcher(filePattern);
-
-            when(mockTransformer.getKeyData(anyString(), anyString(), any())).thenReturn(SchemaAndValue.NULL);
-            when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
-                    .thenReturn(Arrays.asList(SchemaAndValue.NULL).stream());
-
-            final Iterator<S3SourceRecord> iterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
-                    mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0);
-            assertThat(iterator.hasNext()).isFalse();
-
-            verify(mockTransformer, times(0)).getRecords(any(), anyString(), anyInt(), any(), anyLong());
-        }
+
+        mockTransformer = mock(AvroTransformer.class);
+
+        when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString()))
+                .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS);
+
+        when(mockTransformer.getKeyData(anyString(), anyString(), any())).thenReturn(SchemaAndValue.NULL);
+        when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
+                .thenReturn(Stream.of(SchemaAndValue.NULL));
+
+        final Iterator<S3SourceRecord> avroIterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
+                mockTransformer, sourceApiClient);
+        assertThat(avroIterator).isExhausted();
+
+        verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong());
+
     }

     @ParameterizedTest
@@ -174,52 +197,121 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final in
         mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);
         when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
-        final Pattern filePattern = mock(Pattern.class);
-
-        mockPatternMatcher(filePattern);
+        final String key = "topic-00001-abc123.txt";
+        final String filePattern = "{{partition}}";
+        final FilePatternUtils filePatternUtils = new FilePatternUtils(filePattern);
+        final S3SourceConfig config = getConfig(Collections.emptyMap());
+        final S3ClientBuilder builder = new S3ClientBuilder();
+        mockSourceConfig(mockConfig, filePattern, taskId, maxTasks);

         final S3Object obj = S3Object.builder().key(objectKey).build();

-        final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8));
-        when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator());
-        when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais);
+        // Build the mocked S3 client holding a single object.
+        builder.reset().addObject(key, "Hello World").endOfBlock();
+        sourceApiClient = new AWSV2SourceClient(builder.build(), config);
+
         final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer,
-                mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId);
+                sourceApiClient);
+
         final Predicate<S3Object> s3ObjectPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object)
-                && iterator.isFileAssignedToTask(s3Object);
+                && iterator.isFileAssignedToTask(filePatternUtils.process(s3Object.key()).orElseThrow(), taskId);
         // Assert
         assertThat(s3ObjectPredicate).accepts(obj);
     }

     @ParameterizedTest
     @CsvSource({ "4, 1, topic1-2-0", "4, 3, key1", "4, 0, key1", "4, 1, key2", "4, 2, key2", "4, 0, key2", "4, 1, key3",
             "4, 2, key3", "4, 3, key3", "4, 0, key4", "4, 2, key4", "4, 3, key4" })
     void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId,
             final String objectKey) {
         mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);
         when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap());
-        final Pattern filePattern = mock(Pattern.class);
-
-        mockPatternMatcher(filePattern);
+        final String filePattern = "{{partition}}";
+        mockSourceConfig(mockConfig, filePattern, taskId, maxTasks);
+        final S3ClientBuilder builder = new S3ClientBuilder();
+        final S3SourceConfig config = getConfig(Collections.emptyMap());
+        final FilePatternUtils filePatternUtils = new FilePatternUtils(filePattern);

         final S3Object obj = S3Object.builder().key(objectKey).build();

-        final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8));
-        when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator());
-        when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais);
+        builder.reset().addObject(objectKey, "Hello World").endOfBlock();
+        sourceApiClient = new AWSV2SourceClient(builder.build(), config);
+
         final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer,
-                mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId);
+                sourceApiClient);
+
         final Predicate<S3Object> stringPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object)
-                && iterator.isFileAssignedToTask(s3Object);
+                && iterator.isFileAssignedToTask(filePatternUtils.process(s3Object.key()).orElseThrow(), taskId);
         // Assert
         assertThat(stringPredicate.test(obj)).as("Predicate should reject the objectKey: " + objectKey).isFalse();
     }

-    private static void mockPatternMatcher(final Pattern filePattern) {
-        final Matcher fileMatcher = mock(Matcher.class);
-        when(filePattern.matcher(any())).thenReturn(fileMatcher);
-        when(fileMatcher.find()).thenReturn(true);
-        when(fileMatcher.group(PATTERN_TOPIC_KEY)).thenReturn("testtopic");
-        when(fileMatcher.group(PATTERN_PARTITION_KEY)).thenReturn("0");
+    @Test
+    void testS3ClientIteratorMock() {
+        final S3ClientBuilder builder = new S3ClientBuilder();
+        builder.addObject("Key", "value");
+        final S3Client client = builder.build(); // NOPMD CloseResource: the client is closed two lines below
+        final ListObjectsV2Response response = client.listObjectsV2(ListObjectsV2Request.builder().build());
+        client.close();
+        assertThat(response.contents()).isNotEmpty();
+
+        sourceApiClient = new AWSV2SourceClient(builder.build(), getConfig(Collections.emptyMap()));
+        final Iterator<S3Object> iterator = sourceApiClient.getS3ObjectIterator(null);
+        assertThat(iterator.hasNext()).isTrue();
     }
+
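+    /**
+     * Builds a mocked {@link S3Client}. Objects added before {@code endOfBlock()} form one block; each
+     * {@code listObjectsV2} call on the built client dequeues the next block, emulating paged listings.
+     * {@code build()} folds any pending objects into a final block before creating the mock.
+     */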
+    static class S3ClientBuilder {
+        Queue<Pair<List<S3Object>, Map<String, byte[]>>> blocks = new LinkedList<>();
+        List<S3Object> objects = new ArrayList<>();
+        Map<String, byte[]> data = new HashMap<>();
+
+        public S3ClientBuilder addObject(final String key, final byte[] data) {
+            objects.add(S3Object.builder().key(key).size((long) data.length).build());
+            this.data.put(key, data);
+            return this;
+        }
+
+        public S3ClientBuilder endOfBlock() {
+            blocks.add(Pair.of(objects, data));
+            return reset();
+        }
+
+        public S3ClientBuilder reset() {
+            objects = new ArrayList<>();
+            data = new HashMap<>();
+            return this;
+        }
+
+        public S3ClientBuilder addObject(final String key, final String data) {
+            return addObject(key, data.getBytes(StandardCharsets.UTF_8));
+        }
+
+        private ResponseBytes<byte[]> getResponse(final String key) {
+            return ResponseBytes.fromByteArray(new byte[0], data.get(key));
+        }
+
+        private ListObjectsV2Response dequeueData() {
+            if (blocks.isEmpty()) {
+                objects = Collections.emptyList();
+                data = Collections.emptyMap();
+            } else {
+                final Pair<List<S3Object>, Map<String, byte[]>> pair = blocks.remove();
+                objects = pair.getLeft();
+                data = pair.getRight();
+            }
+            return ListObjectsV2Response.builder().contents(objects).isTruncated(false).build();
+        }
+
+        public S3Client build() {
+            if (!objects.isEmpty()) {
+                endOfBlock();
+            }
+            final S3Client result = mock(S3Client.class);
+            when(result.listObjectsV2(any(ListObjectsV2Request.class))).thenAnswer(env -> dequeueData());
+            when(result.listObjectsV2(any(Consumer.class))).thenAnswer(env -> dequeueData());
+            when(result.getObjectAsBytes(any(GetObjectRequest.class)))
+                    .thenAnswer(env -> getResponse(env.getArgument(0, GetObjectRequest.class).key()));
+            return result;
+        }
+    }
 }
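
Reviewer note (not part of the patch): a minimal sketch of how the S3ClientBuilder block queue behaves, to save the next reader a trip through dequeueData(). Each endOfBlock() freezes the objects added so far into one listing "page", and every listObjectsV2 call on the built client dequeues the next page; the key names here are illustrative.

    final SourceRecordIteratorTest.S3ClientBuilder builder = new SourceRecordIteratorTest.S3ClientBuilder();
    builder.addObject("page1-a.txt", "first").endOfBlock();
    builder.addObject("page2-a.txt", "second").endOfBlock();
    try (S3Client client = builder.build()) {
        // First call returns the "page1" block, the second call the "page2" block.
        final ListObjectsV2Response first = client.listObjectsV2(ListObjectsV2Request.builder().build());
        final ListObjectsV2Response second = client.listObjectsV2(ListObjectsV2Request.builder().build());
    }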