Implement Ring Buffer for S3 API
Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
aindriu-aiven committed Jan 31, 2025
1 parent 1d74693 commit 1bc8ce6
Showing 12 changed files with 173 additions and 20 deletions.
@@ -83,6 +83,17 @@ public Optional<E> getEntry(final OffsetManagerKey key, final Function<Map<Strin
return data == null ? Optional.empty() : Optional.of(creator.apply(data));
}

/**
* Adds an entry to the offset manager for later retrieval.
*
* @param entry
* the entry that should be added to the offset manager.
*
*/
public void addEntry(final OffsetManagerEntry<E> entry) {
offsets.put(entry.getManagerKey().getPartitionMap(), entry.getProperties());
}

/**
* Gets any offset information stored in the offsetStorageReader and adds it to the local offsets Map. This provides a
* performance improvement over checking whether offsets exist individually.
@@ -113,14 +113,13 @@ public final class S3ConfigFragment extends ConfigFragment {
public static final String AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG = "aws.s3.backoff.max.retries";

public static final String FETCH_PAGE_SIZE = "aws.s3.fetch.page.size";
public static final String AWS_S3_FETCH_BUFFER_SIZE = "aws.s3.fetch.buffer.size";

private static final String GROUP_AWS = "AWS";
private static final String GROUP_AWS_STS = "AWS STS";

private static final String GROUP_S3_RETRY_BACKOFF_POLICY = "S3 retry backoff policy";

public static final int DEFAULT_PART_SIZE = 5 * 1024 * 1024;

// Default values from AWS SDK, since they are hidden
public static final int AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT = 100;
public static final int AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 20_000;
@@ -223,9 +222,13 @@ static void addAwsConfigGroup(final ConfigDef configDef) {
ConfigDef.Width.NONE, AWS_S3_PREFIX_CONFIG);

configDef.define(FETCH_PAGE_SIZE, ConfigDef.Type.INT, 10, ConfigDef.Range.atLeast(1),
ConfigDef.Importance.MEDIUM, "AWS S3 Fetch page size", GROUP_AWS, awsGroupCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Importance.MEDIUM, "AWS S3 Fetch page size", GROUP_AWS, awsGroupCounter++,
ConfigDef.Width.NONE, FETCH_PAGE_SIZE);

configDef.define(AWS_S3_FETCH_BUFFER_SIZE, ConfigDef.Type.INT, 1, ConfigDef.Range.atLeast(1),
ConfigDef.Importance.MEDIUM, "AWS S3 Fetch buffer size", GROUP_AWS, awsGroupCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.NONE, AWS_S3_FETCH_BUFFER_SIZE);
}

static void addAwsStsConfigGroup(final ConfigDef configDef) {
@@ -523,4 +526,8 @@ public int getFetchPageSize() {
return cfg.getInt(FETCH_PAGE_SIZE);
}

public int getS3FetchBufferSize() {
return cfg.getInt(AWS_S3_FETCH_BUFFER_SIZE);
}

}
24 changes: 24 additions & 0 deletions s3-source-connector/README.md
@@ -196,6 +196,7 @@ List of new configuration parameters:
- `aws.s3.endpoint` - The endpoint configuration (service endpoint & signing region) to be used for requests.
- `aws.s3.prefix` - The prefix that will be added to the file name in the bucket. Can be used for putting output files into a subdirectory.
- `aws.s3.region` - Name of the region for the bucket used for storing the records. Defaults to `us-east-1`.
- `aws.s3.fetch.buffer.size` - The size of the ring buffer of processed S3 object keys, used to ensure that objects which are slow to upload are not missed by the source connector. Defaults to `1`.
- `aws.sts.role.arn` - AWS role ARN, for cross-account access role instead of `aws.access.key.id` and `aws.secret.access.key`
- `aws.sts.role.external.id` - AWS ExternalId for cross-account access role
- `aws.sts.role.session.name` - AWS session name for cross-account access role
@@ -268,8 +269,31 @@ aws.s3.prefix=file-prefix
# Possible values 'none' or 'all'. Default being 'none'
# Errors are logged and ignored for 'all'
errors.tolerance=none

# AWS S3 fetch buffer size
# Possible values are any integer greater than 0. Default being '1'
aws.s3.fetch.buffer.size=1

```

### How the AWS S3 API works
The S3 Source Connector uses the AWS S3 ListObjectsV2 API to list all objects in a bucket:
- For general purpose buckets, ListObjectsV2 returns objects in lexicographical order based on their key names
- For directory buckets, ListObjectsV2 does not return objects in lexicographical order
- Directory buckets **are not supported** in the current release of the S3 Source Connector

The connector calls ListObjectsV2 with the `startAfter` parameter, which restricts the objects returned to those the connector has not yet processed.
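
A minimal sketch of what such a listing call looks like with the AWS SDK for Java v2 (illustrative only; the bucket name and start key are placeholders, not connector code):

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

public class ListAfterExample {
    public static void main(final String[] args) {
        try (S3Client s3 = S3Client.create()) {
            // Only keys that sort lexicographically after the last processed key are returned.
            final ListObjectsV2Request request = ListObjectsV2Request.builder()
                    .bucket("my-source-bucket") // placeholder bucket name
                    .startAfter("topics/mytopic/partition=0/mytopic-0-00000000000000000100") // last processed key
                    .maxKeys(1000) // S3 returns at most 1,000 keys per call
                    .build();
            final ListObjectsV2Response response = s3.listObjectsV2(request);
            for (final S3Object object : response.contents()) {
                // For general purpose buckets the keys arrive in lexicographical order.
                System.out.println(object.key());
            }
        }
    }
}
```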

This has a number of impacts which should be considered:

The first is that objects should be added to S3 in lexicographical key order;
for example, keys should begin with a date/timestamp, or use the record offset to identify the file when it is written to S3 by a sink connector.
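
One way to keep keys in lexicographical order is to zero-pad the offset or to prefix the key with a sortable UTC timestamp. A hypothetical sketch (the key layout here is only an illustration, not a format the connector requires):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class KeyNaming {
    // Zero-padded offsets sort correctly as strings: "...00000100" < "...00001000".
    static String keyFromOffset(final String topic, final int partition, final long offset) {
        return String.format("topics/%s/partition=%d/%s-%d-%020d", topic, partition, topic, partition, offset);
    }

    // An ISO-style UTC timestamp prefix also sorts lexicographically by time.
    static String keyFromTimestamp(final String topic, final Instant written) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss'Z'")
                .withZone(ZoneOffset.UTC)
                .format(written) + "/" + topic + ".bin";
    }

    public static void main(final String[] args) {
        System.out.println(keyFromOffset("mytopic", 0, 100L));
        System.out.println(keyFromTimestamp("mytopic", Instant.now()));
    }
}
```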

The second is that objects do not always finish uploading in key order: a lexicographically larger key can appear in S3 before a smaller one, for example when an upload is delayed by
API errors and retries or simply because the object is larger. In that case it is prudent to increase `aws.s3.fetch.buffer.size` so that the source connector
keeps a window of recently processed keys and does not skip an object that arrives out of sequence. Objects that have already been processed are not reprocessed, because a key is only added to the buffer after its object has been completely processed.
A maximum of 1,000 results are returned by each query, so it may be worthwhile to set `aws.s3.fetch.buffer.size` to this maximum, or slightly larger, to ensure no objects are missed.


### Retry strategy configuration

#### Apache Kafka connect retry strategy configuration property
@@ -25,6 +25,7 @@
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_FETCH_BUFFER_SIZE;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
@@ -126,6 +127,7 @@ private Map<String, String> getConfig(final String topics, final int maxTasks) {
config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter");
config.put(MAX_TASKS, String.valueOf(maxTasks));
config.put(AWS_S3_FETCH_BUFFER_SIZE, "2");
return config;
}

@@ -24,10 +24,12 @@
import static io.aiven.kafka.connect.common.config.TransformerFragment.AVRO_VALUE_SERIALIZER;
import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY;
import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL;
import static io.aiven.kafka.connect.common.config.TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE;
import static io.aiven.kafka.connect.common.config.TransformerFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_FETCH_BUFFER_SIZE;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG;
import static java.util.Map.entry;
@@ -79,6 +81,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -182,7 +185,7 @@ void bytesTest(final boolean addPrefix) {

final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topic, 1, DistributionType.PARTITION,
addPrefix, localS3Prefix, prefixPattern, fileNamePatternSeparator);

connectorConfig.put("aws.s3.fetch.buffer.size", "10");
connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());

final String testData1 = "Hello, Kafka Connect S3 Source! object 1";
Expand Down Expand Up @@ -218,9 +221,9 @@ void bytesTest(final boolean addPrefix) {
verifyOffsetsConsumeableByS3OffsetMgr(connectorConfig, offsetKeys, expectedOffsetRecords);
}

@Test
void bytesDefaultBufferTest() {
final int maxBufferSize = 4096;
@ParameterizedTest
@CsvSource({ "4096", "3000", "4101" })
void bytesBufferTest(final int maxBufferSize) {
final var topic = IntegrationBase.getTopic(testInfo);
final DistributionType distributionType;
final String prefixPattern = "topics/{{topic}}/partition={{partition}}/";
@@ -230,6 +233,8 @@ void bytesDefaultBufferTest() {
prefixPattern, "-");

connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
connectorConfig.put(TRANSFORMER_MAX_BUFFER_SIZE, String.valueOf(maxBufferSize));
connectorConfig.put(AWS_S3_FETCH_BUFFER_SIZE, String.valueOf(10));
connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);

final int byteArraySize = 6000;
Expand Down
@@ -105,7 +105,7 @@ public boolean hasNext() {
@Override
public SourceRecord next() {
final S3SourceRecord s3SourceRecord = s3SourceRecordIterator.next();
return s3SourceRecord.getSourceRecord(s3SourceConfig.getErrorsTolerance());
return s3SourceRecord.getSourceRecord(s3SourceConfig.getErrorsTolerance(), offsetManager);
}
};
return IteratorUtils.filteredIterator(inner, Objects::nonNull);
@@ -133,7 +133,6 @@ public void commitRecord(final SourceRecord record) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Committed individual record {}", (Map<String, Object>) record.sourceOffset());
}
offsetManager.removeEntry(record);
}

/**
@@ -127,6 +127,10 @@ public int getS3RetryBackoffMaxRetries() {
return s3ConfigFragment.getS3RetryBackoffMaxRetries();
}

public int getS3FetchBufferSize() {
return s3ConfigFragment.getS3FetchBufferSize();
}

public S3ConfigFragment getS3ConfigFragment() {
return s3ConfigFragment;
}
@@ -0,0 +1,72 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.s3.source.utils;

import java.util.Collection;

import org.apache.commons.collections.buffer.CircularFifoBuffer;

public class RingBuffer {
private final CircularFifoBuffer list;

/**
* Create a ring buffer with a maximum size
*
* @param size
*            The maximum number of entries the buffer can hold.
*/
public RingBuffer(final int size) {
this.list = new CircularFifoBuffer(size);
}

/**
* Create a Ring Buffer from an existing collection
*
* @param collection
* An existing collection of values
*/
public RingBuffer(final Collection<Object> collection) {
this.list = new CircularFifoBuffer(collection);
}

/**
* Add a new item to the ring buffer. When the buffer is full the oldest entry is removed to make room. Null values
* are ignored.
*
* @param item
*            the item to be added to the buffer
*/
public void enqueue(final Object item) {
if (item != null) {
list.add(item);
}

}

/**
* Get the oldest value in the ring buffer, available only once the buffer is full.
*
* @return the oldest value in the buffer, or null if the buffer is not yet full.
*/
public String getLast() {
return list.isFull() ? (String) list.get() : null;
}

public boolean contains(final Object item) {
return list.contains(item);
}
}
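
A short usage sketch of the buffer semantics above (the key values are illustrative): `getLast()` returns nothing until the buffer has filled, after which it returns the oldest key still held, which is the value the iterator hands to S3 as the `startAfter` key.

```java
public class RingBufferExample {
    public static void main(final String[] args) {
        final RingBuffer buffer = new RingBuffer(3);

        buffer.enqueue("key-001");
        buffer.enqueue("key-002");
        System.out.println(buffer.getLast());           // null - buffer is not yet full

        buffer.enqueue("key-003");
        System.out.println(buffer.getLast());           // key-001 - the oldest key held

        buffer.enqueue("key-004");                      // evicts key-001
        System.out.println(buffer.getLast());           // key-002
        System.out.println(buffer.contains("key-003")); // true
    }
}
```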
@@ -22,6 +22,7 @@
import org.apache.kafka.connect.source.SourceRecord;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.source.OffsetManager;
import io.aiven.kafka.connect.common.source.task.Context;

import org.slf4j.Logger;
@@ -111,10 +112,18 @@ public void setContext(final Context<String> context) {
/**
* Creates a SourceRecord that can be returned to a Kafka topic
*
* @return A kafka {@link org.apache.kafka.connect.source.SourceRecord SourceRecord}
* @return A Kafka {@link org.apache.kafka.connect.source.SourceRecord SourceRecord}, or null if an error occurs and
* errors tolerance is set to 'all'
*/
public SourceRecord getSourceRecord(final ErrorsTolerance tolerance) {
public SourceRecord getSourceRecord(final ErrorsTolerance tolerance,
final OffsetManager<S3OffsetManagerEntry> offsetManager) {
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Source Record: {} for Topic: {} , Partition: {}, recordCount: {}", getObjectKey(),
getTopic(), getPartition(), getRecordCount());
}
// Update offset Manager concurrent hashmap
offsetManager.addEntry(offsetManagerEntry);
return new SourceRecord(offsetManagerEntry.getManagerKey().getPartitionMap(),
offsetManagerEntry.getProperties(), getTopic(), getPartition(), keyData.schema(), keyData.value(),
valueData.schema(), valueData.value());
Expand Down
@@ -33,6 +33,7 @@
import io.aiven.kafka.connect.common.source.task.DistributionType;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.model.S3Object;
@@ -53,6 +54,8 @@ public final class SourceRecordIterator implements Iterator<S3SourceRecord> {
private final AWSV2SourceClient sourceClient;
/** the taskId of this running task */
private int taskId;
/** The ring buffer of the most recently processed S3 object keys */
private final RingBuffer s3ObjectKeyRingBuffer;

/** The S3 bucket we are processing */
private final String bucket;
@@ -92,20 +95,31 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig,
this.taskAssignment = new TaskAssignment(initializeDistributionStrategy());
this.taskId = s3SourceConfig.getTaskId();
this.fileMatching = new FileMatching(filePattern);
s3ObjectKeyRingBuffer = new RingBuffer(s3SourceConfig.getS3FetchBufferSize());

inner = getS3SourceRecordStream(sourceClient).iterator();
outer = Collections.emptyIterator();
}

private Stream<S3SourceRecord> getS3SourceRecordStream(final AWSV2SourceClient sourceClient) {
return sourceClient.getS3ObjectStream(lastSeenObjectKey)
return sourceClient.getS3ObjectStream(s3ObjectKeyRingBuffer.getLast())
.map(fileMatching)
.filter(taskAssignment)
.map(Optional::get);
}

@Override
public boolean hasNext() {
if (!outer.hasNext()) {
// update the buffer to contain this new objectKey
s3ObjectKeyRingBuffer.enqueue(lastSeenObjectKey);
// Remove the last seen Object Key from the offsets as the file has been completely processed.
offsetManager
.removeEntry(new S3OffsetManagerEntry(bucket, StringUtils.defaultIfBlank(lastSeenObjectKey, ""))
.getManagerKey()); // NOPMD
// instantiation
// in loop
}
if (!inner.hasNext() && !outer.hasNext()) {
inner = getS3SourceRecordStream(sourceClient).iterator();
}
@@ -210,7 +224,7 @@ class FileMatching implements Function<S3Object, Optional<S3SourceRecord>> {
public Optional<S3SourceRecord> apply(final S3Object s3Object) {

final Optional<Context<String>> optionalContext = utils.process(s3Object.key());
if (optionalContext.isPresent()) {
if (optionalContext.isPresent() && !s3ObjectKeyRingBuffer.contains(s3Object.key())) {
final S3SourceRecord s3SourceRecord = new S3SourceRecord(s3Object);
final Context<String> context = optionalContext.get();
overrideContextTopic(context);
