Skip to content

Commit

Permalink
Integrate object dist strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
muralibasani committed Jan 8, 2025
1 parent 6157773 commit 87ccede
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,14 @@ public void start(final Map<String, String> props) {
s3SourceConfig = new S3SourceConfig(props);
this.transformer = TransformerFactory.getTransformer(s3SourceConfig);
offsetManager = new OffsetManager(context, s3SourceConfig);
awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys,
initializeObjectDistributionStrategy(), taskId, filePattern);
awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys);
prepareReaderFromOffsetStorageReader();
this.taskInitialized = true;
}

private void prepareReaderFromOffsetStorageReader() {
sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
awsv2SourceClient, filePattern);
awsv2SourceClient, initializeObjectDistributionStrategy(), filePattern, taskId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import io.aiven.kafka.connect.common.source.task.DistributionStrategy;
import io.aiven.kafka.connect.s3.source.config.S3ClientFactory;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

Expand All @@ -52,28 +50,18 @@ public class AWSV2SourceClient {
private Predicate<S3Object> filterPredicate = s3Object -> s3Object.size() > 0;
private final Set<String> failedObjectKeys;

private final DistributionStrategy distributionStrategy;

private final int taskId;

private final Pattern filePattern;

/**
* @param s3SourceConfig
* configuration for Source connector
* @param failedObjectKeys
* all objectKeys which have already been tried but have been unable to process.
*/
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys,
final DistributionStrategy distributionStrategy, final int taskId, final Pattern filePattern) {
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys) {
this.s3SourceConfig = s3SourceConfig;
final S3ClientFactory s3ClientFactory = new S3ClientFactory();
this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig);
this.bucketName = s3SourceConfig.getAwsS3BucketName();
this.failedObjectKeys = new HashSet<>(failedObjectKeys);
this.taskId = taskId;
this.distributionStrategy = distributionStrategy;
this.filePattern = filePattern;
}

/**
Expand All @@ -86,15 +74,12 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String>
* @param failedObjectKeys
* all objectKeys which have already been tried but have been unable to process.
*/
AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys,
final DistributionStrategy distributionStrategy, final int taskId, final Pattern filePattern) {
AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig,
final Set<String> failedObjectKeys) {
this.s3SourceConfig = s3SourceConfig;
this.s3Client = s3Client;
this.bucketName = s3SourceConfig.getAwsS3BucketName();
this.failedObjectKeys = new HashSet<>(failedObjectKeys);
this.taskId = taskId;
this.distributionStrategy = distributionStrategy;
this.filePattern = filePattern;
}

