Skip to content

Commit

Permalink
Merge pull request #355 from RyanSkraba/rskraba-docs-object-distribut…
Browse files Browse the repository at this point in the history
…ion-strategy

Clarify javadocs for OjectDistributionStrategy
  • Loading branch information
muralibasani authored Nov 22, 2024
2 parents d1fd4ce + 73e3197 commit 758fedc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import org.slf4j.LoggerFactory;

/**
* HashTaskAssignmentStrategy determines which files should be executed by a task by the filename and path supplied by
* the iterator. The RandomTaskAssignment is perfect in use cases where ordering of events is not a requirement and when
* adding files to kafka where the files were not previously created by a supported S3 Sink or where manually created or
* created by another process.
* {@link HashObjectDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of
* the object's filename, which is uniformly distributed and deterministic across workers.
* <p>
* This is well-suited to use cases where the order of events between records from objects is not important, especially
* when ingesting files into Kafka that were not previously created by a supported cloud storage Sink.
*/
public final class HashObjectDistributionStrategy implements ObjectDistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashObjectDistributionStrategy.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,26 @@
package io.aiven.kafka.connect.common.source.task;

/**
* SourceObjectDistributionStrategy is a common interface which allows source connectors to determine what method of
* distributing tasks amongst source connectors in distributed mode.
* An {@link ObjectDistributionStrategy} provides a mechanism to share the work of processing records from objects (or
* files) into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
* <p>
* The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the
* overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together
* sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
* workers than tasks, and they will be assigned the remaining tasks as work completes.
*/
public interface ObjectDistributionStrategy {

/**
* Check if the object should be processed by the task with the given {@code taskId}. Any single object should be
* assigned deterministically to a single taskId.
*
* @param taskId
* a task ID, usually for the currently running task
* @param valueToBeEvaluated
* The value to be evaluated to determine if it should be processed by the task.
* @return true if the task should process the object, false if it should not.
*/
boolean isPartOfTask(int taskId, String valueToBeEvaluated);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.slf4j.LoggerFactory;

/**
* PartitionInFilenameDistributionStrategy will determine which files to process for each task, based on the partition
* number defined in the filename e.g. the default object names created by the sink connector ex.
* topicname-{{partition}}-{{start_offset}} the partition can be extracted to have one task running per partition.
*
* The {@link PartitionInFilenameDistributionStrategy} finds a partition in the object's filename by matching it to an
* expected format, and assigns all partitions to the same task.
* <p>
* This useful when a sink connector has created the object name in a format like
* {@code topicname-{{partition}}-{{start_offset}}}, and we want all objects with the same partition to be processed
* within a single task.
*/
public final class PartitionInFilenameDistributionStrategy implements ObjectDistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(PartitionInFilenameDistributionStrategy.class);
Expand Down Expand Up @@ -70,7 +72,7 @@ public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluat
}

/**
* When a connector reconfiguration event is received this method should be called to ensure the correct Startegy is
* When a connector reconfiguration event is received this method should be called to ensure the correct strategy is
* being implemented by the connector.
*
* @param maxTasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,22 @@
import org.slf4j.LoggerFactory;

/**
* PartitionInPathDistributionStrategy allows source connectors to distribute tasks based off the folder structure that
* partition number that is defined in that structure. for example /PREFIX/partition={{partition}}/YYYY/MM/DD/mm this
* will split all tasks by the number of unique partitions defined in the storage path. e.g. Task distribution in
* Connect with 10 Partitions and 3 tasks |Task | Partition| |0|0| |1|1| |2|2| |0|3| |1|4| |2|5| |0|6| |1|7| |2|8| |0|9|
* The {@link PartitionInPathDistributionStrategy} finds a partition number in the path by matching a
* {@code {{partition}} } marker in the path.
* <p>
* This useful when a sink connector has created the object name in a path like
* {@code /PREFIX/partition={{partition}}/YYYY/MM/DD/mm/}}, and we want all objects with the same partition to be
* processed within a single task.
* <p>
* Partitions are evenly distributed between tasks. For example, in Connect with 10 Partitions and 3 tasks:
*
* <pre>
* | Task | Partitions |
* |------|------------|
* | 0 | 0, 3, 6, 9 |
* | 1 | 1, 4, 7 |
* | 2 | 2, 5, 8 |
* </pre>
*/
public final class PartitionInPathDistributionStrategy implements ObjectDistributionStrategy {
public static final String PARTITION_ID_PATTERN = "\\{\\{partition}}";
Expand Down Expand Up @@ -70,7 +82,6 @@ public boolean isPartOfTask(final int taskId, final String pathToBeEvaluated) {
* @param expectedPathFormat
* The format of the path and where to identify
*/

@Override
public void reconfigureDistributionStrategy(final int maxTasks, final String expectedPathFormat) {
configureDistributionStrategy(maxTasks, expectedPathFormat);
Expand Down

0 comments on commit 758fedc

Please sign in to comment.