Skip to content

Commit

Permalink
Fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
muralibasani committed Jan 7, 2025
1 parent 0861d4c commit d082cb8
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies;

import org.codehaus.plexus.util.StringUtils;
import org.apache.commons.lang3.StringUtils;

public final class SourceConfigFragment extends ConfigFragment {
private static final String GROUP_OTHER = "OTHER_CFG";
Expand Down Expand Up @@ -80,7 +80,7 @@ public static ConfigDef update(final ConfigDef configDef) {
"Based on tasks.max config and this strategy, objects are processed in distributed"
+ " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", "
+ PARTITION_IN_FILENAME + ", " + PARTITION_IN_FILEPATH,
GROUP_OTHER, sourcePollingConfigCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD
GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD

return configDef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.function.IOSupplier;
import org.codehaus.plexus.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.kafka.common.config.ConfigException;

import org.codehaus.plexus.util.StringUtils;
import org.apache.commons.lang3.StringUtils;

/**
* An {@link ObjectDistributionStrategy} provides a mechanism to share the work of processing records from objects (or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.kafka.connect.errors.ConnectException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,7 +44,6 @@
*/
public final class PartitionInPathDistributionStrategy implements ObjectDistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(PartitionInPathDistributionStrategy.class);
private String s3Prefix;
private Pattern filePattern;
private int maxTasks;

Expand All @@ -58,21 +56,12 @@ public boolean isPartOfTask(final int taskId, final String pathToBeEvaluated) {
try {
final Matcher fileMatcher = filePattern.matcher(pathToBeEvaluated);
if (fileMatcher.matches()) {
// TBD this block is not required. prefix verification is done in aws client
final String topicName = fileMatcher.group(PATTERN_TOPIC_KEY);
final String s3PrefixForObject = StringUtils.replace(s3Prefix, TOPIC_PATTERN, topicName);
if (!pathToBeEvaluated.startsWith(s3PrefixForObject)) {
LOG.warn("Ignoring path {}, does not contain the preconfigured prefix {} set up at startup",
pathToBeEvaluated, s3Prefix);
return false;
}
// -- end of block
final String partitionId = fileMatcher.group(PATTERN_PARTITION_KEY);

return toBeProcessedByThisTask(taskId, maxTasks, Integer.parseInt(partitionId));
} else {
LOG.warn("Ignoring path {}, does not contain the preconfigured prefix {} set up at startup",
pathToBeEvaluated, s3Prefix);
LOG.warn(
"Ignoring path {}, does not contain the preconfigured prefix to extract partition set up at startup",
pathToBeEvaluated);
return false;
}
} catch (NumberFormatException ex) {
Expand Down Expand Up @@ -101,6 +90,5 @@ public Pattern getFilePattern() {
private void configureDistributionStrategy(final int maxTasks, final String expectedPathFormat) {
this.maxTasks = maxTasks;
this.filePattern = configurePattern(expectedPathFormat);
s3Prefix = StringUtils.substringBefore(expectedPathFormat, PARTITION_PATTERN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.aiven.kafka.connect.s3.source;

import static io.aiven.kafka.connect.common.config.SourceConfigFragment.MAX_POLL_RECORDS;
import static io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy.ANY_FILENAME_PATTERN;
import static io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy.DEFAULT_PREFIX_FILE_PATH_PATTERN;

import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -30,8 +32,13 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.input.TransformerFactory;
import io.aiven.kafka.connect.common.source.task.HashObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.PartitionInFilenameDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.PartitionInPathDistributionStrategy;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
Expand Down Expand Up @@ -76,10 +83,12 @@ public class S3SourceTask extends SourceTask {
private final Object pollLock = new Object();
private AWSV2SourceClient awsv2SourceClient;
private final Set<String> failedObjectKeys = new HashSet<>();
private final Set<String> inProcessObjectKeys = new HashSet<>();
private ObjectDistributionStrategy objectDistributionStrategy;

private OffsetManager offsetManager;

private int taskId;

@SuppressWarnings("PMD.UnnecessaryConstructor")
public S3SourceTask() {
super();
Expand All @@ -95,16 +104,16 @@ public void start(final Map<String, String> props) {
LOGGER.info("S3 Source task started.");
s3SourceConfig = new S3SourceConfig(props);
this.transformer = TransformerFactory.getTransformer(s3SourceConfig);
this.objectDistributionStrategy = initializeObjectDistributionStrategy();
offsetManager = new OffsetManager(context, s3SourceConfig);
awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys);
awsv2SourceClient.initializeObjectDistributionStrategy();
awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys, objectDistributionStrategy, taskId);
prepareReaderFromOffsetStorageReader();
this.taskInitialized = true;
}

private void prepareReaderFromOffsetStorageReader() {
sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
awsv2SourceClient, awsv2SourceClient.getObjectDistributionStrategy().getFilePattern());
awsv2SourceClient, this.objectDistributionStrategy.getFilePattern());
}

@Override
Expand Down Expand Up @@ -188,4 +197,25 @@ public boolean isTaskInitialized() {
/**
 * Returns a snapshot of the connector's stop flag.
 *
 * <p>A fresh {@link AtomicBoolean} is created from the current value, so callers
 * cannot mutate the task's internal stop state through the returned handle.
 *
 * @return a new {@code AtomicBoolean} holding the current stopped state
 */
public AtomicBoolean getConnectorStopped() {
    final boolean stoppedSnapshot = connectorStopped.get();
    return new AtomicBoolean(stoppedSnapshot);
}

/**
 * Builds the {@link ObjectDistributionStrategy} selected by the connector
 * configuration and derives this task's id from the raw connector properties.
 *
 * <p>Side effect: assigns {@code this.taskId} as {@code task.id % tasks.max}.
 *
 * @return the strategy matching the configured
 *         {@link ObjectDistributionStrategies}; hash-based when no explicit
 *         partition strategy is configured
 */
private ObjectDistributionStrategy initializeObjectDistributionStrategy() {
    final ObjectDistributionStrategies configuredStrategy = s3SourceConfig
            .getObjectDistributionStrategy();
    // tasks.max / task.id are only available via the raw originals map — TODO confirm
    // both keys are always present when the framework starts a task.
    final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString());
    this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;

    if (configuredStrategy == ObjectDistributionStrategies.PARTITION_IN_FILENAME) {
        return new PartitionInFilenameDistributionStrategy(maxTasks,
                s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate());
    }
    if (configuredStrategy == ObjectDistributionStrategies.PARTITION_IN_FILEPATH) {
        return new PartitionInPathDistributionStrategy(maxTasks,
                DEFAULT_PREFIX_FILE_PATH_PATTERN + ANY_FILENAME_PATTERN);
    }
    // Fallback: distribute objects across tasks by hashing the object key.
    // NOTE(review): this branch passes getFilenameTemplate().toString() while the
    // filename branch uses originalTemplate() — confirm both yield the raw template string.
    return new HashObjectDistributionStrategy(maxTasks,
            s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package io.aiven.kafka.connect.s3.source.utils;

import static io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy.ANY_FILENAME_PATTERN;
import static io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy.DEFAULT_PREFIX_FILE_PATH_PATTERN;

import java.io.InputStream;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -27,11 +24,7 @@
import java.util.function.Predicate;
import java.util.stream.Stream;

import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies;
import io.aiven.kafka.connect.common.source.task.HashObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.PartitionInFilenameDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.PartitionInPathDistributionStrategy;
import io.aiven.kafka.connect.s3.source.config.S3ClientFactory;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

Expand All @@ -55,36 +48,28 @@ public class AWSV2SourceClient {
private final S3Client s3Client;
private final String bucketName;

private final Predicate<S3Object> filterPredicate = s3Object -> s3Object.size() > 0;
private Predicate<S3Object> filterPredicate = s3Object -> s3Object.size() > 0;
private final Set<String> failedObjectKeys;

private ObjectDistributionStrategy objectDistributionStrategy;
private final ObjectDistributionStrategy objectDistributionStrategy;

private int taskId;
private int maxTasks;
private final int taskId;

/**
* @param s3SourceConfig
* configuration for Source connector
* @param failedObjectKeys
* all objectKeys which have already been tried but have been unable to process.
*/
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys) {
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys,
final ObjectDistributionStrategy objectDistributionStrategy, final int taskId) {
this.s3SourceConfig = s3SourceConfig;
final S3ClientFactory s3ClientFactory = new S3ClientFactory();
this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig);
this.bucketName = s3SourceConfig.getAwsS3BucketName();
this.failedObjectKeys = new HashSet<>(failedObjectKeys);
}

public void initializeObjectDistributionStrategy() {
this.maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString());
this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
this.objectDistributionStrategy = getObjectDistributionStrategy(s3SourceConfig.getObjectDistributionStrategy());
}

public ObjectDistributionStrategy getObjectDistributionStrategy() {
return this.objectDistributionStrategy;
this.taskId = taskId;
this.objectDistributionStrategy = objectDistributionStrategy;
}

/**
Expand All @@ -97,12 +82,14 @@ public ObjectDistributionStrategy getObjectDistributionStrategy() {
* @param failedObjectKeys
* all objectKeys which have already been tried but have been unable to process.
*/
AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig,
final Set<String> failedObjectKeys) {
AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys,
final ObjectDistributionStrategy objectDistributionStrategy, final int taskId) {
// Package-private variant that accepts a pre-built S3Client (used by tests).
this.s3SourceConfig = s3SourceConfig;
this.s3Client = s3Client;
this.bucketName = s3SourceConfig.getAwsS3BucketName();
// Defensive copy: later additions via addFailedObjectKeys must not leak into
// (or be affected by) the caller's set.
this.failedObjectKeys = new HashSet<>(failedObjectKeys);
this.taskId = taskId;
this.objectDistributionStrategy = objectDistributionStrategy;
}

public Iterator<String> getListOfObjectKeys(final String startToken) {
Expand All @@ -113,6 +100,8 @@ public Iterator<String> getListOfObjectKeys(final String startToken) {
.startAfter(optionalKey(startToken))
.build();

setFilterPredicate(filterPredicate);

final Stream<String> s3ObjectKeyStream = Stream
.iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
// This is called every time next() is called on the iterator.
Expand All @@ -126,12 +115,7 @@ public Iterator<String> getListOfObjectKeys(final String startToken) {
}

})
.flatMap(response -> response.contents()
.stream()
.filter(filterPredicate)
.filter(objectSummary -> objectDistributionStrategy.isPartOfTask(taskId, objectSummary.key()))
.filter(objectSummary -> !failedObjectKeys.contains(objectSummary.key())))
.map(S3Object::key);
.flatMap(response -> response.contents().stream().filter(filterPredicate).map(S3Object::key));
return s3ObjectKeyStream.iterator();
}
private String optionalKey(final String key) {
Expand All @@ -151,25 +135,13 @@ public void addFailedObjectKeys(final String objectKey) {
this.failedObjectKeys.add(objectKey);
}

public void shutdown() {
s3Client.close();
// Composes the task-routing filters on top of the caller-supplied base predicate:
// an object survives only if the distribution strategy assigns it to this task
// AND it is not in the failed-object set.
// NOTE(review): getListOfObjectKeys passes the current filterPredicate back into
// this method on every call, so the chain grows by two .and(...) links per
// listing — the filtering result is unchanged, but consider composing once
// (e.g. in the constructor) instead of per call.
public void setFilterPredicate(final Predicate<S3Object> basePredicate) {
this.filterPredicate = basePredicate
.and(objectSummary -> objectDistributionStrategy.isPartOfTask(taskId, objectSummary.key()))
.and(objectSummary -> !failedObjectKeys.contains(objectSummary.key()));
}

private ObjectDistributionStrategy getObjectDistributionStrategy(
final ObjectDistributionStrategies objectDistributionStrategies) {
ObjectDistributionStrategy objectDistributionStrategy;

if (objectDistributionStrategies == ObjectDistributionStrategies.PARTITION_IN_FILENAME) {
objectDistributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks,
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate());
} else if (objectDistributionStrategies == ObjectDistributionStrategies.PARTITION_IN_FILEPATH) {
objectDistributionStrategy = new PartitionInPathDistributionStrategy(maxTasks,
DEFAULT_PREFIX_FILE_PATH_PATTERN + ANY_FILENAME_PATTERN);
} else {
objectDistributionStrategy = new HashObjectDistributionStrategy(maxTasks,
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
}

return objectDistributionStrategy;
// Releases the underlying S3 client (and its pooled connections); this
// source client instance must not be used afterwards.
public void shutdown() {
s3Client.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.aiven.kafka.connect.s3.source.utils;

import static io.aiven.kafka.connect.common.source.task.ObjectDistributionStrategy.ANY_FILENAME_PATTERN;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -32,6 +33,8 @@
import java.util.List;
import java.util.Map;

import io.aiven.kafka.connect.common.source.task.HashObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.PartitionInPathDistributionStrategy;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -139,15 +142,17 @@ void testFetchObjectSummariesWithPagination() throws IOException {

@Test
void testFetchObjectWithPrefix() {
final Map<String, String> configMap = getConfigMap(1, 0);
final int taskId = 0;
final Map<String, String> configMap = getConfigMap(1, taskId);
configMap.put(AWS_S3_PREFIX_CONFIG, "test/");
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
s3Client = mock(S3Client.class);
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
awsv2SourceClient.initializeObjectDistributionStrategy();
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(),
new PartitionInPathDistributionStrategy(1, "topics/{{topic}}/{{partition}}/" + ANY_FILENAME_PATTERN),
taskId);
requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
final S3Object object1 = createObjectSummary(1, "key1");
final S3Object object2 = createObjectSummary(1, "key2");
final S3Object object1 = createObjectSummary(1, "topics/key1/1/key1.txt");
final S3Object object2 = createObjectSummary(1, "topics/key2/2/key2.txt");

final ListObjectsV2Response firstResult = createListObjectsV2Response(List.of(object1), "nextToken");
final ListObjectsV2Response secondResult = createListObjectsV2Response(List.of(object2), null);
Expand All @@ -173,15 +178,16 @@ void testFetchObjectWithPrefix() {

@Test
void testFetchObjectWithInitialStartAfter() {
final Map<String, String> configMap = getConfigMap(1, 0);
final int taskId = 0;
final Map<String, String> configMap = getConfigMap(1, taskId);
final String startAfter = "file-option-1-12000.txt";
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
s3Client = mock(S3Client.class);
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
awsv2SourceClient.initializeObjectDistributionStrategy();
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(),
new HashObjectDistributionStrategy(1, "{{topic}}-{{partition}}-{{start_offset}}"), taskId);
requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
final S3Object object1 = createObjectSummary(1, "key1");
final S3Object object2 = createObjectSummary(1, "key2");
final S3Object object1 = createObjectSummary(1, "key1-1-10000");
final S3Object object2 = createObjectSummary(1, "key2-2-20000");

final ListObjectsV2Response firstResult = createListObjectsV2Response(List.of(object1), "nextToken");
final ListObjectsV2Response secondResult = createListObjectsV2Response(List.of(object2), null);
Expand Down Expand Up @@ -233,8 +239,8 @@ public void initializeWithTaskConfigs(final int maxTasks, final int taskId) {
final Map<String, String> configMap = getConfigMap(maxTasks, taskId);
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
s3Client = mock(S3Client.class);
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
awsv2SourceClient.initializeObjectDistributionStrategy();
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(),
new HashObjectDistributionStrategy(maxTasks, "{{topic}}-{{partition}}-{{start_offset}}"), taskId);
}

private ListObjectsV2Response getListObjectsV2Response() {
Expand Down

0 comments on commit d082cb8

Please sign in to comment.