Skip to content

Commit

Permalink
fixed issues with s3-source-connector build
Browse files Browse the repository at this point in the history
  • Loading branch information
¨Claude committed Dec 18, 2024
1 parent e47556f commit cc5e7c3
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import io.aiven.kafka.connect.common.OffsetManager;
import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import io.aiven.kafka.connect.common.OffsetManager;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry;
import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
import io.aiven.kafka.connect.s3.source.utils.Version;
Expand Down Expand Up @@ -73,10 +73,10 @@ public class S3SourceTask extends SourceTask {
/** The offset manager this task uses */
private OffsetManager<S3OffsetManagerEntry> offsetManager;

// @SuppressWarnings("PMD.UnnecessaryConstructor")
// public S3SourceTask() {
// super();
// }
// @SuppressWarnings("PMD.UnnecessaryConstructor")
// public S3SourceTask() {
// super();
// }

@Override
public String version() {
Expand All @@ -90,8 +90,8 @@ public void start(final Map<String, String> props) {
this.transformer = s3SourceConfig.getTransformer();
offsetManager = new OffsetManager<>(context);
awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys);
setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
awsv2SourceClient));
setSourceRecordIterator(
new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, awsv2SourceClient));
this.taskInitialized = true;
}

Expand All @@ -118,8 +118,12 @@ public List<SourceRecord> poll() throws InterruptedException {
exception);
pollLock.wait(ERROR_BACKOFF);

setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, //NOPMD createing object in loop.
awsv2SourceClient));
setSourceRecordIterator(
new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, // NOPMD
// creating
// object in
// loop.
awsv2SourceClient));

} else {
LOGGER.warn("Non-retryable AmazonS3Exception occurred. Stopping polling.", exception);
Expand All @@ -138,10 +142,11 @@ public List<SourceRecord> poll() throws InterruptedException {
}

/**
* Create a list of source records.
* Package private for testing.
* @param results a list of SourceRecords to add the results to.
* @return the {@code results} parameter.
* Create a list of source records. Package private for testing.
*
* @param results
* a list of SourceRecords to add the results to.
* @return the {@code results} parameter.
*/
List<SourceRecord> extractSourceRecords(final List<SourceRecord> results) {
if (connectorStopped.get()) {
Expand All @@ -165,23 +170,29 @@ List<SourceRecord> extractSourceRecords(final List<SourceRecord> results) {
}

/**
* Set the S3 source record iterator that this task is using.
* Protected so that it can be overridden in test implementations.
* @param iterator The S3SourceRecord iterator to use.
* Set the S3 source record iterator that this task is using. Protected so that it can be overridden in test implementations.
*
* @param iterator
* The S3SourceRecord iterator to use.
*/
protected void setSourceRecordIterator(final Iterator<S3SourceRecord> iterator) {
    // Replace the iterator held by this task with the supplied one.
    this.sourceRecordIterator = iterator;
}

/**
* Wait until objects are available to be read
* @throws InterruptedException on error.
*
* @throws InterruptedException
* if the thread is interrupted while sleeping between checks.
*/
private void waitForObjects() throws InterruptedException {
while (!sourceRecordIterator.hasNext() && !connectorStopped.get()) {
LOGGER.debug("Blocking until new S3 files are available.");
Thread.sleep(S_3_POLL_INTERVAL_MS);
setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, // NOPMD creating object in loop
setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, // NOPMD
// creating
// object
// in loop
awsv2SourceClient));
}
}
Expand All @@ -203,6 +214,7 @@ private void closeResources() {

/**
* Get the transformer that we are using.
*
* @return the transformer that we are using.
*/
public Transformer getTransformer() {
Expand All @@ -211,6 +223,7 @@ public Transformer getTransformer() {

/**
* Get the initialized flag.
*
* @return {@code true} if the task has been initialized, {@code false} otherwise.
*/
public boolean isTaskInitialized() {
Expand All @@ -219,6 +232,7 @@ public boolean isTaskInitialized() {

/**
* Gets the state of the connector stopped flag.
*
* @return The state of the connector stopped flag.
*/
public boolean isConnectorStopped() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@
* Called AWSV2SourceClient as this source client implements the V2 version of the aws client library. Handles all calls
* and authentication to AWS and returns useable objects to the SourceRecordIterator.
*/
public class AWSV2SourceClient {
public final class AWSV2SourceClient {
/** The logger to use */
private static final Logger LOGGER = LoggerFactory.getLogger(AWSV2SourceClient.class);
/**
* How many pages of data we will attempt to read at one go.
* The page size is defined in {@link S3ConfigFragment#getFetchPageSize()}
* How many pages of data we will attempt to read at one go. The page size is defined in
* {@link S3ConfigFragment#getFetchPageSize()}
*/
public static final int PAGE_SIZE_FACTOR = 2;
/** The source configuration */
Expand Down Expand Up @@ -92,17 +92,29 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String>
// TODO the code below should be configured in some sort of task assignment method/process/call.
int maxTasks;
try {
maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString());
} catch (NullPointerException | NumberFormatException e) { // NOPMD catch null pointer
final Object value = s3SourceConfig.originals().get("tasks.max");
if (value == null) {
LOGGER.info("Setting tasks.max to 1");
maxTasks = 1;
} else {
maxTasks = Integer.parseInt(value.toString());
}
} catch (NumberFormatException e) { // NOPMD catch null pointer
LOGGER.warn("Invalid tasks.max: {}", e.getMessage());
LOGGER.info("Setting tasks.max to 1");
maxTasks = 1;
}
this.maxTasks = maxTasks;
int taskId;
try {
taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
} catch (NullPointerException | NumberFormatException e) { // NOPMD catch null pointer
final Object value = s3SourceConfig.originals().get("task.id");
if (value == null) {
LOGGER.info("Setting task.id to 0");
taskId = 0;
} else {
taskId = Integer.parseInt(value.toString()) % maxTasks;
}
} catch (NumberFormatException e) { // NOPMD catch null pointer
LOGGER.warn("Invalid task.id: {}", e.getMessage());
LOGGER.info("Setting task.id to 0");
taskId = 0;
Expand All @@ -111,9 +123,11 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String>
}

/**
* Create an iterator of S3Objects. The iterator will automatically close the objects when the next object is retrieved or when
* the end of the iterator is reached.
* @param startToken the Key to start searching from.
* Create an iterator of S3Objects. The iterator will automatically close the objects when the next object is
* retrieved or when the end of the iterator is reached.
*
* @param startToken
* the Key to start searching from.
* @return An iterator on S3Objects.
*/
public Iterator<S3Object> getIteratorOfObjects(final String startToken) {
Expand All @@ -131,38 +145,46 @@ public Iterator<S3Object> getIteratorOfObjects(final String startToken) {

final Predicate<S3ObjectSummary> filter = filterPredicate.and(this::checkTaskAssignment)
.and(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()));
final Iterator<S3ObjectSummary> summaryIterator = IteratorUtils.filteredIterator(new S3ObjectSummaryIterator(s3Client, request), filter::test);
final Iterator<S3Object> objectIterator = IteratorUtils.transformedIterator(summaryIterator, s3ObjectSummary -> s3Client.getObject(bucketName, s3ObjectSummary.getKey()));
final Iterator<S3ObjectSummary> summaryIterator = IteratorUtils
.filteredIterator(new S3ObjectSummaryIterator(s3Client, request), filter::test);
final Iterator<S3Object> objectIterator = IteratorUtils.transformedIterator(summaryIterator,
s3ObjectSummary -> s3Client.getObject(bucketName, s3ObjectSummary.getKey()));
return ClosableIterator.wrap(objectIterator);
}

/**
* Add a failed object to the list of failed object keys.
* @param objectKey the object key that failed.
*
* @param objectKey
* the object key that failed.
*/
public void addFailedObjectKeys(final String objectKey) {
    // Keys recorded here are rejected by the filter assembled in getIteratorOfObjects.
    failedObjectKeys.add(objectKey);
}

/**
* Set the filter predicate. Overrides the default predicate.
* @param predicate the predicate to use instead of the default predicate.
* Set the filter predicate. Overrides the default predicate.
*
* @param predicate
* the predicate to use instead of the default predicate.
*/
public void setFilterPredicate(final Predicate<S3ObjectSummary> predicate) {
    // The stored predicate is later combined with the task-assignment and failed-key checks.
    this.filterPredicate = predicate;
}

/**
* Checks the task assignment. This method should probably be delivered by a task assignment object.
* @param summary the object summary to check
* Checks the task assignment. This method should probably be delivered by a task assignment object.
*
* @param summary
* the object summary to check
* @return {@code true} if the summary should be processed, {@code false} otherwise.
*/
private boolean checkTaskAssignment(final S3ObjectSummary summary) {
    // floorMod (rather than %) keeps the bucket non-negative for a positive maxTasks,
    // even when the key's hash code is negative.
    final int keyHash = summary.getKey().hashCode();
    final int assignedTask = Math.floorMod(keyHash, maxTasks);
    return assignedTask == taskId;
}

/**
* Shut down this source client. Shuts down the attached AmazonS3 client.
* Shut down this source client. Shuts down the attached AmazonS3 client.
*/
public void shutdown() {
s3Client.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@ public S3ObjectSummary next() {
lastObjectSummaryKey = result.getKey();
return result;
}
}
}
Loading

0 comments on commit cc5e7c3

Please sign in to comment.