From b4475c978a4a88e1684e85a6901d1c08d0c0337e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aindri=C3=BA=20Lavelle?= <121855584+aindriu-aiven@users.noreply.github.com> Date: Mon, 30 Dec 2024 12:50:45 +0000 Subject: [PATCH] AWS SDK 2.X migration for source connector [KCON-84] (#374) The AWS 1.x SDK is in maintenance mode and will be out of support by December 2025. Key differences are: * Use of the builder pattern when creating objects * The get/set prefixes are dropped from getters and setters, e.g. getKey(), setKey(newKey) -> key(), key(newKey) * S3Client is immutable * Different package names * Additional built-in functionality that removes some of the work from the connector implementation and lets the library handle it. Two short sketches illustrating the builder-style client usage and the built-in paginator are appended at the end of this patch. SDK 1.x is still in use by the sink connector, which will need to be updated as well in the future; until then the s3-commons module depends on both the 1.x and 2.x jars. --------- Signed-off-by: Aindriu Lavelle --- s3-commons/build.gradle.kts | 3 + .../connect/config/s3/S3ConfigFragment.java | 41 +++- .../iam/AwsCredentialProviderFactory.java | 34 ++++ s3-source-connector/build.gradle.kts | 9 +- .../connect/s3/source/IntegrationBase.java | 24 +-- .../connect/s3/source/IntegrationTest.java | 16 +- .../kafka/connect/s3/source/S3SourceTask.java | 10 +- .../s3/source/config/S3ClientFactory.java | 62 +++--- .../s3/source/config/S3SourceConfig.java | 20 +- .../s3/source/utils/AWSV2SourceClient.java | 64 ++++--- .../s3/source/utils/SourceRecordIterator.java | 34 ++-- .../connect/s3/source/S3SourceTaskTest.java | 46 +++-- .../s3/source/config/S3SourceConfigTest.java | 11 +- .../s3/source/testutils/BucketAccessor.java | 71 ++++--- .../s3/source/testutils/S3OutputStream.java | 181 ------------------ .../source/utils/AWSV2SourceClientTest.java | 95 +++++---- .../utils/SourceRecordIteratorTest.java | 21 +- 17 files changed, 325 insertions(+), 417 deletions(-) delete mode 100644 s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java diff --git a/s3-commons/build.gradle.kts b/s3-commons/build.gradle.kts index 0e3d825aa..5e54c05ef 100644 --- a/s3-commons/build.gradle.kts +++ b/s3-commons/build.gradle.kts @@ -18,10 +18,13 @@ plugins { id("aiven-apache-kafka-connectors-all.java-conventions") } val amazonS3Version by extra("1.12.777") val amazonSTSVersion by extra("1.12.777") +val amazonV2Version by extra("2.29.34") dependencies { implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version") implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion") + implementation("software.amazon.awssdk:auth:$amazonV2Version") + implementation("software.amazon.awssdk:sts:$amazonV2Version") implementation(project(":commons")) diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java index 1e87265b9..2ece623bf 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java @@ -41,11 +41,13 @@ import com.amazonaws.services.s3.internal.BucketNameUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; /** * The configuration fragment that defines the S3 specific characteristics.
*/ -@SuppressWarnings({ "PMD.TooManyMethods", "PMD.ExcessiveImports", "PMD.TooManyStaticImports" }) +@SuppressWarnings({ "PMD.TooManyMethods", "PMD.ExcessiveImports", "PMD.TooManyStaticImports", "PMD.GodClass" }) public final class S3ConfigFragment extends ConfigFragment { private static final Logger LOGGER = LoggerFactory.getLogger(S3ConfigFragment.class); @@ -345,7 +347,8 @@ public void validateCredentials() { } } else { final BasicAWSCredentials awsCredentials = getAwsCredentials(); - if (awsCredentials == null) { + final AwsBasicCredentials awsCredentialsV2 = getAwsCredentialsV2(); + if (awsCredentials == null && awsCredentialsV2 == null) { LOGGER.info( "Connector use {} as credential Provider, " + "when configuration for {{}, {}} OR {{}, {}} are absent", @@ -410,11 +413,13 @@ public AwsStsEndpointConfig getStsEndpointConfig() { return new AwsStsEndpointConfig(cfg.getString(AWS_STS_CONFIG_ENDPOINT), cfg.getString(AWS_S3_REGION_CONFIG)); } + @Deprecated public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() { final AwsStsEndpointConfig config = getStsEndpointConfig(); return new AwsClientBuilder.EndpointConfiguration(config.getServiceEndpoint(), config.getSigningRegion()); } + @Deprecated public BasicAWSCredentials getAwsCredentials() { if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG)) && Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) { @@ -430,12 +435,26 @@ public BasicAWSCredentials getAwsCredentials() { return null; } + public AwsBasicCredentials getAwsCredentialsV2() { + if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG)) + && Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) { + + return AwsBasicCredentials.create(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG).value(), + cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value()); + } else if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID)) + && Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY))) { + LOGGER.warn("Config options {} and {} are not supported for this Connector", AWS_ACCESS_KEY_ID, + AWS_SECRET_ACCESS_KEY); + } + return null; + } + public String getAwsS3EndPoint() { return Objects.nonNull(cfg.getString(AWS_S3_ENDPOINT_CONFIG)) ? cfg.getString(AWS_S3_ENDPOINT_CONFIG) : cfg.getString(AWS_S3_ENDPOINT); } - + @Deprecated public Region getAwsS3Region() { // we have priority of properties if old one not set or both old and new one set // the new property value will be selected @@ -448,6 +467,18 @@ public Region getAwsS3Region() { } } + public software.amazon.awssdk.regions.Region getAwsS3RegionV2() { + // we have priority of properties if old one not set or both old and new one set + // the new property value will be selected + if (Objects.nonNull(cfg.getString(AWS_S3_REGION_CONFIG))) { + return software.amazon.awssdk.regions.Region.of(cfg.getString(AWS_S3_REGION_CONFIG)); + } else if (Objects.nonNull(cfg.getString(AWS_S3_REGION))) { + return software.amazon.awssdk.regions.Region.of(cfg.getString(AWS_S3_REGION)); + } else { + return software.amazon.awssdk.regions.Region.of(Regions.US_EAST_1.getName()); + } + } + public String getAwsS3BucketName() { return Objects.nonNull(cfg.getString(AWS_S3_BUCKET_NAME_CONFIG)) ? 
cfg.getString(AWS_S3_BUCKET_NAME_CONFIG) @@ -484,6 +515,10 @@ public AWSCredentialsProvider getCustomCredentialsProvider() { return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class); } + public AwsCredentialsProvider getCustomCredentialsProviderV2() { + return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AwsCredentialsProvider.class); + } + public int getFetchPageSize() { return cfg.getInt(FETCH_PAGE_SIZE); } diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java b/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java index 2a5089726..167d872a7 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java @@ -26,6 +26,11 @@ import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; public class AwsCredentialProviderFactory { @@ -58,4 +63,33 @@ private AWSSecurityTokenService securityTokenService(final S3ConfigFragment conf } return AWSSecurityTokenServiceClientBuilder.defaultClient(); } + + public AwsCredentialsProvider getAwsV2Provider(final S3ConfigFragment config) { + + if (config.hasAwsStsRole()) { + return getV2StsProvider(config); + } + final AwsBasicCredentials awsCredentials = config.getAwsCredentialsV2(); + if (Objects.isNull(awsCredentials)) { + return config.getCustomCredentialsProviderV2(); + } + return StaticCredentialsProvider.create(awsCredentials); + + } + + private StsAssumeRoleCredentialsProvider getV2StsProvider(final S3ConfigFragment config) { + if (config.hasAwsStsRole()) { + return StsAssumeRoleCredentialsProvider.builder() + .refreshRequest(() -> AssumeRoleRequest.builder() + .roleArn(config.getStsRole().getArn()) + // Make this a unique identifier + .roleSessionName("AwsV2SDKConnectorSession") + .build()) + .build(); + } + + return StsAssumeRoleCredentialsProvider.builder().build(); + + } + } diff --git a/s3-source-connector/build.gradle.kts b/s3-source-connector/build.gradle.kts index 3530724e0..20d5a3b82 100644 --- a/s3-source-connector/build.gradle.kts +++ b/s3-source-connector/build.gradle.kts @@ -18,8 +18,8 @@ import com.github.spotbugs.snom.SpotBugsTask plugins { id("aiven-apache-kafka-connectors-all.java-conventions") } -val amazonS3Version by extra("1.12.729") -val amazonSTSVersion by extra("1.12.729") +val amazonS3Version by extra("2.29.34") +val amazonSTSVersion by extra("2.29.34") val s3mockVersion by extra("0.2.6") val testKafkaVersion by extra("3.7.1") @@ -67,8 +67,8 @@ dependencies { implementation(project(":commons")) implementation(project(":s3-commons")) - implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version") - implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion") + implementation("software.amazon.awssdk:s3:$amazonS3Version") + implementation("software.amazon.awssdk:sts:$amazonSTSVersion") implementation(tools.spotbugs.annotations)
implementation(logginglibs.slf4j) @@ -154,7 +154,6 @@ dependencies { exclude(group = "org.apache.commons", module = "commons-math3") exclude(group = "org.apache.httpcomponents", module = "httpclient") exclude(group = "commons-codec", module = "commons-codec") - exclude(group = "commons-io", module = "commons-io") exclude(group = "commons-net", module = "commons-net") exclude(group = "org.eclipse.jetty") exclude(group = "org.eclipse.jetty.websocket") diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index 9ce09172b..6b505b996 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.net.ServerSocket; +import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; @@ -47,11 +48,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.connect.json.JsonDeserializer; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -61,6 +57,10 @@ import org.testcontainers.containers.Container; import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; public interface IntegrationBase { String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/"; @@ -101,13 +101,13 @@ static void waitForRunningContainer(final Container container) { await().atMost(Duration.ofMinutes(1)).until(container::isRunning); } - static AmazonS3 createS3Client(final LocalStackContainer localStackContainer) { - return AmazonS3ClientBuilder.standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( - localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3).toString(), - localStackContainer.getRegion())) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials( - localStackContainer.getAccessKey(), localStackContainer.getSecretKey()))) + static S3Client createS3Client(final LocalStackContainer localStackContainer) { + return S3Client.builder() + .endpointOverride( + URI.create(localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .region(Region.of(localStackContainer.getRegion())) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials + .create(localStackContainer.getAccessKey(), localStackContainer.getSecretKey()))) .build(); } diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 
7f96842f3..884051e30 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -33,7 +33,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -62,9 +61,6 @@ import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.testutils.ContentUtils; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; import com.fasterxml.jackson.databind.JsonNode; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; @@ -83,6 +79,9 @@ import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; @Testcontainers @SuppressWarnings("PMD.ExcessiveImports") @@ -111,7 +110,7 @@ final class IntegrationTest implements IntegrationBase { private AdminClient adminClient; private ConnectRunner connectRunner; - private static AmazonS3 s3Client; + private static S3Client s3Client; @BeforeAll static void setUpAll() throws IOException, InterruptedException { @@ -263,7 +262,7 @@ void parquetTest(final TestInfo testInfo) throws IOException { final Path path = ContentUtils.getTmpFilePath(name); try { - s3Client.putObject(TEST_BUCKET_NAME, fileName, Files.newInputStream(path), null); + s3Client.putObject(PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(fileName).build(), path); } catch (final Exception e) { // NOPMD broad exception caught LOGGER.error("Error in reading file {}", e.getMessage(), e); } finally { @@ -341,9 +340,8 @@ private static byte[] generateNextAvroMessagesStartingFromId(final int messageId private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) { final String objectKey = addPrefixOrDefault("") + topicName + "-" + partitionId + "-" + System.currentTimeMillis() + ".txt"; - final PutObjectRequest request = new PutObjectRequest(TEST_BUCKET_NAME, objectKey, - new ByteArrayInputStream(testDataBytes), new ObjectMetadata()); - s3Client.putObject(request); + final PutObjectRequest request = PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(objectKey).build(); + s3Client.putObject(request, RequestBody.fromBytes(testDataBytes)); return OBJECT_KEY + SEPARATOR + objectKey; } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index aa331b4aa..320fa19cb 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -40,10 +40,10 @@ import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; import io.aiven.kafka.connect.s3.source.utils.Version; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.AmazonS3Exception; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3Client; /** * S3SourceTask is a Kafka Connect SourceTask implementation that reads from source-s3 buckets and generates Kafka @@ -64,7 +64,7 @@ public class S3SourceTask extends SourceTask { private static final long ERROR_BACKOFF = 1000L; private S3SourceConfig s3SourceConfig; - private AmazonS3 s3Client; + private S3Client s3Client; private Iterator sourceRecordIterator; private Transformer transformer; @@ -122,8 +122,8 @@ public List poll() throws InterruptedException { extractSourceRecords(results); LOGGER.info("Number of records extracted and sent: {}", results.size()); return results; - } catch (AmazonS3Exception exception) { - if (exception.isRetryable()) { + } catch (SdkException exception) { + if (exception.retryable()) { LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...", exception); pollLock.wait(ERROR_BACKOFF); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java index 346ec5825..13ff4d690 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java @@ -16,45 +16,51 @@ package io.aiven.kafka.connect.s3.source.config; +import java.net.URI; +import java.time.Duration; import java.util.Objects; +import java.util.Random; import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; -import com.amazonaws.PredefinedClientConfigurations; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.retry.PredefinedBackoffStrategies; -import com.amazonaws.retry.PredefinedRetryPolicies; -import com.amazonaws.retry.RetryPolicy; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.retries.api.internal.backoff.ExponentialDelayWithJitter; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; public class S3ClientFactory { private final AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory(); - public AmazonS3 createAmazonS3Client(final S3SourceConfig config) { - final var awsEndpointConfig = newEndpointConfiguration(config); - final var clientConfig = PredefinedClientConfigurations.defaultConfig() - .withRetryPolicy(new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION, - new PredefinedBackoffStrategies.FullJitterBackoffStrategy( - Math.toIntExact(config.getS3RetryBackoffDelayMs()), - Math.toIntExact(config.getS3RetryBackoffMaxDelayMs())), - config.getS3RetryBackoffMaxRetries(), false)); - final var s3ClientBuilder = AmazonS3ClientBuilder.standard() - .withCredentials(credentialFactory.getProvider(config.getS3ConfigFragment())) - .withClientConfiguration(clientConfig); - if (Objects.isNull(awsEndpointConfig)) { - s3ClientBuilder.withRegion(config.getAwsS3Region().getName()); - } else { - s3ClientBuilder.withEndpointConfiguration(awsEndpointConfig).withPathStyleAccessEnabled(true); - } - return s3ClientBuilder.build(); - } + public S3Client createAmazonS3Client(final S3SourceConfig config) { + + final 
ExponentialDelayWithJitter backoffStrategy = new ExponentialDelayWithJitter(Random::new, + Duration.ofMillis(Math.toIntExact(config.getS3RetryBackoffDelayMs())), + Duration.ofMillis(Math.toIntExact(config.getS3RetryBackoffMaxDelayMs()))); - private AwsClientBuilder.EndpointConfiguration newEndpointConfiguration(final S3SourceConfig config) { + final ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder() + .retryStrategy(RetryMode.STANDARD) + .build(); if (Objects.isNull(config.getAwsS3EndPoint())) { - return null; + return S3Client.builder() + .overrideConfiguration(clientOverrideConfiguration) + .overrideConfiguration(o -> o.retryStrategy( + r -> r.backoffStrategy(backoffStrategy).maxAttempts(config.getS3RetryBackoffMaxRetries()))) + .region(config.getAwsS3Region()) + .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) + .build(); + } else { + // TODO This is definitely used for testing but not sure if customers use it. + return S3Client.builder() + .overrideConfiguration(clientOverrideConfiguration) + .region(config.getAwsS3Region()) + .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) + .endpointOverride(URI.create(config.getAwsS3EndPoint())) + .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .build(); } - return new AwsClientBuilder.EndpointConfiguration(config.getAwsS3EndPoint(), config.getAwsS3Region().getName()); + } + } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java index 68b9b2f98..23dc69e9e 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java @@ -32,12 +32,10 @@ import io.aiven.kafka.connect.iam.AwsStsEndpointConfig; import io.aiven.kafka.connect.iam.AwsStsRole; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.regions.Region; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.regions.Region; final public class S3SourceConfig extends SourceCommonConfig { @@ -87,12 +85,8 @@ public AwsStsEndpointConfig getStsEndpointConfig() { return s3ConfigFragment.getStsEndpointConfig(); } - public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() { - return s3ConfigFragment.getAwsEndpointConfiguration(); - } - - public BasicAWSCredentials getAwsCredentials() { - return s3ConfigFragment.getAwsCredentials(); + public AwsBasicCredentials getAwsCredentials() { + return s3ConfigFragment.getAwsCredentialsV2(); } public String getAwsS3EndPoint() { @@ -100,7 +94,7 @@ public String getAwsS3EndPoint() { } public Region getAwsS3Region() { - return s3ConfigFragment.getAwsS3Region(); + return s3ConfigFragment.getAwsS3RegionV2(); } public String getAwsS3BucketName() { @@ -131,10 +125,6 @@ public int getS3RetryBackoffMaxRetries() { return s3ConfigFragment.getS3RetryBackoffMaxRetries(); } - public AWSCredentialsProvider getCustomCredentialsProvider() { - return s3ConfigFragment.getCustomCredentialsProvider(); - } - public S3ConfigFragment getS3ConfigFragment() { return s3ConfigFragment; } diff --git 
a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java index 1bbc477ee..44e28dfa7 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -16,6 +16,7 @@ package io.aiven.kafka.connect.s3.source.utils; +import java.io.InputStream; import java.util.HashSet; import java.util.Iterator; import java.util.Objects; @@ -26,11 +27,14 @@ import io.aiven.kafka.connect.s3.source.config.S3ClientFactory; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.commons.io.function.IOSupplier; import org.codehaus.plexus.util.StringUtils; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.S3Object; /** * Called AWSV2SourceClient as this source client implements the V2 version of the aws client library. Handles all calls @@ -40,10 +44,10 @@ public class AWSV2SourceClient { public static final int PAGE_SIZE_FACTOR = 2; private final S3SourceConfig s3SourceConfig; - private final AmazonS3 s3Client; + private final S3Client s3Client; private final String bucketName; - private Predicate filterPredicate = summary -> summary.getSize() > 0; + private Predicate filterPredicate = s3Object -> s3Object.size() > 0; private final Set failedObjectKeys; /** @@ -70,7 +74,7 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set * @param failedObjectKeys * all objectKeys which have already been tried but have been unable to process. */ - AWSV2SourceClient(final AmazonS3 s3Client, final S3SourceConfig s3SourceConfig, + AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig, final Set failedObjectKeys) { this.s3SourceConfig = s3SourceConfig; this.s3Client = s3Client; @@ -79,46 +83,52 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set } public Iterator getListOfObjectKeys(final String startToken) { - final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName) - .withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR); - - if (StringUtils.isNotBlank(startToken)) { - request.withStartAfter(startToken); - } - // Prefix is optional so only use if supplied - if (StringUtils.isNotBlank(s3SourceConfig.getAwsS3Prefix())) { - request.withPrefix(s3SourceConfig.getAwsS3Prefix()); - } + final ListObjectsV2Request request = ListObjectsV2Request.builder() + .bucket(bucketName) + .maxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR) + .prefix(optionalKey(s3SourceConfig.getAwsS3Prefix())) + .startAfter(optionalKey(startToken)) + .build(); final Stream s3ObjectKeyStream = Stream .iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> { // This is called every time next() is called on the iterator. 
if (response.isTruncated()) { - return s3Client.listObjectsV2( - new ListObjectsV2Request().withContinuationToken(response.getNextContinuationToken())); + return s3Client.listObjectsV2(ListObjectsV2Request.builder() + .bucket(bucketName) + .maxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR) + .continuationToken(response.nextContinuationToken()) + .build()); } else { return null; } }) - .flatMap(response -> response.getObjectSummaries() + .flatMap(response -> response.contents() .stream() .filter(filterPredicate) - .filter(objectSummary -> assignObjectToTask(objectSummary.getKey())) - .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()))) - .map(S3ObjectSummary::getKey); + .filter(objectSummary -> assignObjectToTask(objectSummary.key())) + .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.key()))) + .map(S3Object::key); return s3ObjectKeyStream.iterator(); } + private String optionalKey(final String key) { + if (StringUtils.isNotBlank(key)) { + return key; + } + return null; + } - public S3Object getObject(final String objectKey) { - return s3Client.getObject(bucketName, objectKey); + public IOSupplier<InputStream> getObject(final String objectKey) { + final GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(bucketName).key(objectKey).build(); + final ResponseBytes<GetObjectResponse> s3ObjectResponse = s3Client.getObjectAsBytes(getObjectRequest); + return s3ObjectResponse::asInputStream; } public void addFailedObjectKeys(final String objectKey) { this.failedObjectKeys.add(objectKey); } - public void setFilterPredicate(final Predicate<S3ObjectSummary> predicate) { + public void setFilterPredicate(final Predicate<S3Object> predicate) { filterPredicate = predicate; } @@ -130,7 +140,7 @@ private boolean assignObjectToTask(final String objectKey) { } public void shutdown() { - s3Client.shutdown(); + s3Client.close(); } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index ac5a3061a..26f3c03cf 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -17,6 +17,7 @@ package io.aiven.kafka.connect.s3.source.utils; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -32,10 +33,10 @@ import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.model.S3Object; +import org.apache.commons.io.function.IOSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; /** * Iterator that processes S3 files and creates Kafka source records.
Supports different output formats (Avro, JSON, @@ -91,7 +92,7 @@ private void nextS3Object() { recordIterator = createIteratorForCurrentFile(); } } catch (IOException e) { - throw new AmazonClientException(e); + throw SdkException.create(e.getMessage(), e.getCause()); } } @@ -103,20 +104,20 @@ private Iterator createIteratorForCurrentFile() throws IOExcepti if (fileMatcher.find()) { // TODO move this from the SourceRecordIterator so that we can decouple it from S3 and make it API agnostic - try (S3Object s3Object = sourceClient.getObject(currentObjectKey);) { - topicName = fileMatcher.group(PATTERN_TOPIC_KEY); - defaultPartitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); + final IOSupplier s3Object = sourceClient.getObject(currentObjectKey); + topicName = fileMatcher.group(PATTERN_TOPIC_KEY); + defaultPartitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); - final long defaultStartOffsetId = 1L; + final long defaultStartOffsetId = 1L; - final String finalTopic = topicName; - final Map partitionMap = ConnectUtils.getPartitionMap(topicName, defaultPartitionId, - bucketName); + final String finalTopic = topicName; + final Map partitionMap = ConnectUtils.getPartitionMap(topicName, defaultPartitionId, + bucketName); + + return getObjectIterator(s3Object, finalTopic, defaultPartitionId, defaultStartOffsetId, transformer, + partitionMap); - return getObjectIterator(s3Object, finalTopic, defaultPartitionId, defaultStartOffsetId, transformer, - partitionMap); - } } else { LOGGER.error("File naming doesn't match to any topic. {}", currentObjectKey); return Collections.emptyIterator(); @@ -124,7 +125,7 @@ private Iterator createIteratorForCurrentFile() throws IOExcepti } @SuppressWarnings("PMD.CognitiveComplexity") - private Iterator getObjectIterator(final S3Object s3Object, final String topic, + private Iterator getObjectIterator(final IOSupplier s3Object, final String topic, final int topicPartition, final long startOffset, final Transformer transformer, final Map partitionMap) { return new Iterator<>() { @@ -142,8 +143,9 @@ private List readNext() { return sourceRecords; } - try (Stream recordStream = transformer.getRecords(s3Object::getObjectContent, topic, - topicPartition, s3SourceConfig, numberOfRecsAlreadyProcessed)) { + try (Stream recordStream = transformer.getRecords(s3Object, topic, topicPartition, + s3SourceConfig, numberOfRecsAlreadyProcessed)) { + final Iterator recordIterator = recordStream.iterator(); while (recordIterator.hasNext()) { final Object record = recordIterator.next(); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index 590ad23bb..13ac66844 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -24,6 +24,8 @@ import static org.mockito.Mockito.when; import java.lang.reflect.Field; +import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,15 +41,12 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; +import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; +import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import 
io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; import io.findify.s3mock.S3Mock; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -57,6 +56,10 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; @ExtendWith(MockitoExtension.class) final class S3SourceTaskTest { @@ -66,9 +69,10 @@ final class S3SourceTaskTest { private static BucketAccessor testBucketAccessor; private static final String TEST_BUCKET = "test-bucket"; - + // TODO S3Mock has not been maintained in 4 years + // Adobe have an alternative we can move to. private static S3Mock s3Api; - private static AmazonS3 s3Client; + private static S3Client s3Client; private static Map commonProperties; @@ -79,7 +83,7 @@ final class S3SourceTaskTest { private OffsetStorageReader mockedOffsetStorageReader; @BeforeAll - public static void setUpClass() { + public static void setUpClass() throws URISyntaxException { final int s3Port = RANDOM.nextInt(10_000) + 10_000; s3Api = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build(); @@ -90,17 +94,19 @@ public static void setUpClass() { S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET, S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG, "http://localhost:" + s3Port, S3ConfigFragment.AWS_S3_REGION_CONFIG, "us-west-2"); - final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard(); - final BasicAWSCredentials awsCreds = new BasicAWSCredentials( - commonProperties.get(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG), - commonProperties.get(S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG)); - builder.withCredentials(new AWSStaticCredentialsProvider(awsCreds)); - builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( - commonProperties.get(S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG), - commonProperties.get(S3ConfigFragment.AWS_S3_REGION_CONFIG))); - builder.withPathStyleAccessEnabled(true); + final AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory(); + final S3SourceConfig config = new S3SourceConfig(commonProperties); + final ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder() + .retryStrategy(RetryMode.STANDARD) + .build(); - s3Client = builder.build(); + s3Client = S3Client.builder() + .overrideConfiguration(clientOverrideConfiguration) + .region(config.getAwsS3Region()) + .endpointOverride(URI.create(config.getAwsS3EndPoint())) + .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) + .build(); testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET); testBucketAccessor.createBucket(); @@ -114,14 +120,14 @@ public static void tearDownClass() { @BeforeEach public void setUp() { properties = new HashMap<>(commonProperties); - 
s3Client.createBucket(TEST_BUCKET); + s3Client.createBucket(create -> create.bucket(TEST_BUCKET).build()); mockedSourceTaskContext = mock(SourceTaskContext.class); mockedOffsetStorageReader = mock(OffsetStorageReader.class); } @AfterEach public void tearDown() { - s3Client.deleteBucket(TEST_BUCKET); + s3Client.deleteBucket(delete -> delete.bucket(TEST_BUCKET).build()); } @Test diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java index edbe8dc98..10939c511 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java @@ -27,9 +27,8 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import com.amazonaws.regions.RegionUtils; -import com.amazonaws.regions.Regions; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.regions.Region; final class S3SourceConfigTest { @Test @@ -42,7 +41,7 @@ void correctFullConfig() { props.put(S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket"); props.put(S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG, "AWS_S3_ENDPOINT"); props.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "AWS_S3_PREFIX"); - props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName()); + props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Region.US_EAST_1.id()); // record, topic specific props props.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue()); @@ -53,11 +52,11 @@ void correctFullConfig() { final var conf = new S3SourceConfig(props); final var awsCredentials = conf.getAwsCredentials(); - assertThat(awsCredentials.getAWSAccessKeyId()).isEqualTo("AWS_ACCESS_KEY_ID"); - assertThat(awsCredentials.getAWSSecretKey()).isEqualTo("AWS_SECRET_ACCESS_KEY"); + assertThat(awsCredentials.accessKeyId()).isEqualTo("AWS_ACCESS_KEY_ID"); + assertThat(awsCredentials.secretAccessKey()).isEqualTo("AWS_SECRET_ACCESS_KEY"); assertThat(conf.getAwsS3BucketName()).isEqualTo("the-bucket"); assertThat(conf.getAwsS3EndPoint()).isEqualTo("AWS_S3_ENDPOINT"); - assertThat(conf.getAwsS3Region()).isEqualTo(RegionUtils.getRegion("us-east-1")); + assertThat(conf.getAwsS3Region()).isEqualTo(Region.of("us-east-1")); assertThat(conf.getInputFormat()).isEqualTo(InputFormat.AVRO); assertThat(conf.getTargetTopics()).isEqualTo("testtopic"); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java index 212088560..8b34f73d0 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java @@ -32,58 +32,71 @@ import io.aiven.kafka.connect.common.config.CompressionType; -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.MultiObjectDeleteException; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.github.luben.zstd.ZstdInputStream; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.xerial.snappy.SnappyInputStream; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; public class BucketAccessor { private final String bucketName; - private final AmazonS3 s3Client; + private final S3Client s3Client; private static final Logger LOGGER = LoggerFactory.getLogger(BucketAccessor.class); @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "stores mutable s3Client object") - public BucketAccessor(final AmazonS3 s3Client, final String bucketName) { + public BucketAccessor(final S3Client s3Client, final String bucketName) { this.bucketName = bucketName; this.s3Client = s3Client; } public final void createBucket() { - s3Client.createBucket(bucketName); + s3Client.createBucket(builder -> builder.bucket(bucketName).build()); } public final void removeBucket() { - final var chunk = s3Client.listObjects(bucketName) - .getObjectSummaries() + final var deleteIds = s3Client.listObjectsV2(ListObjectsV2Request.builder().bucket(bucketName).build()) + .contents() .stream() - .map(S3ObjectSummary::getKey) - .toArray(String[]::new); + .map(S3Object::key) + .map(key -> ObjectIdentifier.builder().key(key).build()) + .collect(Collectors.toList()); - final var deleteObjectsRequest = new DeleteObjectsRequest(bucketName).withKeys(chunk); try { - s3Client.deleteObjects(deleteObjectsRequest); - } catch (final MultiObjectDeleteException e) { - for (final var err : e.getErrors()) { - LOGGER.warn(String.format("Couldn't delete object: %s. Reason: [%s] %s", err.getKey(), err.getCode(), - err.getMessage())); - } - } catch (final AmazonClientException e) { - LOGGER.error("Couldn't delete objects: {}", - Arrays.stream(chunk).reduce(" ", String::concat) + e.getMessage()); + s3Client.deleteObjects(DeleteObjectsRequest.builder() + .bucket(bucketName) + .delete(Delete.builder().objects(deleteIds).build()) + .build()); + } catch (final S3Exception e) { + LOGGER.warn( + String.format("Couldn't delete objects. 
Reason: [%s] %s", e.awsErrorDetails().errorMessage(), e)); + } catch (final SdkException e) { + + LOGGER.error("Couldn't delete objects: {}, Exception{} ", deleteIds, e.getMessage()); } - s3Client.deleteBucket(bucketName); + s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()); } + // TODO NOT Currently used public final Boolean doesObjectExist(final String objectName) { - return s3Client.doesObjectExist(bucketName, objectName); + try { + s3Client.headObject(HeadObjectRequest.builder().bucket(bucketName).key(objectName).build()); + return true; + } catch (NoSuchKeyException e) { + return false; + } } public final List> readAndDecodeLines(final String blobName, final String compression, @@ -104,7 +117,8 @@ private List> readAndDecodeLines0(final String blobName, final Stri public final byte[] readBytes(final String blobName, final String compression) throws IOException { Objects.requireNonNull(blobName, "blobName cannot be null"); - final byte[] blobBytes = s3Client.getObject(bucketName, blobName).getObjectContent().readAllBytes(); + final byte[] blobBytes = s3Client.getObjectAsBytes(builder -> builder.key(blobName).bucket(bucketName).build()) + .asByteArray(); try (ByteArrayInputStream bais = new ByteArrayInputStream(blobBytes); InputStream decompressedStream = getDecompressedStream(bais, compression); ByteArrayOutputStream decompressedBytes = new ByteArrayOutputStream()) { @@ -135,10 +149,11 @@ public final List readLines(final String blobName, final String compress } public final List listObjects() { - return s3Client.listObjects(bucketName) - .getObjectSummaries() + + return s3Client.listObjectsV2(ListObjectsV2Request.builder().bucket(bucketName).build()) + .contents() .stream() - .map(S3ObjectSummary::getKey) + .map(S3Object::key) .collect(Collectors.toList()); } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java deleted file mode 100644 index 4d33e46c5..000000000 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright 2020 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.aiven.kafka.connect.s3.source.testutils; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.UploadPartRequest; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class S3OutputStream extends OutputStream { - - private final Logger logger = LoggerFactory.getLogger(S3OutputStream.class); - - public static final int DEFAULT_PART_SIZE = 5 * 1024 * 1024; // 5 MB - - private final AmazonS3 client; - - private final ByteBuffer byteBuffer; - - private final String bucketName; - - private final String key; - - private MultipartUpload multipartUpload; - - private final int partSize; - - private final String serverSideEncryptionAlgorithm; - - private boolean closed; - - @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "AmazonS3 client is mutable") - public S3OutputStream(final String bucketName, final String key, final int partSize, final AmazonS3 client) { - this(bucketName, key, partSize, client, null); - } - - @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "AmazonS3 client is mutable") - public S3OutputStream(final String bucketName, final String key, final int partSize, final AmazonS3 client, - final String serverSideEncryptionAlgorithm) { - super(); - this.bucketName = bucketName; - this.key = key; - this.client = client; - this.partSize = partSize; - this.byteBuffer = ByteBuffer.allocate(partSize); - this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; - } - - @Override - public void write(final int singleByte) throws IOException { - write(new byte[] { (byte) singleByte }, 0, 1); - } - - @Override - public void write(final byte[] bytes, final int off, final int len) throws IOException { - if (Objects.isNull(bytes) || bytes.length == 0) { - return; - } - if (Objects.isNull(multipartUpload)) { - multipartUpload = newMultipartUpload(); - } - final var source = ByteBuffer.wrap(bytes, off, len); - while (source.hasRemaining()) { - final var transferred = Math.min(byteBuffer.remaining(), source.remaining()); - final var offset = source.arrayOffset() + source.position(); - byteBuffer.put(source.array(), offset, transferred); - source.position(source.position() + transferred); - if (!byteBuffer.hasRemaining()) { - flushBuffer(0, partSize, partSize); - } - } - } - - private MultipartUpload newMultipartUpload() throws IOException { - logger.debug("Create new multipart upload request"); - final var initialRequest = new InitiateMultipartUploadRequest(bucketName, key); - initialRequest.setObjectMetadata(this.buildObjectMetadata()); - final var initiateResult = client.initiateMultipartUpload(initialRequest); - logger.debug("Upload ID: {}", initiateResult.getUploadId()); - return new MultipartUpload(initiateResult.getUploadId()); - } - - private ObjectMetadata buildObjectMetadata() { - final ObjectMetadata metadata = new ObjectMetadata(); - - if (this.serverSideEncryptionAlgorithm != null) { 
- metadata.setSSEAlgorithm(this.serverSideEncryptionAlgorithm); - } - - return metadata; - } - - @Override - public void close() throws IOException { - if (closed) { - return; - } - if (byteBuffer.position() > 0 && Objects.nonNull(multipartUpload)) { - flushBuffer(byteBuffer.arrayOffset(), byteBuffer.position(), byteBuffer.position()); - } - if (Objects.nonNull(multipartUpload)) { - multipartUpload.complete(); - multipartUpload = null; // NOPMD NullAssignment - } - closed = true; - super.close(); - } - - private void flushBuffer(final int offset, final int length, final int partSize) throws IOException { - try { - multipartUpload.uploadPart(new ByteArrayInputStream(byteBuffer.array(), offset, length), partSize); - byteBuffer.clear(); - } catch (final Exception e) { // NOPMD AvoidCatchingGenericException - multipartUpload.abort(); - multipartUpload = null; // NOPMD NullAssignment - throw new IOException(e); - } - } - - private class MultipartUpload { - - private final String uploadId; - - private final List partETags = new ArrayList<>(); - - public MultipartUpload(final String uploadId) { - this.uploadId = uploadId; - } - - public void uploadPart(final InputStream inputStream, final int partSize) throws IOException { - final var partNumber = partETags.size() + 1; - final var uploadPartRequest = new UploadPartRequest().withBucketName(bucketName) - .withKey(key) - .withUploadId(uploadId) - .withPartSize(partSize) - .withPartNumber(partNumber) - .withInputStream(inputStream); - final var uploadResult = client.uploadPart(uploadPartRequest); - partETags.add(uploadResult.getPartETag()); - } - - public void complete() { - client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags)); - } - - public void abort() { - client.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, uploadId)); - } - - } - -} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java index a8174a15c..beed0681c 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java @@ -34,19 +34,19 @@ import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; -import com.amazonaws.services.s3.model.S3ObjectSummary; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; class AWSV2SourceClientTest { - private AmazonS3 s3Client; + private S3Client s3Client; private AWSV2SourceClient awsv2SourceClient; @@ -66,8 +66,8 @@ private static Map getConfigMap(final int maxTasks, final int ta @CsvSource({ "3, 1" }) void testFetchObjectSummariesWithNoObjects(final int maxTasks, final int taskId) { initializeWithTaskConfigs(maxTasks, taskId); - final ListObjectsV2Result listObjectsV2Result = 
createListObjectsV2Result(Collections.emptyList(), null); - when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); + final ListObjectsV2Response listObjectsV2Response = createListObjectsV2Response(Collections.emptyList(), null); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); assertThat(summaries).isExhausted(); @@ -107,8 +107,8 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final @CsvSource({ "4, 3", "4, 0" }) void testFetchObjectSummariesWithZeroByteObject(final int maxTasks, final int taskId) { initializeWithTaskConfigs(maxTasks, taskId); - final ListObjectsV2Result listObjectsV2Result = getListObjectsV2Result(); - when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); + final ListObjectsV2Response listObjectsV2Response = getListObjectsV2Response(); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); @@ -121,13 +121,13 @@ void testFetchObjectSummariesWithZeroByteObject(final int maxTasks, final int ta @Test void testFetchObjectSummariesWithPagination() throws IOException { initializeWithTaskConfigs(4, 3); - final S3ObjectSummary object1 = createObjectSummary(1, "key1"); - final S3ObjectSummary object2 = createObjectSummary(2, "key2"); - final List firstBatch = List.of(object1); - final List secondBatch = List.of(object2); + final S3Object object1 = createObjectSummary(1, "key1"); + final S3Object object2 = createObjectSummary(2, "key2"); + final List firstBatch = List.of(object1); + final List secondBatch = List.of(object2); - final ListObjectsV2Result firstResult = createListObjectsV2Result(firstBatch, "nextToken"); - final ListObjectsV2Result secondResult = createListObjectsV2Result(secondBatch, null); + final ListObjectsV2Response firstResult = createListObjectsV2Response(firstBatch, "nextToken"); + final ListObjectsV2Response secondResult = createListObjectsV2Response(secondBatch, null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult); @@ -142,14 +142,14 @@ void testFetchObjectWithPrefix() { final Map configMap = getConfigMap(1, 0); configMap.put(AWS_S3_PREFIX_CONFIG, "test/"); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); - s3Client = mock(AmazonS3.class); + s3Client = mock(S3Client.class); awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); - final S3ObjectSummary object1 = createObjectSummary(1, "key1"); - final S3ObjectSummary object2 = createObjectSummary(1, "key2"); + final S3Object object1 = createObjectSummary(1, "key1"); + final S3Object object2 = createObjectSummary(1, "key2"); - final ListObjectsV2Result firstResult = createListObjectsV2Result(List.of(object1), "nextToken"); - final ListObjectsV2Result secondResult = createListObjectsV2Result(List.of(object2), null); + final ListObjectsV2Response firstResult = createListObjectsV2Response(List.of(object1), "nextToken"); + final ListObjectsV2Response secondResult = createListObjectsV2Response(List.of(object2), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult); @@ -163,10 +163,10 @@ void testFetchObjectWithPrefix() { 
         final List<ListObjectsV2Request> allRequests = requestCaptor.getAllValues();
         assertThat(summaries).isExhausted();
 
-        assertThat(allRequests.get(0).getPrefix()).isEqualTo(s3SourceConfig.getAwsS3Prefix());
+        assertThat(allRequests.get(0).prefix()).isEqualTo(s3SourceConfig.getAwsS3Prefix());
         // Not required with continuation token
-        assertThat(allRequests.get(1).getPrefix()).isNull();
-        assertThat(allRequests.get(1).getContinuationToken()).isEqualTo("nextToken");
+        assertThat(allRequests.get(1).prefix()).isNull();
+        assertThat(allRequests.get(1).continuationToken()).isEqualTo("nextToken");
     }
 
@@ -175,14 +175,14 @@ void testFetchObjectWithInitialStartAfter() {
         final Map<String, String> configMap = getConfigMap(1, 0);
         final String startAfter = "file-option-1-12000.txt";
         final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
-        s3Client = mock(AmazonS3.class);
+        s3Client = mock(S3Client.class);
         awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
         requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
-        final S3ObjectSummary object1 = createObjectSummary(1, "key1");
-        final S3ObjectSummary object2 = createObjectSummary(1, "key2");
+        final S3Object object1 = createObjectSummary(1, "key1");
+        final S3Object object2 = createObjectSummary(1, "key2");
 
-        final ListObjectsV2Result firstResult = createListObjectsV2Result(List.of(object1), "nextToken");
-        final ListObjectsV2Result secondResult = createListObjectsV2Result(List.of(object2), null);
+        final ListObjectsV2Response firstResult = createListObjectsV2Response(List.of(object1), "nextToken");
+        final ListObjectsV2Response secondResult = createListObjectsV2Response(List.of(object2), null);
 
         when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult);
 
@@ -196,32 +196,31 @@ void testFetchObjectWithInitialStartAfter() {
         final List<ListObjectsV2Request> allRequests = requestCaptor.getAllValues();
         assertThat(summaries).isExhausted();
 
-        assertThat(allRequests.get(0).getStartAfter()).isEqualTo(startAfter);
+        assertThat(allRequests.get(0).startAfter()).isEqualTo(startAfter);
         // Not required with continuation token
-        assertThat(allRequests.get(1).getStartAfter()).isNull();
-        assertThat(allRequests.get(1).getContinuationToken()).isEqualTo("nextToken");
+        assertThat(allRequests.get(1).startAfter()).isNull();
+        assertThat(allRequests.get(1).continuationToken()).isEqualTo("nextToken");
     }
 
-    private ListObjectsV2Result createListObjectsV2Result(final List<S3ObjectSummary> summaries,
-            final String nextToken) {
-        final ListObjectsV2Result result = mock(ListObjectsV2Result.class);
-        when(result.getObjectSummaries()).thenReturn(summaries);
-        when(result.getNextContinuationToken()).thenReturn(nextToken);
+    private ListObjectsV2Response createListObjectsV2Response(final List<S3Object> summaries, final String nextToken) {
+        final ListObjectsV2Response result = mock(ListObjectsV2Response.class);
+        when(result.contents()).thenReturn(summaries);
+        when(result.nextContinuationToken()).thenReturn(nextToken);
         when(result.isTruncated()).thenReturn(nextToken != null);
         return result;
     }
 
-    private S3ObjectSummary createObjectSummary(final long sizeOfObject, final String objectKey) {
-        final S3ObjectSummary summary = mock(S3ObjectSummary.class);
-        when(summary.getSize()).thenReturn(sizeOfObject);
-        when(summary.getKey()).thenReturn(objectKey);
+    private S3Object createObjectSummary(final long sizeOfObject, final String objectKey) {
+        final S3Object summary = mock(S3Object.class);
+        when(summary.size()).thenReturn(sizeOfObject);
+        when(summary.key()).thenReturn(objectKey);
         return summary;
     }
 
     private Iterator<String> getS3ObjectKeysIterator(final String objectKey) {
-        final S3ObjectSummary objectSummary = createObjectSummary(1, objectKey);
-        final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result(
+        final S3Object objectSummary = createObjectSummary(1, objectKey);
+        final ListObjectsV2Response listObjectsV2Result = createListObjectsV2Response(
                 Collections.singletonList(objectSummary), null);
         when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result);
@@ -231,15 +230,15 @@ private Iterator<String> getS3ObjectKeysIterator(final String objectKey) {
     public void initializeWithTaskConfigs(final int maxTasks, final int taskId) {
         final Map<String, String> configMap = getConfigMap(maxTasks, taskId);
         final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
-        s3Client = mock(AmazonS3.class);
+        s3Client = mock(S3Client.class);
         awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
     }
 
-    private ListObjectsV2Result getListObjectsV2Result() {
-        final S3ObjectSummary zeroByteObject = createObjectSummary(0, "key1");
-        final S3ObjectSummary nonZeroByteObject1 = createObjectSummary(1, "key2");
-        final S3ObjectSummary nonZeroByteObject2 = createObjectSummary(1, "key3");
-        return createListObjectsV2Result(List.of(zeroByteObject, nonZeroByteObject1, nonZeroByteObject2), null);
+    private ListObjectsV2Response getListObjectsV2Response() {
+        final S3Object zeroByteObject = createObjectSummary(0, "key1");
+        final S3Object nonZeroByteObject1 = createObjectSummary(1, "key2");
+        final S3Object nonZeroByteObject2 = createObjectSummary(1, "key3");
+        return createListObjectsV2Response(List.of(zeroByteObject, nonZeroByteObject1, nonZeroByteObject2), null);
     }
 
 }
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
index 61d8170f7..b701ea85d 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
@@ -30,6 +30,7 @@
 import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
+import java.io.InputStream;
 import java.util.Collections;
 import java.util.stream.Stream;
 
@@ -38,8 +39,6 @@
 import io.aiven.kafka.connect.common.source.input.Transformer;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -64,12 +63,9 @@ void testIteratorProcessesS3Objects() throws Exception {
 
         final String key = "topic-00001-abc123.txt";
 
-        // Mock S3Object and InputStream
-        try (S3Object mockS3Object = mock(S3Object.class);
-                S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}),
-                        null);) {
-            when(mockSourceApiClient.getObject(anyString())).thenReturn(mockS3Object);
-            when(mockS3Object.getObjectContent()).thenReturn(mockInputStream);
+        // Mock InputStream
+        try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) {
+            when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream);
 
             when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
                     .thenReturn(Stream.of(new Object()));
@@ -98,12 +94,9 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception {
 
         final String key = "topic-00001-abc123.txt";
 
-        // Mock S3Object and InputStream
-        try (S3Object mockS3Object = mock(S3Object.class);
-                S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}),
-                        null);) {
-            when(mockSourceApiClient.getObject(anyString())).thenReturn(mockS3Object);
-            when(mockS3Object.getObjectContent()).thenReturn(mockInputStream);
+        // Mock InputStream
+        try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) {
+            when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream);
 
             // With ByteArrayTransformer
             mockTransformer = mock(ByteArrayTransformer.class);