Initial commit for adding the context to the source connectors
Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
aindriu-aiven committed Jan 16, 2025
1 parent 6b967d3 commit 8de5797
Showing 12 changed files with 586 additions and 412 deletions.
io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
@@ -17,15 +17,20 @@
package io.aiven.kafka.connect.common.source.input.utils;

import java.util.Optional;
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigException;

import io.aiven.kafka.connect.common.source.task.Context;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class FilePatternUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(FilePatternUtils.class);
public static final String PATTERN_PARTITION_KEY = "partition";
public static final String PATTERN_TOPIC_KEY = "topic";
public static final String START_OFFSET_PATTERN = "{{start_offset}}";
@@ -39,10 +44,12 @@ public final class FilePatternUtils {
public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";

private FilePatternUtils() {
// hidden
final Pattern pattern;

public FilePatternUtils(final String pattern) {
this.pattern = configurePattern(pattern);
}
public static Pattern configurePattern(final String expectedSourceNameFormat) {
private Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
throw new ConfigException(String.format(
"Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
@@ -62,26 +69,46 @@ public static Pattern configurePattern(final String expectedSourceNameFormat) {
}
}

public static Optional<String> getTopic(final Pattern filePattern, final String sourceName) {
return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY));
public Optional<Context> process(final String sourceName) {
final Optional<String> topic = getTopic(sourceName);
final OptionalInt partition = getPartitionId(sourceName);
return Optional.of(new Context(topic.orElse(null), partition, sourceName));

}

public static Optional<Integer> getPartitionId(final Pattern filePattern, final String sourceName) {
return matchPattern(filePattern, sourceName).flatMap(matcher -> {
private Optional<String> getTopic(final String sourceName) {
return matchPattern(sourceName).flatMap(matcher -> {
try {
return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
} catch (NumberFormatException e) {
// TODO check why this worked before without the try catch
return Optional.of(matcher.group(PATTERN_TOPIC_KEY));
} catch (IllegalArgumentException ex) {
// It is possible that the named group does not match, in which case an
// IllegalArgumentException is thrown.
return Optional.empty();
}
});
}

private static Optional<Matcher> matchPattern(final Pattern filePattern, final String sourceName) {
if (filePattern == null || sourceName == null) {
throw new IllegalArgumentException("filePattern and sourceName must not be null");
private OptionalInt getPartitionId(final String sourceName) {
final Optional<Matcher> parseIntMatcher = matchPattern(sourceName);
if (parseIntMatcher.isPresent()) {
try {
return OptionalInt.of(Integer.parseInt(parseIntMatcher.get().group(PATTERN_PARTITION_KEY)));
} catch (IllegalArgumentException e) {
// The named group may not match, which throws an IllegalArgumentException; NumberFormatException
// is a subclass of IllegalArgumentException, so a non-numeric partition is covered here as well.
LOGGER.debug("No partition found in this entry {}", sourceName);
}
}
return OptionalInt.empty();

}

final Matcher matcher = filePattern.matcher(sourceName);
private Optional<Matcher> matchPattern(final String sourceName) {
if (sourceName == null) {
throw new IllegalArgumentException("sourceName must not be null");
}
final Matcher matcher = pattern.matcher(sourceName);
return matcher.find() ? Optional.of(matcher) : Optional.empty();
}

io/aiven/kafka/connect/common/source/task/Context.java
@@ -0,0 +1,58 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.task;

import java.util.Optional;
import java.util.OptionalInt;

public class Context {

private String topic;
private OptionalInt partition;
private String storageKey;

public Context(final String topic, final OptionalInt partition, final String storageKey) {
this.topic = topic;
this.partition = partition;
this.storageKey = storageKey;
}

public Optional<String> getTopic() {
return Optional.ofNullable(topic);
}

public void setTopic(final String topic) {
this.topic = topic;
}

public OptionalInt getPartition() {
return partition;
}

public void setPartition(final OptionalInt partition) {
this.partition = partition;
}

public Optional<String> getStorageKey() {
return Optional.ofNullable(storageKey);
}

public void setStorageKey(final String storageKey) {
this.storageKey = storageKey;
}

}
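A minimal usage sketch of the instance-based FilePatternUtils together with the Context it produces; the source-name format and the storage key below are illustrative assumptions rather than values from this commit.

import java.util.OptionalInt;

import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
import io.aiven.kafka.connect.common.source.task.Context;

public class FilePatternUtilsSketch {
    public static void main(final String[] args) {
        // Hypothetical source-name format; the real format comes from connector configuration.
        final FilePatternUtils utils = new FilePatternUtils("{{topic}}-{{partition}}-{{start_offset}}");

        // process() wraps whatever it can extract from the storage key into a Context.
        utils.process("metrics-3-00000000000000000042").ifPresent(ctx -> {
            final OptionalInt partition = ctx.getPartition();
            System.out.println("topic=" + ctx.getTopic().orElse("<none>")
                    + ", partition=" + (partition.isPresent() ? partition.getAsInt() : -1)
                    + ", storageKey=" + ctx.getStorageKey().orElse("<none>"));
        });
    }
}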
io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
@@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
* An {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files)
@@ -27,18 +27,24 @@
* sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
* workers than tasks, and they will be assigned the remaining tasks as work completes.
*/
public interface DistributionStrategy {
public abstract class DistributionStrategy {
protected int maxTasks;
@SuppressFBWarnings(value = "CT_CONSTRUCTOR_THROW", justification = "constructor throws if maxTasks is less than 1")
public DistributionStrategy(final int maxTasks) {
if (maxTasks <= 0) {
throw new IllegalArgumentException("tasks.max must be set to a positive number and at least 1.");
}
this.maxTasks = maxTasks;
}
/**
* Check if the object should be processed by the task with the given {@code taskId}. Any single object should be
* assigned deterministically to a single taskId.
*
* @param taskId
* a task ID, usually for the currently running task
* @param valueToBeEvaluated
* The value to be evaluated to determine if it should be processed by the task.
* @return true if the task should process the object, false if it should not.
* @param ctx
* This is the context which contains optional values for the partition, topic and storage key name
* @return the taskId to which this particular context should be assigned.
*/
boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);
public abstract int getTaskFor(Context ctx);

/**
* When a connector receives a reconfigure event this method should be called to ensure that the distribution
@@ -47,5 +53,7 @@ public interface DistributionStrategy {
* @param maxTasks
* The maximum number of tasks created for the Connector
*/
void configureDistributionStrategy(int maxTasks);
public void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}
}
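The new abstract class reduces a custom strategy to a single mapping from Context to task id. A hypothetical sketch follows; it is not part of this commit, and the distribute-by-topic rule is invented for illustration.

import io.aiven.kafka.connect.common.source.task.Context;
import io.aiven.kafka.connect.common.source.task.DistributionStrategy;

// Hypothetical strategy: distribute objects by the hash of their topic name.
public final class TopicDistributionStrategy extends DistributionStrategy {

    public TopicDistributionStrategy(final int maxTasks) {
        super(maxTasks);
    }

    @Override
    public int getTaskFor(final Context ctx) {
        // Return -1 (indeterminable) when no topic could be extracted from the storage key.
        return ctx.getTopic()
                .map(topic -> Math.floorMod(topic.hashCode(), maxTasks))
                .orElse(-1);
    }
}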
io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java
@@ -16,8 +16,6 @@

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -28,31 +26,28 @@
* This is well-suited to use cases where the order of events between records from objects is not important, especially
* when ingesting files into Kafka that were not previously created by a supported cloud storage Sink.
*/
public final class HashDistributionStrategy implements DistributionStrategy {
public final class HashDistributionStrategy extends DistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashDistributionStrategy.class);
private int maxTasks;
public HashDistributionStrategy(final int maxTasks) {
configureDistributionStrategy(maxTasks);
super(maxTasks);
}

/**
*
* @param ctx
is the Context which contains the storage key and optional values for the partition and topic
* @return the task id this context should be assigned to, or -1 if it cannot be determined
*/
@Override
public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated, final Pattern filePattern) {
if (filenameToBeEvaluated == null) {
public int getTaskFor(final Context ctx) {
if (ctx.getStorageKey().isEmpty()) {
LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
return false;
return -1;
}
final int taskAssignment = Math.floorMod(filenameToBeEvaluated.hashCode(), maxTasks);

// floor mod returns the remainder of a division so will start at 0 and move up
// tasks start at 0 so there should be no issue.
return taskAssignment == taskId;
}

@Override
public void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
return Math.floorMod(ctx.getStorageKey().hashCode(), maxTasks);
}

public void setMaxTasks(final int maxTasks) {
this.maxTasks = maxTasks;
}
}
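Because the boolean isPartOfTask check is gone, callers now compare the returned task id with their own. A sketch of how a source task might filter storage keys, assuming illustrative values for taskId, the source-name format and the keys:

import java.util.List;

import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
import io.aiven.kafka.connect.common.source.task.DistributionStrategy;
import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;

public class HashDistributionSketch {
    public static void main(final String[] args) {
        final int maxTasks = 4;
        final int taskId = 2; // id of the currently running task, normally provided by the connector
        final DistributionStrategy strategy = new HashDistributionStrategy(maxTasks);
        final FilePatternUtils utils = new FilePatternUtils("{{topic}}-{{partition}}-{{start_offset}}");

        for (final String key : List.of("metrics-0-0000000001", "metrics-1-0000000001", "logs-0-0000000001")) {
            // Only keep objects whose computed task id matches this task.
            utils.process(key)
                    .filter(ctx -> strategy.getTaskFor(ctx) == taskId)
                    .ifPresent(ctx -> System.out.println("task " + taskId + " processes " + key));
        }
    }
}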
io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
@@ -16,11 +16,6 @@

package io.aiven.kafka.connect.common.source.task;

import java.util.Optional;
import java.util.regex.Pattern;

import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -32,53 +27,31 @@
* {@code topicname-{{partition}}-{{start_offset}}}, and we want all objects with the same partition to be processed
* within a single task.
*/
public final class PartitionDistributionStrategy implements DistributionStrategy {
public final class PartitionDistributionStrategy extends DistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(PartitionDistributionStrategy.class);
private int maxTasks;

public PartitionDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
super(maxTasks);
}

/**
*
* @param sourceNameToBeEvaluated
* is the filename/table name of the source for the connector.
* @return Predicate to confirm if the given source name matches
* @param ctx
is the Context which contains the storage key and optional values for the partition and topic
* @return the task id this context should be assigned to, or -1 if it cannot be determined
*/
@Override
public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluated, final Pattern filePattern) {
if (sourceNameToBeEvaluated == null) {
LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
return false;
public int getTaskFor(final Context ctx) {
if (ctx.getPartition().isPresent()) {
return ctx.getPartition().getAsInt() % maxTasks;
}
final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern,
sourceNameToBeEvaluated);

if (optionalPartitionId.isPresent()) {
return optionalPartitionId.get() < maxTasks
? taskMatchesPartition(taskId, optionalPartitionId.get())
: taskMatchesPartition(taskId, optionalPartitionId.get() % maxTasks);
}
LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated);
return false;
LOG.warn("Unable to find the partition from this file name {}", ctx.getStorageKey());
return -1;
}

boolean taskMatchesPartition(final int taskId, final int partitionId) {
// The partition id and task id are both expected to start at 0 but if the task id is changed to start at 1 this
// will break.
return taskId == partitionId;
}

/**
* When a connector reconfiguration event is received this method should be called to ensure the correct strategy is
* being implemented by the connector.
*
* @param maxTasks
* maximum number of configured tasks for this connector
*/
@Override
public void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}
}
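A short sketch of the modulo behavior above: once the partition has been resolved into the Context, partition 7 with three tasks maps to task 1, and a context without a partition yields -1. The Context values here are constructed by hand for illustration.

import java.util.OptionalInt;

import io.aiven.kafka.connect.common.source.task.Context;
import io.aiven.kafka.connect.common.source.task.PartitionDistributionStrategy;

public class PartitionDistributionSketch {
    public static void main(final String[] args) {
        final PartitionDistributionStrategy strategy = new PartitionDistributionStrategy(3);

        // Partition 7 with 3 tasks: 7 % 3 == 1, so task 1 owns this object.
        final Context withPartition = new Context("metrics", OptionalInt.of(7), "metrics-7-0000000001");
        System.out.println(strategy.getTaskFor(withPartition)); // prints 1

        // No partition in the context: the strategy logs a warning and returns -1.
        final Context withoutPartition = new Context(null, OptionalInt.empty(), "unparsable-key");
        System.out.println(strategy.getTaskFor(withoutPartition)); // prints -1
    }
}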