diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategy.java index a481293ba..c39676ad0 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategy.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategy.java @@ -20,10 +20,11 @@ import org.slf4j.LoggerFactory; /** - * HashTaskAssignmentStrategy determines which files should be executed by a task by the filename and path supplied by - * the iterator. The RandomTaskAssignment is perfect in use cases where ordering of events is not a requirement and when - * adding files to kafka where the files were not previously created by a supported S3 Sink or where manually created or - * created by another process. + * {@link HashObjectDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of + * the object's filename, which is uniformly distributed and deterministic across workers. + *

+ * This is well-suited to use cases where the order of events between records from objects is not important, especially + * when ingesting files into Kafka that were not previously created by a supported cloud storage Sink. */ public final class HashObjectDistributionStrategy implements ObjectDistributionStrategy { private final static Logger LOG = LoggerFactory.getLogger(HashObjectDistributionStrategy.class); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java index 37eff534e..5925d880d 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java @@ -17,11 +17,26 @@ package io.aiven.kafka.connect.common.source.task; /** - * SourceObjectDistributionStrategy is a common interface which allows source connectors to determine what method of - * distributing tasks amongst source connectors in distributed mode. + * An {@link ObjectDistributionStrategy} provides a mechanism to share the work of processing records from objects (or + * files) into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers. + *

+ * The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the + * overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together + * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer + * workers than tasks, and they will be assigned the remaining tasks as work completes. */ public interface ObjectDistributionStrategy { + /** + * Check if the object should be processed by the task with the given {@code taskId}. Any single object should be + * assigned deterministically to a single taskId. + * + * @param taskId + * a task ID, usually for the currently running task + * @param valueToBeEvaluated + * The value to be evaluated to determine if it should be processed by the task. + * @return true if the task should process the object, false if it should not. + */ boolean isPartOfTask(int taskId, String valueToBeEvaluated); /** diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java index 983be720a..f74e56826 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java @@ -26,10 +26,12 @@ import org.slf4j.LoggerFactory; /** - * PartitionInFilenameDistributionStrategy will determine which files to process for each task, based on the partition - * number defined in the filename e.g. the default object names created by the sink connector ex. - * topicname-{{partition}}-{{start_offset}} the partition can be extracted to have one task running per partition. 
- + * The {@link PartitionInFilenameDistributionStrategy} finds a partition in the object's filename by matching it to an + * expected format, and assigns all objects with the same partition to the same task. + *

+ * This is useful when a sink connector has created the object name in a format like + * {@code topicname-{{partition}}-{{start_offset}}}, and we want all objects with the same partition to be processed + * within a single task. + */ public final class PartitionInFilenameDistributionStrategy implements ObjectDistributionStrategy { private final static Logger LOG = LoggerFactory.getLogger(PartitionInFilenameDistributionStrategy.class); @@ -70,7 +72,7 @@ public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluat } /** - * When a connector reconfiguration event is received this method should be called to ensure the correct Startegy is + * When a connector reconfiguration event is received this method should be called to ensure the correct strategy is * being implemented by the connector. * * @param maxTasks diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java index 8ea6755f8..85e1c3e75 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java @@ -24,10 +24,22 @@ import org.slf4j.LoggerFactory; /** - * PartitionInPathDistributionStrategy allows source connectors to distribute tasks based off the folder structure that - * partition number that is defined in that structure. for example /PREFIX/partition={{partition}}/YYYY/MM/DD/mm this - * will split all tasks by the number of unique partitions defined in the storage path. e.g. Task distribution in - * Connect with 10 Partitions and 3 tasks |Task | Partition| |0|0| |1|1| |2|2| |0|3| |1|4| |2|5| |0|6| |1|7| |2|8| |0|9| + * The {@link PartitionInPathDistributionStrategy} finds a partition number in the path by matching a + * {@code {{partition}} } marker in the path. 
+ *

+ * This is useful when a sink connector has created the object name in a path like + * {@code /PREFIX/partition={{partition}}/YYYY/MM/DD/mm/}, and we want all objects with the same partition to be + * processed within a single task. + *

+ * Partitions are evenly distributed between tasks. For example, in Connect with 10 Partitions and 3 tasks: + * + *

+ *   | Task | Partitions |
+ *   |------|------------|
+ *   | 0    | 0, 3, 6, 9 |
+ *   | 1    | 1, 4, 7    |
+ *   | 2    | 2, 5, 8    |
+ * 
*/ public final class PartitionInPathDistributionStrategy implements ObjectDistributionStrategy { public static final String PARTITION_ID_PATTERN = "\\{\\{partition}}"; @@ -70,7 +82,6 @@ public boolean isPartOfTask(final int taskId, final String pathToBeEvaluated) { * @param expectedPathFormat * The format of the path and where to identify */ - @Override public void reconfigureDistributionStrategy(final int maxTasks, final String expectedPathFormat) { configureDistributionStrategy(maxTasks, expectedPathFormat);