Skip to content

Commit

Permalink
Configure s3 api to use AWS Prefix (#370)
Browse files Browse the repository at this point in the history
* This update passes the configured prefix to the AWS S3 list-objects API, allowing
users to be more specific about which objects they want processed
by the connector.

---------

Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
  • Loading branch information
aindriu-aiven authored Dec 16, 2024
1 parent 31dd0ab commit 24df6ab
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ static void addAwsConfigGroup(final ConfigDef configDef) {
ConfigDef.Importance.MEDIUM, "AWS S3 Region, e.g. us-east-1", GROUP_AWS, awsGroupCounter++,
ConfigDef.Width.NONE, AWS_S3_REGION_CONFIG);

configDef.define(AWS_S3_PREFIX_CONFIG, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
ConfigDef.Importance.MEDIUM, "Prefix for stored objects, e.g. cluster-1/", GROUP_AWS, awsGroupCounter++,
ConfigDef.Width.NONE, AWS_S3_PREFIX_CONFIG);

configDef.define(FETCH_PAGE_SIZE, ConfigDef.Type.INT, 10, ConfigDef.Range.atLeast(1),
ConfigDef.Importance.MEDIUM, "AWS S3 Fetch page size", GROUP_AWS, awsGroupCounter++, // NOPMD
// UnusedAssignment
Expand Down Expand Up @@ -252,10 +256,6 @@ static void addAwsStsConfigGroup(final ConfigDef configDef) {
}

static void addDeprecatedConfiguration(final ConfigDef configDef) {
configDef.define(AWS_S3_PREFIX_CONFIG, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
ConfigDef.Importance.MEDIUM,
"[Deprecated] Use `file.name.template` instead. Prefix for stored objects, e.g. cluster-1/", GROUP_AWS,
0, ConfigDef.Width.NONE, AWS_S3_PREFIX_CONFIG);

configDef.define(AWS_ACCESS_KEY_ID, ConfigDef.Type.PASSWORD, null, new NonEmptyPassword() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.platform.commons.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.localstack.LocalStackContainer;
Expand Down Expand Up @@ -253,7 +254,8 @@ void parquetTest(final TestInfo testInfo) throws IOException {
final var topicName = IntegrationBase.topicName(testInfo);

final String partition = "00000";
final String fileName = topicName + "-" + partition + "-" + System.currentTimeMillis() + ".txt";
final String fileName = addPrefixOrDefault("") + topicName + "-" + partition + "-" + System.currentTimeMillis()
+ ".txt";
final String name = "testuser";

final Map<String, String> connectorConfig = getAvroConfig(topicName, InputFormat.PARQUET);
Expand Down Expand Up @@ -337,13 +339,18 @@ private static byte[] generateNextAvroMessagesStartingFromId(final int messageId
}

/**
 * Uploads the supplied bytes to the test bucket under a freshly generated object key.
 * <p>
 * The key is built as {@code <prefix><topicName>-<partitionId>-<currentTimeMillis>.txt}, where the prefix comes from
 * {@link #addPrefixOrDefault(String)} with an empty default.
 *
 * @param topicName
 *            topic name embedded in the object key
 * @param testDataBytes
 *            payload to upload
 * @param partitionId
 *            partition identifier embedded in the object key
 * @return {@code OBJECT_KEY + SEPARATOR + objectKey} for the uploaded object
 */
private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) {
    // Single declaration of the key: the un-prefixed variant is superseded by the prefixed one.
    final String objectKey = addPrefixOrDefault("") + topicName + "-" + partitionId + "-"
            + System.currentTimeMillis() + ".txt";
    // The payload size is known up front, so declare it instead of leaving the SDK to buffer the
    // stream in order to work out the content length.
    final ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentLength(testDataBytes.length);
    final PutObjectRequest request = new PutObjectRequest(TEST_BUCKET_NAME, objectKey,
            new ByteArrayInputStream(testDataBytes), metadata);
    s3Client.putObject(request);
    return OBJECT_KEY + SEPARATOR + objectKey;
}

/**
 * Picks the S3 prefix to put in front of generated object keys.
 *
 * @param defaultValue
 *            value returned when {@code s3Prefix} is not accepted by {@code StringUtils.isNotBlank}
 * @return {@code s3Prefix} when {@code StringUtils.isNotBlank(s3Prefix)} holds, otherwise {@code defaultValue}
 */
private static String addPrefixOrDefault(final String defaultValue) {
    // NOTE(review): the StringUtils imported by this test looks like JUnit's platform-commons utility,
    // which JUnit documents as internal - confirm whether a supported helper should be used instead.
    if (StringUtils.isNotBlank(s3Prefix)) {
        return s3Prefix;
    }
    return defaultValue;
}

private Map<String, String> getConfig(final String connectorName, final String topics, final int maxTasks) {
final Map<String, String> config = new HashMap<>(basicS3ConnectorConfig());
config.put("name", connectorName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,17 @@ public Iterator<String> getListOfObjectKeys(final String startToken) {
if (StringUtils.isNotBlank(startToken)) {
request.withStartAfter(startToken);
}
// Prefix is optional so only use if supplied
if (StringUtils.isNotBlank(s3SourceConfig.getAwsS3Prefix())) {
request.withPrefix(s3SourceConfig.getAwsS3Prefix());
}

final Stream<String> s3ObjectKeyStream = Stream
.iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
// This is called every time next() is called on the iterator.
if (response.isTruncated()) {
return s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName)
.withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR)
.withContinuationToken(response.getNextContinuationToken()));
return s3Client.listObjectsV2(
new ListObjectsV2Request().withContinuationToken(response.getNextContinuationToken()));
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.aiven.kafka.connect.s3.source.utils;

import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand All @@ -40,13 +41,18 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;

class AWSV2SourceClientTest {

private AmazonS3 s3Client;

private AWSV2SourceClient awsv2SourceClient;

@Captor
ArgumentCaptor<ListObjectsV2Request> requestCaptor;

private static Map<String, String> getConfigMap(final int maxTasks, final int taskId) {
final Map<String, String> configMap = new HashMap<>();
configMap.put("tasks.max", String.valueOf(maxTasks));
Expand Down Expand Up @@ -131,6 +137,72 @@ void testFetchObjectSummariesWithPagination() throws IOException {
assertThat(summaries).isExhausted();
}

/**
 * Checks that the configured prefix is placed on the first list request and that the follow-up request issued for
 * the next page carries the continuation token without a prefix.
 */
@Test
void testFetchObjectWithPrefix() {
    final Map<String, String> props = getConfigMap(1, 0);
    props.put(AWS_S3_PREFIX_CONFIG, "test/");
    final S3SourceConfig sourceConfig = new S3SourceConfig(props);

    s3Client = mock(AmazonS3.class);
    awsv2SourceClient = new AWSV2SourceClient(s3Client, sourceConfig, Collections.emptySet());
    requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);

    // Two single-object results: the first is built with the token "nextToken", the second with none.
    final S3ObjectSummary firstObject = createObjectSummary(1, "key1");
    final S3ObjectSummary secondObject = createObjectSummary(1, "key2");
    final ListObjectsV2Result firstPage = createListObjectsV2Result(List.of(firstObject), "nextToken");
    final ListObjectsV2Result lastPage = createListObjectsV2Result(List.of(secondObject), null);
    when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstPage, lastPage);

    final Iterator<String> objectKeys = awsv2SourceClient.getListOfObjectKeys(null);
    // Exactly one list call is expected before any key has been consumed.
    verify(s3Client, times(1)).listObjectsV2(any(ListObjectsV2Request.class));

    assertThat(objectKeys.next()).isNotNull();
    assertThat(objectKeys.next()).isNotNull();

    // Consuming both keys is expected to have triggered the second list call.
    verify(s3Client, times(2)).listObjectsV2(requestCaptor.capture());
    final List<ListObjectsV2Request> capturedRequests = requestCaptor.getAllValues();
    assertThat(objectKeys).isExhausted();

    final ListObjectsV2Request initialRequest = capturedRequests.get(0);
    assertThat(initialRequest.getPrefix()).isEqualTo(sourceConfig.getAwsS3Prefix());

    // The follow-up request is expected to hold only the continuation token.
    // NOTE(review): confirm S3 does not need the prefix repeated on continuation requests.
    final ListObjectsV2Request followUpRequest = capturedRequests.get(1);
    assertThat(followUpRequest.getPrefix()).isNull();
    assertThat(followUpRequest.getContinuationToken()).isEqualTo("nextToken");
}

/**
 * Checks that the supplied start-after key is placed on the first list request and that the follow-up request
 * issued for the next page carries the continuation token without a start-after value.
 */
@Test
void testFetchObjectWithInitialStartAfter() {
    final String initialStartAfter = "file-option-1-12000.txt";
    final Map<String, String> props = getConfigMap(1, 0);
    final S3SourceConfig sourceConfig = new S3SourceConfig(props);

    s3Client = mock(AmazonS3.class);
    awsv2SourceClient = new AWSV2SourceClient(s3Client, sourceConfig, Collections.emptySet());
    requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);

    // Two single-object results: the first is built with the token "nextToken", the second with none.
    final S3ObjectSummary firstObject = createObjectSummary(1, "key1");
    final S3ObjectSummary secondObject = createObjectSummary(1, "key2");
    final ListObjectsV2Result firstPage = createListObjectsV2Result(List.of(firstObject), "nextToken");
    final ListObjectsV2Result lastPage = createListObjectsV2Result(List.of(secondObject), null);
    when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstPage, lastPage);

    final Iterator<String> objectKeys = awsv2SourceClient.getListOfObjectKeys(initialStartAfter);
    // Exactly one list call is expected before any key has been consumed.
    verify(s3Client, times(1)).listObjectsV2(any(ListObjectsV2Request.class));

    assertThat(objectKeys.next()).isNotNull();
    assertThat(objectKeys.next()).isNotNull();

    // Consuming both keys is expected to have triggered the second list call.
    verify(s3Client, times(2)).listObjectsV2(requestCaptor.capture());
    final List<ListObjectsV2Request> capturedRequests = requestCaptor.getAllValues();
    assertThat(objectKeys).isExhausted();

    final ListObjectsV2Request initialRequest = capturedRequests.get(0);
    assertThat(initialRequest.getStartAfter()).isEqualTo(initialStartAfter);

    // The follow-up request is expected to hold only the continuation token.
    final ListObjectsV2Request followUpRequest = capturedRequests.get(1);
    assertThat(followUpRequest.getStartAfter()).isNull();
    assertThat(followUpRequest.getContinuationToken()).isEqualTo("nextToken");
}

private ListObjectsV2Result createListObjectsV2Result(final List<S3ObjectSummary> summaries,
final String nextToken) {
final ListObjectsV2Result result = mock(ListObjectsV2Result.class);
Expand Down

0 comments on commit 24df6ab

Please sign in to comment.