diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java
index 6e1e8319..4e088655 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java
@@ -83,6 +83,17 @@ public Optional getEntry(final OffsetManagerKey key, final Function
+    /**
+     * Adds an entry to the local offsets map.
+     *
+     * @param entry
+     *            the entry to add.
+     */
+    public void addEntry(final E entry) {
+        offsets.put(entry.getManagerKey().getPartitionMap(), entry.getProperties());
+    }
+
     /**
      * Gets any offset information stored in the offsetStorageReader and adds to the local offsets Map. This provides a
      * performance improvement over when checking if offsets exists individually.
diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java
index 2ece623b..8705db9b 100644
--- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java
+++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java
@@ -113,14 +113,13 @@ public final class S3ConfigFragment extends ConfigFragment {
     public static final String AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG = "aws.s3.backoff.max.retries";
 
     public static final String FETCH_PAGE_SIZE = "aws.s3.fetch.page.size";
+    public static final String AWS_S3_FETCH_BUFFER_SIZE = "aws.s3.fetch.buffer.size";
 
     private static final String GROUP_AWS = "AWS";
     private static final String GROUP_AWS_STS = "AWS STS";
     private static final String GROUP_S3_RETRY_BACKOFF_POLICY = "S3 retry backoff policy";
 
-    public static final int DEFAULT_PART_SIZE = 5 * 1024 * 1024;
-
     // Default values from AWS SDK, since they are hidden
     public static final int AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT = 100;
     public static final int AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 20_000;
@@ -223,9 +222,13 @@ static void addAwsConfigGroup(final ConfigDef configDef) {
                 ConfigDef.Width.NONE, AWS_S3_PREFIX_CONFIG);
 
         configDef.define(FETCH_PAGE_SIZE, ConfigDef.Type.INT, 10, ConfigDef.Range.atLeast(1),
-                ConfigDef.Importance.MEDIUM, "AWS S3 Fetch page size", GROUP_AWS, awsGroupCounter++, // NOPMD
-                // UnusedAssignment
+                ConfigDef.Importance.MEDIUM, "AWS S3 Fetch page size", GROUP_AWS, awsGroupCounter++,
                 ConfigDef.Width.NONE, FETCH_PAGE_SIZE);
+
+        configDef.define(AWS_S3_FETCH_BUFFER_SIZE, ConfigDef.Type.INT, 1, ConfigDef.Range.atLeast(1),
+                ConfigDef.Importance.MEDIUM, "AWS S3 Fetch buffer size", GROUP_AWS, awsGroupCounter++, // NOPMD
+                // UnusedAssignment
+                ConfigDef.Width.NONE, AWS_S3_FETCH_BUFFER_SIZE);
     }
 
     static void addAwsStsConfigGroup(final ConfigDef configDef) {
@@ -523,4 +526,8 @@ public int getFetchPageSize() {
         return cfg.getInt(FETCH_PAGE_SIZE);
     }
 
+    public int getS3FetchBufferSize() {
+        return cfg.getInt(AWS_S3_FETCH_BUFFER_SIZE);
+    }
+
 }
diff --git a/s3-source-connector/README.md b/s3-source-connector/README.md
index a5952008..58ae3fe1 100644
--- a/s3-source-connector/README.md
+++ b/s3-source-connector/README.md
@@ -196,6 +196,7 @@ List of new configuration parameters:
 - `aws.s3.endpoint` - The endpoint configuration (service endpoint & signing region) to be used for requests.
 - `aws.s3.prefix` - The prefix that will be added to the file name in the bucket. Can be used for putting output files into a subdirectory.
 - `aws.s3.region` - Name of the region for the bucket used for storing the records. Defaults to `us-east-1`.
+- `aws.s3.fetch.buffer.size` - The size of the ring buffer of processed S3 object keys, used to ensure that objects that are slow to upload are not missed by the source connector.
 - `aws.sts.role.arn` - AWS role ARN, for cross-account access role instead of `aws.access.key.id` and `aws.secret.access.key`
 - `aws.sts.role.external.id` - AWS ExternalId for cross-account access role
 - `aws.sts.role.session.name` - AWS session name for cross-account access role
@@ -268,8 +269,31 @@ aws.s3.prefix=file-prefix
 # Possible values 'none' or 'all'. Default being 'none'
 # Errors are logged and ignored for 'all'
 errors.tolerance=none
+
+# AWS S3 fetch buffer size
+# Possible values: any integer greater than 0. Default is '1'
+aws.s3.fetch.buffer.size=1
+
 ```
+### How the AWS S3 API works
+The AWS S3 ListObjectsV2 API, which the S3 source connector uses to list the objects in a bucket, behaves differently depending on the bucket type:
+- General purpose buckets - ListObjectsV2 returns objects in lexicographical order, based on their key names.
+- Directory buckets - ListObjectsV2 does not return objects in lexicographical order.
+  - Directory buckets **are not supported** in the current release of the S3 source connector.
+
+The S3 source connector calls ListObjectsV2 with the `startAfter` token, which minimizes the number of objects returned to those the connector has yet to process.
+
+This has a number of impacts which should be considered:
+
+First, objects should be added to S3 in lexicographical key order: for example, keys should begin with a date, or use the offset value to identify the file when it is added to S3 by a sink connector.
+
+Second, an object with a lexicographically smaller key can reach S3 *after* a larger one, for example because of an API error with retries, or because the object is larger and slower to upload. To allow for objects arriving out of sequence, it may be prudent to increase `aws.s3.fetch.buffer.size`. Objects that have already been processed are never reprocessed, because a key is only added to the buffer once its object has been completely processed.
+A maximum of 1,000 results are returned by each query, so it may be beneficial to set `aws.s3.fetch.buffer.size` to this maximum, or slightly larger, to ensure that no objects are missed.
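+
+The sketch below illustrates the idea (it is not the connector's implementation; the class name, bucket name, and buffer size are made up for the example). It combines ListObjectsV2's `startAfter` token with a small buffer of recently processed keys: the buffer's *oldest* key is used as `startAfter`, so an object whose key sorts between the oldest and newest processed keys is still returned on a later poll, and keys already in the buffer are skipped. The connector's own ring buffer wraps a `CircularFifoBuffer`; a plain `ArrayDeque` stands in for it here.
+
+```java
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class StartAfterSketch {
+
+    private static final int BUFFER_SIZE = 1000; // plays the role of aws.s3.fetch.buffer.size
+
+    // Keys of already-processed objects, oldest first.
+    private final Deque<String> processedKeys = new ArrayDeque<>(BUFFER_SIZE);
+
+    public void poll(final S3Client s3, final String bucket) {
+        final ListObjectsV2Request request = ListObjectsV2Request.builder()
+                .bucket(bucket)
+                // Start after the OLDEST buffered key; null on the first poll lists from the beginning.
+                .startAfter(processedKeys.peekFirst())
+                .build();
+
+        for (final S3Object object : s3.listObjectsV2Paginator(request).contents()) {
+            if (processedKeys.contains(object.key())) {
+                continue; // already processed on an earlier poll
+            }
+            process(object);
+            // Remember the key only after the object is fully processed, evicting the oldest entry.
+            if (processedKeys.size() == BUFFER_SIZE) {
+                processedKeys.removeFirst();
+            }
+            processedKeys.addLast(object.key());
+        }
+    }
+
+    private void process(final S3Object object) {
+        System.out.println("processing " + object.key());
+    }
+
+    public static void main(final String[] args) {
+        try (S3Client s3 = S3Client.create()) {
+            new StartAfterSketch().poll(s3, "example-bucket"); // hypothetical bucket
+        }
+    }
+}
+```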
+
+
 ### Retry strategy configuration
 
 #### Apache Kafka connect retry strategy configuration property
diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
index 5f3e0b6d..ee37777d 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
@@ -25,6 +25,7 @@
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG;
+import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_FETCH_BUFFER_SIZE;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -126,6 +127,7 @@ private Map getConfig(final String topics, final int maxTasks) {
         config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
         config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter");
         config.put(MAX_TASKS, String.valueOf(maxTasks));
+        config.put(AWS_S3_FETCH_BUFFER_SIZE, "2");
         return config;
     }
 
diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
index 9753b6ac..6ee36066 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
@@ -24,10 +24,12 @@
 import static io.aiven.kafka.connect.common.config.TransformerFragment.AVRO_VALUE_SERIALIZER;
 import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY;
 import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL;
+import static io.aiven.kafka.connect.common.config.TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE;
 import static io.aiven.kafka.connect.common.config.TransformerFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG;
+import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_FETCH_BUFFER_SIZE;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG;
 import static java.util.Map.entry;
@@ -79,6 +81,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -182,7 +185,7 @@ void bytesTest(final boolean addPrefix) {
         final Map connectorConfig = getConfig(CONNECTOR_NAME, topic, 1,
                 DistributionType.PARTITION, addPrefix, localS3Prefix, prefixPattern, fileNamePatternSeparator);
-
+        connectorConfig.put("aws.s3.fetch.buffer.size", "10");
         connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
 
         final String testData1 = "Hello, Kafka Connect S3 Source! object 1";
@@ -218,9 +221,9 @@ void bytesTest(final boolean addPrefix) {
         verifyOffsetsConsumeableByS3OffsetMgr(connectorConfig, offsetKeys, expectedOffsetRecords);
     }
 
-    @Test
-    void bytesDefaultBufferTest() {
-        final int maxBufferSize = 4096;
+    @ParameterizedTest
+    @CsvSource({ "4096", "3000", "4101" })
+    void bytesBufferTest(final int maxBufferSize) {
         final var topic = IntegrationBase.getTopic(testInfo);
         final DistributionType distributionType;
         final String prefixPattern = "topics/{{topic}}/partition={{partition}}/";
@@ -230,6 +233,8 @@ void bytesDefaultBufferTest() {
                 prefixPattern, "-");
         connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
 
+        connectorConfig.put(TRANSFORMER_MAX_BUFFER_SIZE, String.valueOf(maxBufferSize));
+        connectorConfig.put(AWS_S3_FETCH_BUFFER_SIZE, String.valueOf(10));
         connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
 
         final int byteArraySize = 6000;
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
index e712b0d9..15e19236 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
@@ -105,7 +105,7 @@ public boolean hasNext() {
             @Override
             public SourceRecord next() {
                 final S3SourceRecord s3SourceRecord = s3SourceRecordIterator.next();
-                return s3SourceRecord.getSourceRecord(s3SourceConfig.getErrorsTolerance());
+                return s3SourceRecord.getSourceRecord(s3SourceConfig.getErrorsTolerance(), offsetManager);
             }
         };
         return IteratorUtils.filteredIterator(inner, Objects::nonNull);
@@ -133,7 +133,6 @@ public void commitRecord(final SourceRecord record) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Committed individual record {} committed", (Map) record.sourceOffset());
         }
-        offsetManager.removeEntry(record);
     }
 
     /**
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
index dae9b6b0..92b1f54c 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
@@ -127,6 +127,10 @@ public int getS3RetryBackoffMaxRetries() {
         return s3ConfigFragment.getS3RetryBackoffMaxRetries();
     }
 
+    public int getS3FetchBufferSize() {
+        return s3ConfigFragment.getS3FetchBufferSize();
+    }
+
     public S3ConfigFragment getS3ConfigFragment() {
         return s3ConfigFragment;
     }
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RingBuffer.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RingBuffer.java
new file mode 100644
index 00000000..8d912da4
--- /dev/null
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RingBuffer.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.s3.source.utils;
+
+import java.util.Collection;
+
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+
+public class RingBuffer {
+    private final CircularFifoBuffer list;
+
+    /**
+     * Create a ring buffer with a maximum size.
+     *
+     * @param size
+     *            The maximum number of entries the buffer may hold.
+     */
+    public RingBuffer(final int size) {
+        this.list = new CircularFifoBuffer(size);
+    }
+
+    /**
+     * Create a ring buffer from an existing collection. The buffer is sized to the collection.
+     *
+     * @param collection
+     *            An existing collection of values.
+     */
+    public RingBuffer(final Collection collection) {
+        this.list = new CircularFifoBuffer(collection);
+    }
+
+    /**
+     * Add a new item to the ring buffer. Once the buffer is full, adding an item evicts the oldest entry. Null
+     * values are ignored.
+     *
+     * @param item
+     *            The item to add to the buffer.
+     */
+    public void enqueue(final Object item) {
+        if (item != null) {
+            list.add(item);
+        }
+    }
+
+    /**
+     * Get the oldest value in the ring buffer.
+     *
+     * @return The oldest entry in the buffer, or null if the buffer is not yet full.
+     */
+    public String getLast() {
+        return list.isFull() ? (String) list.get() : null;
+    }
+
+    /**
+     * Check whether an item is present in the ring buffer.
+     *
+     * @param item
+     *            The item to look for.
+     * @return true if the buffer contains the item.
+     */
+    public boolean contains(final Object item) {
+        return list.contains(item);
+    }
+}
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java
index 5bf799d3..dc78b16b 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.connect.source.SourceRecord;
 
 import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
+import io.aiven.kafka.connect.common.source.OffsetManager;
 import io.aiven.kafka.connect.common.source.task.Context;
 
 import org.slf4j.Logger;
@@ -111,10 +112,18 @@ public void setContext(final Context context) {
     /**
      * Creates a SourceRecord that can be returned to a Kafka topic
      *
-     * @return A kafka {@link org.apache.kafka.connect.source.SourceRecord SourceRecord}
+     * @return A kafka {@link org.apache.kafka.connect.source.SourceRecord SourceRecord}. This can return null if
+     *         errors tolerance is set to 'all'.
      */
-    public SourceRecord getSourceRecord(final ErrorsTolerance tolerance) {
+    public SourceRecord getSourceRecord(final ErrorsTolerance tolerance,
+            final OffsetManager offsetManager) {
         try {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Source Record: {} for Topic: {} , Partition: {}, recordCount: {}", getObjectKey(),
+                        getTopic(), getPartition(), getRecordCount());
+            }
+            // Update the offset manager's concurrent hashmap
+            offsetManager.addEntry(offsetManagerEntry);
             return new SourceRecord(offsetManagerEntry.getManagerKey().getPartitionMap(),
                     offsetManagerEntry.getProperties(), getTopic(), getPartition(), keyData.schema(), keyData.value(),
                     valueData.schema(), valueData.value());
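
The RingBuffer added above backs the `startAfter` token: `getLast()` returns the oldest key once the buffer is full, and `contains()` lets the iterator skip keys it has already processed. A minimal usage sketch (illustrative, not part of the patch; the demo class name and keys are invented):

```java
// Demonstrates the RingBuffer semantics, assuming the class above is on the classpath.
import io.aiven.kafka.connect.s3.source.utils.RingBuffer;

public class RingBufferDemo {
    public static void main(final String[] args) {
        final RingBuffer buffer = new RingBuffer(3); // as if aws.s3.fetch.buffer.size=3

        buffer.enqueue("2025-01-01-key");
        System.out.println(buffer.getLast()); // null: buffer not yet full, so list from the bucket start

        buffer.enqueue("2025-01-02-key");
        buffer.enqueue("2025-01-03-key");
        System.out.println(buffer.getLast()); // 2025-01-01-key: the oldest entry becomes startAfter

        buffer.enqueue("2025-01-04-key"); // full buffer: the oldest entry is evicted
        System.out.println(buffer.getLast()); // 2025-01-02-key

        System.out.println(buffer.contains("2025-01-03-key")); // true: this key is skipped if re-listed
    }
}
```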
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
index 08687973..063e0fd3 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
@@ -33,6 +33,7 @@
 import io.aiven.kafka.connect.common.source.task.DistributionType;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.s3.model.S3Object;
@@ -53,6 +54,8 @@ public final class SourceRecordIterator implements Iterator {
     private final AWSV2SourceClient sourceClient;
     /** the taskId of this running task */
     private int taskId;
+    /** the ring buffer of the most recently processed object keys */
+    private final RingBuffer s3ObjectKeyRingBuffer;
 
     /** The S3 bucket we are processing */
     private final String bucket;
@@ -92,13 +95,14 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig,
         this.taskAssignment = new TaskAssignment(initializeDistributionStrategy());
         this.taskId = s3SourceConfig.getTaskId();
         this.fileMatching = new FileMatching(filePattern);
+        s3ObjectKeyRingBuffer = new RingBuffer(s3SourceConfig.getS3FetchBufferSize());
 
         inner = getS3SourceRecordStream(sourceClient).iterator();
         outer = Collections.emptyIterator();
     }
 
     private Stream getS3SourceRecordStream(final AWSV2SourceClient sourceClient) {
-        return sourceClient.getS3ObjectStream(lastSeenObjectKey)
+        return sourceClient.getS3ObjectStream(s3ObjectKeyRingBuffer.getLast())
                 .map(fileMatching)
                 .filter(taskAssignment)
                 .map(Optional::get);
@@ -106,6 +110,16 @@ private Stream getS3SourceRecordStream(final AWSV2SourceClient sourceClient) {
 
     @Override
     public boolean hasNext() {
+        if (!outer.hasNext()) {
+            // update the buffer to contain this new object key
+            s3ObjectKeyRingBuffer.enqueue(lastSeenObjectKey);
+            // Remove the last seen object key from the offsets, as the file has been completely processed.
+            offsetManager
+                    .removeEntry(new S3OffsetManagerEntry(bucket, StringUtils.defaultIfBlank(lastSeenObjectKey, ""))
+                            .getManagerKey()); // NOPMD instantiation in loop
+        }
         if (!inner.hasNext() && !outer.hasNext()) {
             inner = getS3SourceRecordStream(sourceClient).iterator();
         }
@@ -210,7 +224,7 @@ class FileMatching implements Function<S3Object, Optional<S3SourceRecord>> {
         public Optional<S3SourceRecord> apply(final S3Object s3Object) {
 
             final Optional<Context<String>> optionalContext = utils.process(s3Object.key());
-            if (optionalContext.isPresent()) {
+            if (optionalContext.isPresent() && !s3ObjectKeyRingBuffer.contains(s3Object.key())) {
                 final S3SourceRecord s3SourceRecord = new S3SourceRecord(s3Object);
                 final Context<String> context = optionalContext.get();
                 overrideContextTopic(context);
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecordTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecordTest.java
index 6561c51c..123c1c42 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecordTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecordTest.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.connect.source.SourceRecord;
 
 import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
+import io.aiven.kafka.connect.common.source.OffsetManager;
 import io.aiven.kafka.connect.common.source.task.Context;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 
@@ -45,6 +46,9 @@ class S3SourceRecordTest {
     @Mock
     private S3SourceConfig s3SourceConfig;
 
+    @Mock
+    private OffsetManager offsetManager;
+
     private Context context;
 
     @Mock
@@ -63,7 +67,7 @@ void testCreateSourceRecord() {
         s3Record.setValueData(new SchemaAndValue(null, ""));
         s3Record.setKeyData(new SchemaAndValue(null, ""));
 
-        final SourceRecord result = s3Record.getSourceRecord(ErrorsTolerance.NONE);
+        final SourceRecord result = s3Record.getSourceRecord(ErrorsTolerance.NONE, offsetManager);
 
         assertThat(result.topic()).isEqualTo(topic);
         assertThat(result.kafkaPartition()).isEqualTo(null);
@@ -81,8 +85,8 @@ void testCreateSourceRecordWithDataError() {
         s3Record.setContext(context);
 
         assertThatExceptionOfType(ConnectException.class).as("Errors tolerance: NONE")
-                .isThrownBy(() -> s3Record.getSourceRecord(ErrorsTolerance.NONE));
-        final SourceRecord result = s3Record.getSourceRecord(ErrorsTolerance.ALL);
+                .isThrownBy(() -> s3Record.getSourceRecord(ErrorsTolerance.NONE, offsetManager));
+        final SourceRecord result = s3Record.getSourceRecord(ErrorsTolerance.ALL, offsetManager);
         assertThat(result).isNull();
     }
 
@@ -102,7 +106,7 @@ void testModifyingInitialContextDoesNotAlterTheSourceRecordsContext() {
 
         // alter context and it should have no impact on the source record.
         context.setPartition(14);
         context.setTopic("a-diff-topic");
-        SourceRecord result = s3Record.getSourceRecord(ErrorsTolerance.NONE);
+        SourceRecord result = s3Record.getSourceRecord(ErrorsTolerance.NONE, offsetManager);
 
         assertThat(result.topic()).isEqualTo(topic);
         assertThat(result.kafkaPartition()).isEqualTo(null);
@@ -110,7 +114,7 @@ void testModifyingInitialContextDoesNotAlterTheSourceRecordsContext() {
         context = s3Record.getContext();
         context.setPartition(99);
         context.setTopic("another-diff-topic");
-        result = s3Record.getSourceRecord(ErrorsTolerance.NONE);
+        result = s3Record.getSourceRecord(ErrorsTolerance.NONE, offsetManager);
 
         assertThat(result.topic()).isEqualTo(topic);
         assertThat(result.kafkaPartition()).isEqualTo(null);
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
index 4a6e25ec..15ae9fe4 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
@@ -89,6 +89,7 @@ private void mockSourceConfig(final S3SourceConfig s3SourceConfig, final String
         when(mockFileNameFrag.getFilenameTemplate()).thenReturn(Template.of(filePattern));
         when(mockConfig.getTargetTopics()).thenReturn(targetTopic);
         when(mockConfig.getTransformerMaxBufferSize()).thenReturn(4096);
+        when(mockConfig.getS3FetchBufferSize()).thenReturn(1);
     }
 
     @Test
@@ -103,6 +104,7 @@ void testIteratorProcessesS3Objects() throws Exception {
 
         mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES);
         mockSourceConfig(mockConfig, filePattern, 0, 1, null);
+        when(mockConfig.getInputFormat()).thenReturn(InputFormat.BYTES);
 
         final Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer,
                 sourceApiClient);
@@ -165,7 +167,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception {
         when(mockConfig.getTransformerMaxBufferSize()).thenReturn(4096);
 
         mockSourceConfig(mockConfig, filePattern, 0, 1, null);
-
+        when(mockConfig.getInputFormat()).thenReturn(InputFormat.BYTES);
         // With ByteArrayTransformer
         final Iterator byteArrayIterator = new SourceRecordIterator(mockConfig, mockOffsetManager,
                 transformer, sourceApiClient);