public Iterator<String> getListOfObjectKeys(final String startToken) {
Expand Down Expand Up @@ -141,9 +126,7 @@ public void addFailedObjectKeys(final String objectKey) {
}

public void setFilterPredicate(final Predicate<S3Object> basePredicate) {
this.filterPredicate = basePredicate
.and(objectSummary -> distributionStrategy.isPartOfTask(taskId, objectSummary.key(), filePattern))
.and(objectSummary -> !failedObjectKeys.contains(objectSummary.key()));
this.filterPredicate = basePredicate.and(objectSummary -> !failedObjectKeys.contains(objectSummary.key()));
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
Expand All @@ -34,6 +35,7 @@

import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.task.DistributionStrategy;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

import org.apache.commons.io.function.IOSupplier;
Expand All @@ -48,6 +50,8 @@
public final class SourceRecordIterator implements Iterator<S3SourceRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(SourceRecordIterator.class);
private final Pattern filePattern;
private final DistributionStrategy distributionStrategy;
private final int taskId;
public static final long BYTES_TRANSFORMATION_NUM_OF_RECS = 1L;
private String currentObjectKey;

Expand All @@ -65,7 +69,8 @@ public final class SourceRecordIterator implements Iterator<S3SourceRecord> {
private final AWSV2SourceClient sourceClient; // NOPMD

public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager,
final Transformer transformer, final AWSV2SourceClient sourceClient, final Pattern filePattern) {
final Transformer transformer, final AWSV2SourceClient sourceClient,
final DistributionStrategy distributionStrategy, final Pattern filePattern, final int taskId) {
this.s3SourceConfig = s3SourceConfig;
this.offsetManager = offsetManager;

Expand All @@ -74,6 +79,8 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetMan
this.sourceClient = sourceClient;
objectListIterator = sourceClient.getListOfObjectKeys(null);
this.filePattern = filePattern;
this.distributionStrategy = distributionStrategy;
this.taskId = taskId;
}

private void nextS3Object() {
Expand All @@ -88,14 +95,29 @@ private void nextS3Object() {

try {
currentObjectKey = objectListIterator.next();

if (currentObjectKey != null) {
if (validateTaskDistributionStrategy(currentObjectKey)) {
return;
}
recordIterator = createIteratorForCurrentFile();
}
} catch (IOException e) {
throw SdkException.create(e.getMessage(), e.getCause());
}
}

/**
 * Checks whether the given object key is assigned to this task and, if it is not, arranges for it to be
 * skipped.
 *
 * <p>
 * Note the return value is inverted relative to what the method name might suggest: {@code true} means the key
 * was rejected (the caller should stop processing it), {@code false} means the key belongs to this task.
 *
 * @param currentObjectKey
 *            the object key to test against the distribution strategy
 * @return {@code true} if {@code distributionStrategy.isPartOfTask(taskId, currentObjectKey, filePattern)}
 *         reports that the key is not part of this task, in which case {@code recordIterator} has been
 *         replaced with an empty iterator; {@code false} otherwise, in which case {@code recordIterator} is
 *         left untouched
 */
public boolean validateTaskDistributionStrategy(final String currentObjectKey) {
    // Key is assigned to this task: nothing to reset, let the caller carry on with it.
    if (distributionStrategy.isPartOfTask(taskId, currentObjectKey, filePattern)) {
        return false;
    }
    // Key is not assigned to this task: make sure no records are produced for it.
    recordIterator = Collections.emptyIterator();
    return true;
}

private Iterator<S3SourceRecord> createIteratorForCurrentFile() throws IOException {

final Matcher fileMatcher = filePattern.matcher(currentObjectKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.aiven.kafka.connect.s3.source.utils;

import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.ANY_FILENAME_PATTERN;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -33,14 +32,9 @@
import java.util.List;
import java.util.Map;

import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.PartitionInPathDistributionStrategy;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import software.amazon.awssdk.services.s3.S3Client;
Expand All @@ -57,74 +51,48 @@ class AWSV2SourceClientTest {
@Captor
ArgumentCaptor<ListObjectsV2Request> requestCaptor;

private static Map<String, String> getConfigMap(final int maxTasks, final int taskId) {
private static Map<String, String> getConfigMap() {
final Map<String, String> configMap = new HashMap<>();
configMap.put("tasks.max", String.valueOf(maxTasks));
configMap.put("task.id", String.valueOf(taskId));

configMap.put(AWS_S3_BUCKET_NAME_CONFIG, "test-bucket");
return configMap;
}

@ParameterizedTest
@CsvSource({ "3, 1" })
void testFetchObjectSummariesWithNoObjects(final int maxTasks, final int taskId) {
initializeWithTaskConfigs(maxTasks, taskId);
@Test
void testFetchObjectSummariesWithNoObjects() {
initializeWithTaskConfigs();
final ListObjectsV2Response listObjectsV2Response = createListObjectsV2Response(Collections.emptyList(), null);
when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response);

final Iterator<String> summaries = awsv2SourceClient.getListOfObjectKeys(null);
assertThat(summaries).isExhausted();
}

@ParameterizedTest
@CsvSource({ "1, 0" })
void testFetchObjectSummariesWithOneObjectWithBasicConfig(final int maxTasks, final int taskId) {
@Test
void testFetchObjectSummariesWithOneObjectWithBasicConfig() {
final String objectKey = "any-key";

initializeWithTaskConfigs(maxTasks, taskId);
final Iterator<String> summaries = getS3ObjectKeysIterator(objectKey);
assertThat(summaries).hasNext();
}

@ParameterizedTest
@CsvSource({ "4, 2, key1", "4, 3, key2", "4, 0, key3", "4, 1, key4" })
void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final int maxTasks, final int taskId,
final String objectKey) {
initializeWithTaskConfigs(maxTasks, taskId);
initializeWithTaskConfigs();
final Iterator<String> summaries = getS3ObjectKeysIterator(objectKey);
assertThat(summaries).hasNext();
}

@ParameterizedTest
@CsvSource({ "4, 1, key1", "4, 3, key1", "4, 0, key1", "4, 1, key2", "4, 2, key2", "4, 0, key2", "4, 1, key3",
"4, 2, key3", "4, 3, key3", "4, 0, key4", "4, 2, key4", "4, 3, key4" })
void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId,
final String objectKey) {
initializeWithTaskConfigs(maxTasks, taskId);
final Iterator<String> summaries = getS3ObjectKeysIterator(objectKey);

assertThat(summaries).isExhausted();
}

@ParameterizedTest
@CsvSource({ "4, 3", "4, 0" })
void testFetchObjectSummariesWithZeroByteObject(final int maxTasks, final int taskId) {
initializeWithTaskConfigs(maxTasks, taskId);
@Test
void testFetchObjectSummariesWithZeroByteObject() {
initializeWithTaskConfigs();
final ListObjectsV2Response listObjectsV2Response = getListObjectsV2Response();
when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response);

final Iterator<String> summaries = awsv2SourceClient.getListOfObjectKeys(null);

// assigned 1 object to taskid
assertThat(summaries).hasNext();
assertThat(summaries.next()).isNotBlank();
assertThat(summaries.next()).isNotBlank();
assertThat(summaries).isExhausted();
}

@Test
void testFetchObjectSummariesWithPagination() throws IOException {
initializeWithTaskConfigs(4, 3);
initializeWithTaskConfigs();
final S3Object object1 = createObjectSummary(1, "key1");
final S3Object object2 = createObjectSummary(2, "key2");
final List<S3Object> firstBatch = List.of(object1);
Expand All @@ -138,19 +106,16 @@ void testFetchObjectSummariesWithPagination() throws IOException {
final Iterator<String> summaries = awsv2SourceClient.getListOfObjectKeys(null);
verify(s3Client, times(1)).listObjectsV2(any(ListObjectsV2Request.class));
assertThat(summaries.next()).isNotNull();
assertThat(summaries).isExhausted();
assertThat(summaries.next()).isNotNull();
}

@Test
void testFetchObjectWithPrefix() {
final int taskId = 0;
final Map<String, String> configMap = getConfigMap(1, taskId);
final Map<String, String> configMap = getConfigMap();
configMap.put(AWS_S3_PREFIX_CONFIG, "test/");
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
s3Client = mock(S3Client.class);
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(),
new PartitionInPathDistributionStrategy(1), taskId,
FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/" + ANY_FILENAME_PATTERN));
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
final S3Object object1 = createObjectSummary(1, "topics/key1/1/key1.txt");
final S3Object object2 = createObjectSummary(1, "topics/key2/2/key2.txt");
Expand Down Expand Up @@ -178,14 +143,11 @@ void testFetchObjectWithPrefix() {

@Test
void testFetchObjectWithInitialStartAfter() {
final int taskId = 0;
final Map<String, String> configMap = getConfigMap(1, taskId);
final Map<String, String> configMap = getConfigMap();
final String startAfter = "file-option-1-12000.txt";
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
s3Client = mock(S3Client.class);
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(),
new HashDistributionStrategy(1), taskId,
FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"));
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
final S3Object object1 = createObjectSummary(1, "key1-1-10000");
final S3Object object2 = createObjectSummary(1, "key2-2-20000");
Expand Down Expand Up @@ -236,13 +198,11 @@ private Iterator<String> getS3ObjectKeysIterator(final String objectKey) {
return awsv2SourceClient.getListOfObjectKeys(null);
}

public void initializeWithTaskConfigs(final int maxTasks, final int taskId) {
final Map<String, String> configMap = getConfigMap(maxTasks, taskId);
private void initializeWithTaskConfigs() {
final Map<String, String> configMap = getConfigMap();
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
s3Client = mock(S3Client.class);
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet(),
new HashDistributionStrategy(maxTasks), taskId,
FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"));
awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
}

private ListObjectsV2Response getListObjectsV2Response() {
Expand Down
Loading

0 comments on commit 87ccede

Please sign in to comment.