Initial commit for adding the context to the source connectors
Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
aindriu-aiven committed Jan 17, 2025
1 parent 6b967d3 commit 04b38e1
Showing 13 changed files with 665 additions and 437 deletions.
FilePatternUtils.java
@@ -22,10 +22,11 @@

import org.apache.kafka.common.config.ConfigException;

import org.apache.commons.lang3.StringUtils;
import io.aiven.kafka.connect.common.source.task.Context;

public final class FilePatternUtils {
import org.apache.commons.lang3.StringUtils;

public final class FilePatternUtils<K> {
public static final String PATTERN_PARTITION_KEY = "partition";
public static final String PATTERN_TOPIC_KEY = "topic";
public static final String START_OFFSET_PATTERN = "{{start_offset}}";
@@ -39,14 +40,25 @@ public final class FilePatternUtils {
public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";

private FilePatternUtils() {
// hidden
final Pattern pattern;
final Optional<String> targetTopic;

public FilePatternUtils(final String pattern, final String targetTopic) {
this.pattern = configurePattern(pattern);
this.targetTopic = Optional.ofNullable(targetTopic);
}
public static Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
throw new ConfigException(String.format(
"Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
expectedSourceNameFormat));

/**
*
* @param expectedSourceNameFormat
* the expected source name format, which allows object names or keys to carry unique information
* such as the partition number, topic name, offset and timestamp.
* @return a pattern configured to extract that key information from object names and keys.
*/
private Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null) {
throw new ConfigException(
"Source name format is missing please configure the expected source to include the partition pattern.");
}
// Build REGEX Matcher
String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN);
@@ -62,26 +74,55 @@ public static Pattern configurePattern(final String expectedSourceNameFormat) {
}
}

public static Optional<String> getTopic(final Pattern filePattern, final String sourceName) {
return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY));
public Optional<Context<K>> process(final K sourceName) {
if (fileMatches(sourceName.toString())) {
final Optional<String> topic = getTopic(sourceName.toString());
final Optional<Integer> partition = getPartitionId(sourceName.toString());
return Optional.of(new Context<K>(topic.orElse(null), partition.orElse(null), sourceName));
}
return Optional.empty();

}

public static Optional<Integer> getPartitionId(final Pattern filePattern, final String sourceName) {
return matchPattern(filePattern, sourceName).flatMap(matcher -> {
private boolean fileMatches(final String sourceName) {
return matchPattern(sourceName).isPresent();
}

private Optional<String> getTopic(final String sourceName) {
if (targetTopic.isPresent()) {
return targetTopic;
}

return matchPattern(sourceName).flatMap(matcher -> {
try {
// TODO check why this worked before without the try catch
return Optional.of(matcher.group(PATTERN_TOPIC_KEY));
} catch (IllegalArgumentException ex) {
// The named group may not be present in the match, in which case group() throws an
// IllegalArgumentException.
return Optional.empty();
}
});
}

private Optional<Integer> getPartitionId(final String sourceName) {
return matchPattern(sourceName).flatMap(matcher -> {
try {
return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
} catch (NumberFormatException e) {
} catch (IllegalArgumentException e) {
// The named group may be missing, in which case group() throws an IllegalArgumentException;
// NumberFormatException is a subclass of it, so non-numeric values are covered by the same catch.
return Optional.empty();
}
});

}

private static Optional<Matcher> matchPattern(final Pattern filePattern, final String sourceName) {
if (filePattern == null || sourceName == null) {
private Optional<Matcher> matchPattern(final String sourceName) {
if (sourceName == null) {
throw new IllegalArgumentException("sourceName must not be null");
}

final Matcher matcher = filePattern.matcher(sourceName);
final Matcher matcher = pattern.matcher(sourceName);
return matcher.find() ? Optional.of(matcher) : Optional.empty();
}

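The net effect of this file's changes is that FilePatternUtils becomes an instance-based helper: it is constructed once with the expected source-name format (plus an optional target-topic override) and then converts each object key into a Context. A minimal usage sketch, assuming the placeholder syntax used elsewhere in this class ({{topic}}, {{partition}}, {{start_offset}}); the format string and object key are invented for illustration:

```java
// Sketch only: the format string and object key below are illustrative, not taken from the commit.
FilePatternUtils<String> utils = new FilePatternUtils<>(
        "{{topic}}-{{partition}}-{{start_offset}}.txt", // expected source name format
        null);                                          // no target-topic override configured

Optional<Context<String>> maybeContext = utils.process("logging-topic-3-00000000000000000101.txt");

maybeContext.ifPresent(ctx -> {
    // Topic and partition come from the named regex groups built by configurePattern.
    String topic = ctx.getTopic().orElse("unknown");
    int partition = ctx.getPartition().orElse(-1);
    System.out.printf("topic=%s, partition=%d, key=%s%n",
            topic, partition, ctx.getStorageKey().orElse(null));
});
```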
Context.java
@@ -0,0 +1,57 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.task;

import java.util.Optional;

public class Context<K> {

private String topic;
private Integer partition;
private K storageKey;

public Context(final String topic, final Integer partition, final K storageKey) {
this.topic = topic;
this.partition = partition;
this.storageKey = storageKey;
}

public Optional<String> getTopic() {
return Optional.ofNullable(topic);
}

public void setTopic(final String topic) {
this.topic = topic;
}

public Optional<Integer> getPartition() {
return Optional.ofNullable(partition);
}

public void setPartition(final Integer partition) {
this.partition = partition;
}

public Optional<K> getStorageKey() {
return Optional.ofNullable(storageKey);
}

public void setStorageKey(final K storageKey) {
this.storageKey = storageKey;
}

}
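Context is a small generic holder for whatever could be extracted from an object key; every field may legitimately be absent, so the getters return Optional rather than null. A brief sketch of how a caller might read one (all values invented):

```java
// Illustrative values only.
Context<String> context = new Context<>("logging-topic", 3, "logging-topic-3-00000000000000000101.txt");

// Optional-based getters keep null handling explicit at the call site.
String topic = context.getTopic().orElse("no-topic-resolved");
int partition = context.getPartition().orElse(-1);
context.getStorageKey().ifPresent(key -> System.out.println("processing object " + key));
```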
DistributionStrategy.java
@@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
* A {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files)
@@ -27,18 +27,30 @@
* sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
* workers than tasks, and they will be assigned the remaining tasks as work completes.
*/
public interface DistributionStrategy {
public abstract class DistributionStrategy {
protected int maxTasks;
protected final static int UNDEFINED = -1;
@SuppressFBWarnings(value = "CT_CONSTRUCTOR_THROW", justification = "constructor throws if max tasks is not a positive number")
public DistributionStrategy(final int maxTasks) {
isValidMaxTask(maxTasks);
this.maxTasks = maxTasks;
}

private static void isValidMaxTask(final int maxTasks) {
if (maxTasks <= 0) {
throw new IllegalArgumentException("tasks.max must be set to a positive number and at least 1.");
}
}

/**
* Check if the object should be processed by the task with the given {@code taskId}. Any single object should be
* assigned deterministically to a single taskId.
*
* @param taskId
* a task ID, usually for the currently running task
* @param valueToBeEvaluated
* The value to be evaluated to determine if it should be processed by the task.
* @return true if the task should process the object, false if it should not.
* @param ctx
* This is the context which contains optional values for the partition, topic and storage key name
* @return the taskId to which this object should be assigned.
*/
boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);
public abstract int getTaskFor(Context<?> ctx);

/**
* When a connector receives a reconfigure event this method should be called to ensure that the distribution
@@ -47,5 +59,8 @@ public interface DistributionStrategy {
* @param maxTasks
* The maximum number of tasks created for the Connector
*/
void configureDistributionStrategy(int maxTasks);
public void configureDistributionStrategy(final int maxTasks) {
isValidMaxTask(maxTasks);
this.maxTasks = maxTasks;
}
}
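Turning DistributionStrategy into an abstract base class keeps the maxTasks validation and reconfiguration logic in one place, so a concrete strategy only has to answer getTaskFor(Context). As a sketch of that contract, a hypothetical strategy is shown below; the class name and assignment rule are invented, not part of the commit:

```java
package io.aiven.kafka.connect.common.source.task;

// Hypothetical strategy that routes by topic name instead of storage key or partition.
public class TopicNameDistributionStrategy extends DistributionStrategy {

    public TopicNameDistributionStrategy(final int maxTasks) {
        super(maxTasks); // the base constructor rejects maxTasks <= 0
    }

    @Override
    public int getTaskFor(final Context<?> ctx) {
        // UNDEFINED (-1) signals that this object cannot be routed deterministically.
        return ctx.getTopic()
                .map(topic -> Math.floorMod(topic.hashCode(), maxTasks))
                .orElse(UNDEFINED);
    }
}
```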
HashDistributionStrategy.java
@@ -16,8 +16,6 @@

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -28,31 +26,28 @@
* This is well-suited to use cases where the order of events between records from objects is not important, especially
* when ingesting files into Kafka that were not previously created by a supported cloud storage Sink.
*/
public final class HashDistributionStrategy implements DistributionStrategy {
public final class HashDistributionStrategy extends DistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashDistributionStrategy.class);
private int maxTasks;
public HashDistributionStrategy(final int maxTasks) {
configureDistributionStrategy(maxTasks);
super(maxTasks);
}

/**
*
* @param ctx
* the Context containing the storage key and optional values for the partition and topic
* @return the task id this context should be assigned to, or -1 if it cannot be determined
*/
@Override
public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated, final Pattern filePattern) {
if (filenameToBeEvaluated == null) {
public int getTaskFor(final Context<?> ctx) {
if (ctx.getStorageKey().isEmpty()) {
LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
return false;
return UNDEFINED;
}
final int taskAssignment = Math.floorMod(filenameToBeEvaluated.hashCode(), maxTasks);

// floor mod returns the remainder of a division so will start at 0 and move up
// tasks start at 0 so there should be no issue.
return taskAssignment == taskId;
}

@Override
public void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
return Math.floorMod(ctx.getStorageKey().hashCode(), maxTasks);
}

public void setMaxTasks(final int maxTasks) {
this.maxTasks = maxTasks;
}
}
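After the refactor, hash-based assignment is a modulo of the storage key's hash over maxTasks, so a given key always lands on the same task and different keys spread roughly evenly. A quick sketch of the behaviour (key names invented):

```java
HashDistributionStrategy strategy = new HashDistributionStrategy(4); // e.g. tasks.max = 4

// Only the storage key matters for this strategy; topic and partition may be absent.
Context<String> ctx = new Context<>(null, null, "bucket/prefix/object-0001.json");
int taskId = strategy.getTaskFor(ctx); // Math.floorMod(hashCode, 4), always in the range 0..3

// A context without a storage key cannot be routed and comes back as -1 (UNDEFINED).
int unrouted = strategy.getTaskFor(new Context<>(null, null, null));
```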
PartitionDistributionStrategy.java
@@ -17,9 +17,6 @@
package io.aiven.kafka.connect.common.source.task;

import java.util.Optional;
import java.util.regex.Pattern;

import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,53 +29,26 @@
* {@code topicname-{{partition}}-{{start_offset}}}, and we want all objects with the same partition to be processed
* within a single task.
*/
public final class PartitionDistributionStrategy implements DistributionStrategy {
public final class PartitionDistributionStrategy extends DistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(PartitionDistributionStrategy.class);
private int maxTasks;

public PartitionDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
super(maxTasks);
}

/**
*
* @param sourceNameToBeEvaluated
* is the filename/table name of the source for the connector.
* @return Predicate to confirm if the given source name matches
* @param ctx
* the Context containing the storage key and optional values for the partition and topic
* @return the task id this context should be assigned to, or -1 if it cannot be determined
*/
@Override
public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluated, final Pattern filePattern) {
if (sourceNameToBeEvaluated == null) {
LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
return false;
}
final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern,
sourceNameToBeEvaluated);

if (optionalPartitionId.isPresent()) {
return optionalPartitionId.get() < maxTasks
? taskMatchesPartition(taskId, optionalPartitionId.get())
: taskMatchesPartition(taskId, optionalPartitionId.get() % maxTasks);
public int getTaskFor(final Context<?> ctx) {
final Optional<Integer> partitionId = ctx.getPartition();
if (partitionId.isPresent()) {
return partitionId.get() % maxTasks;
}
LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated);
return false;
}

boolean taskMatchesPartition(final int taskId, final int partitionId) {
// The partition id and task id are both expected to start at 0 but if the task id is changed to start at 1 this
// will break.
return taskId == partitionId;
}

/**
* When a connector reconfiguration event is received this method should be called to ensure the correct strategy is
* being implemented by the connector.
*
* @param maxTasks
* maximum number of configured tasks for this connector
*/
@Override
public void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
LOG.warn("Unable to find the partition from this file name {}", ctx.getStorageKey());
return UNDEFINED;
}
}
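Because FilePatternUtils now extracts the partition up front, the partition strategy reduces to partition % maxTasks, with -1 returned when no partition could be parsed. A short sketch of the intended routing (values invented):

```java
PartitionDistributionStrategy strategy = new PartitionDistributionStrategy(3); // e.g. tasks.max = 3

// Partition 7 with three tasks: 7 % 3 = 1, so task 1 owns every object from that partition.
Context<String> withPartition =
        new Context<>("logging-topic", 7, "logging-topic-7-00000000000000000042.txt");
int taskId = strategy.getTaskFor(withPartition); // 1

// A key the pattern could not parse yields no partition, so the strategy answers -1 (UNDEFINED).
Context<String> withoutPartition = new Context<>("logging-topic", null, "unparseable-key.txt");
int undefined = strategy.getTaskFor(withoutPartition); // -1
```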