From 24df6aba071460254c10a9eb884f2ba266369885 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aindri=C3=BA=20Lavelle?= <121855584+aindriu-aiven@users.noreply.github.com> Date: Mon, 16 Dec 2024 08:43:33 +0000 Subject: [PATCH] Configure s3 api to use AWS Prefix (#370) * This update means we can now use the PREFIX in the AWS API allowing users to configure it to be more specific about what they want processed by the connector. --------- Signed-off-by: Aindriu Lavelle --- .../connect/config/s3/S3ConfigFragment.java | 8 +-- .../connect/s3/source/IntegrationTest.java | 11 ++- .../s3/source/utils/AWSV2SourceClient.java | 9 ++- .../source/utils/AWSV2SourceClientTest.java | 72 +++++++++++++++++++ 4 files changed, 91 insertions(+), 9 deletions(-) diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java index 1e86638e1..1e87265b9 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java @@ -216,6 +216,10 @@ static void addAwsConfigGroup(final ConfigDef configDef) { ConfigDef.Importance.MEDIUM, "AWS S3 Region, e.g. us-east-1", GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, AWS_S3_REGION_CONFIG); + configDef.define(AWS_S3_PREFIX_CONFIG, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), + ConfigDef.Importance.MEDIUM, "Prefix for stored objects, e.g. 
cluster-1/", GROUP_AWS, awsGroupCounter++, + ConfigDef.Width.NONE, AWS_S3_PREFIX_CONFIG); + configDef.define(FETCH_PAGE_SIZE, ConfigDef.Type.INT, 10, ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM, "AWS S3 Fetch page size", GROUP_AWS, awsGroupCounter++, // NOPMD // UnusedAssignment @@ -252,10 +256,6 @@ static void addAwsStsConfigGroup(final ConfigDef configDef) { } static void addDeprecatedConfiguration(final ConfigDef configDef) { - configDef.define(AWS_S3_PREFIX_CONFIG, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), - ConfigDef.Importance.MEDIUM, - "[Deprecated] Use `file.name.template` instead. Prefix for stored objects, e.g. cluster-1/", GROUP_AWS, - 0, ConfigDef.Width.NONE, AWS_S3_PREFIX_CONFIG); configDef.define(AWS_ACCESS_KEY_ID, ConfigDef.Type.PASSWORD, null, new NonEmptyPassword() { @Override diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 3cd72f290..7f96842f3 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -77,6 +77,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.junit.platform.commons.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.localstack.LocalStackContainer; @@ -253,7 +254,8 @@ void parquetTest(final TestInfo testInfo) throws IOException { final var topicName = IntegrationBase.topicName(testInfo); final String partition = "00000"; - final String fileName = topicName + "-" + partition + "-" + System.currentTimeMillis() + ".txt"; + final String fileName = addPrefixOrDefault("") + topicName + "-" + partition + "-" + System.currentTimeMillis() + 
+ ".txt"; final String name = "testuser"; final Map connectorConfig = getAvroConfig(topicName, InputFormat.PARQUET); @@ -337,13 +339,18 @@ private static byte[] generateNextAvroMessagesStartingFromId(final int messageId } private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) { - final String objectKey = topicName + "-" + partitionId + "-" + System.currentTimeMillis() + ".txt"; + final String objectKey = addPrefixOrDefault("") + topicName + "-" + partitionId + "-" + + System.currentTimeMillis() + ".txt"; final PutObjectRequest request = new PutObjectRequest(TEST_BUCKET_NAME, objectKey, new ByteArrayInputStream(testDataBytes), new ObjectMetadata()); s3Client.putObject(request); return OBJECT_KEY + SEPARATOR + objectKey; } + private static String addPrefixOrDefault(final String defaultValue) { + return StringUtils.isNotBlank(s3Prefix) ? s3Prefix : defaultValue; + } + private Map getConfig(final String connectorName, final String topics, final int maxTasks) { final Map config = new HashMap<>(basicS3ConnectorConfig()); config.put("name", connectorName); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java index 1689ec9fa..1bbc477ee 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -85,14 +85,17 @@ public Iterator getListOfObjectKeys(final String startToken) { if (StringUtils.isNotBlank(startToken)) { request.withStartAfter(startToken); } + // Prefix is optional so only use if supplied + if (StringUtils.isNotBlank(s3SourceConfig.getAwsS3Prefix())) { + request.withPrefix(s3SourceConfig.getAwsS3Prefix()); + } final Stream s3ObjectKeyStream = Stream .iterate(s3Client.listObjectsV2(request), 
Objects::nonNull, response -> { // This is called every time next() is called on the iterator. if (response.isTruncated()) { - return s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName) - .withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR) - .withContinuationToken(response.getNextContinuationToken())); + return s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName) + .withContinuationToken(response.getNextContinuationToken())); } else { return null; } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java index 5b5176690..a8174a15c 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java @@ -17,6 +17,7 @@ package io.aiven.kafka.connect.s3.source.utils; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; +import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -40,6 +41,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; class AWSV2SourceClientTest { @@ -47,6 +50,9 @@ class AWSV2SourceClientTest { private AWSV2SourceClient awsv2SourceClient; + @Captor + ArgumentCaptor requestCaptor; + private static Map getConfigMap(final int maxTasks, final int taskId) { final Map configMap = new HashMap<>(); configMap.put("tasks.max", String.valueOf(maxTasks)); @@ -131,6 +137,72 @@ void testFetchObjectSummariesWithPagination() throws
IOException { assertThat(summaries).isExhausted(); } + @Test + void testFetchObjectWithPrefix() { + final Map configMap = getConfigMap(1, 0); + configMap.put(AWS_S3_PREFIX_CONFIG, "test/"); + final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); + s3Client = mock(AmazonS3.class); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); + final S3ObjectSummary object1 = createObjectSummary(1, "key1"); + final S3ObjectSummary object2 = createObjectSummary(1, "key2"); + + final ListObjectsV2Result firstResult = createListObjectsV2Result(List.of(object1), "nextToken"); + final ListObjectsV2Result secondResult = createListObjectsV2Result(List.of(object2), null); + + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult); + + final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); + verify(s3Client, times(1)).listObjectsV2(any(ListObjectsV2Request.class)); + + assertThat(summaries.next()).isNotNull(); + assertThat(summaries.next()).isNotNull(); + + verify(s3Client, times(2)).listObjectsV2(requestCaptor.capture()); + final List allRequests = requestCaptor.getAllValues(); + assertThat(summaries).isExhausted(); + + assertThat(allRequests.get(0).getPrefix()).isEqualTo(s3SourceConfig.getAwsS3Prefix()); + // Not required with continuation token + assertThat(allRequests.get(1).getPrefix()).isNull(); + assertThat(allRequests.get(1).getContinuationToken()).isEqualTo("nextToken"); + + } + + @Test + void testFetchObjectWithInitialStartAfter() { + final Map configMap = getConfigMap(1, 0); + final String startAfter = "file-option-1-12000.txt"; + final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); + s3Client = mock(AmazonS3.class); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + requestCaptor = 
ArgumentCaptor.forClass(ListObjectsV2Request.class); + final S3ObjectSummary object1 = createObjectSummary(1, "key1"); + final S3ObjectSummary object2 = createObjectSummary(1, "key2"); + + final ListObjectsV2Result firstResult = createListObjectsV2Result(List.of(object1), "nextToken"); + final ListObjectsV2Result secondResult = createListObjectsV2Result(List.of(object2), null); + + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult); + + final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(startAfter); + verify(s3Client, times(1)).listObjectsV2(any(ListObjectsV2Request.class)); + + assertThat(summaries.next()).isNotNull(); + assertThat(summaries.next()).isNotNull(); + + verify(s3Client, times(2)).listObjectsV2(requestCaptor.capture()); + final List allRequests = requestCaptor.getAllValues(); + assertThat(summaries).isExhausted(); + + assertThat(allRequests.get(0).getStartAfter()).isEqualTo(startAfter); + // Not required with continuation token + assertThat(allRequests.get(1).getStartAfter()).isNull(); + assertThat(allRequests.get(1).getContinuationToken()).isEqualTo("nextToken"); + + } + private ListObjectsV2Result createListObjectsV2Result(final List summaries, final String nextToken) { final ListObjectsV2Result result = mock(ListObjectsV2Result.class);