diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index 84a4e0d5..5c5c6d20 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -26,15 +26,15 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import io.aiven.kafka.connect.common.OffsetManager; -import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import io.aiven.kafka.connect.common.OffsetManager; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; +import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry; import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; import io.aiven.kafka.connect.s3.source.utils.Version; @@ -73,10 +73,10 @@ public class S3SourceTask extends SourceTask { /** The offset manager this task uses */ private OffsetManager offsetManager; -// @SuppressWarnings("PMD.UnnecessaryConstructor") -// public S3SourceTask() { -// super(); -// } + // @SuppressWarnings("PMD.UnnecessaryConstructor") + // public S3SourceTask() { + // super(); + // } @Override public String version() { @@ -90,8 +90,8 @@ public void start(final Map props) { this.transformer = s3SourceConfig.getTransformer(); offsetManager = new OffsetManager<>(context); awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys); - setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, - awsv2SourceClient)); + setSourceRecordIterator( + new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, awsv2SourceClient)); this.taskInitialized = true; } @@ -118,8 +118,12 @@ public List poll() throws InterruptedException { exception); pollLock.wait(ERROR_BACKOFF); - setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, //NOPMD createing object in loop. - awsv2SourceClient)); + setSourceRecordIterator( + new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, // NOPMD + // createing + // object in + // loop. + awsv2SourceClient)); } else { LOGGER.warn("Non-retryable AmazonS3Exception occurred. Stopping polling.", exception); @@ -138,10 +142,11 @@ public List poll() throws InterruptedException { } /** - * Create a list of source records. - * Package private for testing. - * @param results a list of SourceRecords to add the results to. - * @return the {@code results} parameter. + * Create a list of source records. Package private for testing. + * + * @param results + * a list of SourceRecords to add the results to. + * @return the {@code results} parameter. */ List extractSourceRecords(final List results) { if (connectorStopped.get()) { @@ -165,9 +170,10 @@ List extractSourceRecords(final List results) { } /** - * Set the S3 source record iterator that this task is using. - * protected to be overridden in testing impl. - * @param iterator The S3SourceRecord iterator to use. + * Set the S3 source record iterator that this task is using. protected to be overridden in testing impl. + * + * @param iterator + * The S3SourceRecord iterator to use. */ protected void setSourceRecordIterator(final Iterator iterator) { sourceRecordIterator = iterator; @@ -175,13 +181,18 @@ protected void setSourceRecordIterator(final Iterator iterator) /** * Wait until objects are available to be read - * @throws InterruptedException on error. + * + * @throws InterruptedException + * on error. */ private void waitForObjects() throws InterruptedException { while (!sourceRecordIterator.hasNext() && !connectorStopped.get()) { LOGGER.debug("Blocking until new S3 files are available."); Thread.sleep(S_3_POLL_INTERVAL_MS); - setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, // NOPMD creating object in loop + setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, // NOPMD + // creating + // object + // in loop awsv2SourceClient)); } } @@ -203,6 +214,7 @@ private void closeResources() { /** * Get the transformer that we are using. + * * @return the transformer that we are using. */ public Transformer getTransformer() { @@ -211,6 +223,7 @@ public Transformer getTransformer() { /** * Get the initialized flag. + * * @return {@code true} if the task has been initialized, {@code false} otherwise. */ public boolean isTaskInitialized() { @@ -219,6 +232,7 @@ public boolean isTaskInitialized() { /** * Gets the state of the connector stopped flag. + * * @return The state of the connector stopped flag. */ public boolean isConnectorStopped() { diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java index d8f43b67..c6b00540 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -39,12 +39,12 @@ * Called AWSV2SourceClient as this source client implements the V2 version of the aws client library. Handles all calls * and authentication to AWS and returns useable objects to the SourceRecordIterator. */ -public class AWSV2SourceClient { +public final class AWSV2SourceClient { /** The logger to use */ private static final Logger LOGGER = LoggerFactory.getLogger(AWSV2SourceClient.class); /** - * How many pages of data we will attempt to read at one go. - * The page size is defined in {@link S3ConfigFragment#getFetchPageSize()} + * How many pages of data we will attempt to read at one go. The page size is defined in + * {@link S3ConfigFragment#getFetchPageSize()} */ public static final int PAGE_SIZE_FACTOR = 2; /** The source configuration */ @@ -92,8 +92,14 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set // TODO the code below should be configured in some sort of taks assignement method/process/call. int maxTasks; try { - maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString()); - } catch (NullPointerException | NumberFormatException e) { // NOPMD catch null pointer + final Object value = s3SourceConfig.originals().get("tasks.max"); + if (value == null) { + LOGGER.info("Setting tasks.max to 1"); + maxTasks = 1; + } else { + maxTasks = Integer.parseInt(value.toString()); + } + } catch (NumberFormatException e) { // NOPMD catch null pointer LOGGER.warn("Invalid tasks.max: {}", e.getMessage()); LOGGER.info("Setting tasks.max to 1"); maxTasks = 1; @@ -101,8 +107,14 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set this.maxTasks = maxTasks; int taskId; try { - taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks; - } catch (NullPointerException | NumberFormatException e) { // NOPMD catch null pointer + final Object value = s3SourceConfig.originals().get("task.id"); + if (value == null) { + LOGGER.info("Setting task.id to 0"); + taskId = 0; + } else { + taskId = Integer.parseInt(value.toString()) % maxTasks; + } + } catch (NumberFormatException e) { // NOPMD catch null pointer LOGGER.warn("Invalid task.id: {}", e.getMessage()); LOGGER.info("Setting task.id to 0"); taskId = 0; @@ -111,9 +123,11 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set } /** - * Create an iterator of S3Objects. The iterator will automatically close the objects when the next object is retrieved or when - * the end of the iterator is reached. - * @param startToken the Key to start searching from. + * Create an iterator of S3Objects. The iterator will automatically close the objects when the next object is + * retrieved or when the end of the iterator is reached. + * + * @param startToken + * the Key to start searching from. * @return An iterator on S3Objects. */ public Iterator getIteratorOfObjects(final String startToken) { @@ -131,30 +145,38 @@ public Iterator getIteratorOfObjects(final String startToken) { final Predicate filter = filterPredicate.and(this::checkTaskAssignment) .and(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey())); - final Iterator summaryIterator = IteratorUtils.filteredIterator(new S3ObjectSummaryIterator(s3Client, request), filter::test); - final Iterator objectIterator = IteratorUtils.transformedIterator(summaryIterator, s3ObjectSummary -> s3Client.getObject(bucketName, s3ObjectSummary.getKey())); + final Iterator summaryIterator = IteratorUtils + .filteredIterator(new S3ObjectSummaryIterator(s3Client, request), filter::test); + final Iterator objectIterator = IteratorUtils.transformedIterator(summaryIterator, + s3ObjectSummary -> s3Client.getObject(bucketName, s3ObjectSummary.getKey())); return ClosableIterator.wrap(objectIterator); } /** * Add a failed object to the list of failed object keys. - * @param objectKey the object key that failed. + * + * @param objectKey + * the object key that failed. */ public void addFailedObjectKeys(final String objectKey) { this.failedObjectKeys.add(objectKey); } /** - * Set the filter predicate. Overrides the default predicate. - * @param predicate the predicate to use instead of the default predicate. + * Set the filter predicate. Overrides the default predicate. + * + * @param predicate + * the predicate to use instead of the default predicate. */ public void setFilterPredicate(final Predicate predicate) { filterPredicate = predicate; } /** - * Checks the task assignment. This method should probalby be delivered by a task assignment object. - * @param summary the object summary to check + * Checks the task assignment. This method should probalby be delivered by a task assignment object. + * + * @param summary + * the object summary to check * @return {@code true} if the summary should be processed, {@code false otherwise}. */ private boolean checkTaskAssignment(final S3ObjectSummary summary) { @@ -162,7 +184,7 @@ private boolean checkTaskAssignment(final S3ObjectSummary summary) { } /** - * Shut down this source client. Shuts down the attached AmazonS3 client. + * Shut down this source client. Shuts down the attached AmazonS3 client. */ public void shutdown() { s3Client.shutdown(); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java index 02b99091..cbc7b627 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java @@ -91,4 +91,4 @@ public S3ObjectSummary next() { lastObjectSummaryKey = result.getKey(); return result; } -} \ No newline at end of file +} diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java index a4efa67b..b6248934 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java @@ -16,12 +16,14 @@ package io.aiven.kafka.connect.s3.source.utils; -import io.aiven.kafka.connect.common.OffsetManager; - import java.util.HashMap; import java.util.List; import java.util.Map; +import io.aiven.kafka.connect.common.OffsetManager; + +import com.google.common.base.Objects; + public final class S3OffsetManagerEntry implements OffsetManager.OffsetManagerEntry { // package private statics for testing. @@ -38,18 +40,23 @@ public final class S3OffsetManagerEntry implements OffsetManager.OffsetManagerEn static final List RESTRICTED_KEYS = List.of(BUCKET, OBJECT_KEY, TOPIC, PARTITION, RECORD_COUNT); /** The data map that stores all the values */ private final Map data; - /** THe record count for the data map. Extracted here because it is used/updated frequently duirng processing */ + /** THe record count for the data map. Extracted here because it is used/updated frequently duirng processing */ private long recordCount; /** * Construct the S3OffsetManagerEntry. - * @param bucket the bucket we are using. - * @param s3ObjectKey the S3Object key. - * @param topic The topic we are using. - * @param partition the partition we are using. + * + * @param bucket + * the bucket we are using. + * @param s3ObjectKey + * the S3Object key. + * @param topic + * The topic we are using. + * @param partition + * the partition we are using. */ public S3OffsetManagerEntry(final String bucket, final String s3ObjectKey, final String topic, - final Integer partition) { + final Integer partition) { data = new HashMap<>(); data.put(BUCKET, bucket); data.put(OBJECT_KEY, s3ObjectKey); @@ -58,26 +65,29 @@ public S3OffsetManagerEntry(final String bucket, final String s3ObjectKey, final } /** - * Constructs an OffsetManagerEntry from an existing map. - * used by {@link #fromProperties(Map)}. - * Package private for testing - * @param properties the property map. + * Constructs an OffsetManagerEntry from an existing map. used by {@link #fromProperties(Map)}. Package private for + * testing + * + * @param properties + * the property map. */ - private S3OffsetManagerEntry(final Map properties) { + private S3OffsetManagerEntry(final Map properties) { data = new HashMap<>(properties); for (final String field : RESTRICTED_KEYS) { if (data.get(field) == null) { - throw new IllegalArgumentException("Missing '"+field+"' property"); + throw new IllegalArgumentException("Missing '" + field + "' property"); } } } /** - * Creates an S3OffsetManagerEntry. - * Will return {@code null} if properties is {@code null}. - * @param properties the properties to wrap. May be {@code null}. + * Creates an S3OffsetManagerEntry. Will return {@code null} if properties is {@code null}. + * + * @param properties + * the properties to wrap. May be {@code null}. * @return an S3OffsetManagerEntry. - * @throws IllegalArgumentException if one of the {@link #RESTRICTED_KEYS} is missing. + * @throws IllegalArgumentException + * if one of the {@link #RESTRICTED_KEYS} is missing. */ @Override public S3OffsetManagerEntry fromProperties(final Map properties) { @@ -100,8 +110,6 @@ public Object getProperty(final String key) { return data.get(key); } - - @Override public void setProperty(final String property, final Object value) { if (RESTRICTED_KEYS.contains(property)) { @@ -117,7 +125,8 @@ public void incrementRecordCount() { } /** - * Gets the umber of records extracted from data returned from S3. + * Gets the umber of records extracted from data returned from S3. + * * @return the umber of records extracted from data returned from S3. */ public long getRecordCount() { @@ -125,7 +134,8 @@ public long getRecordCount() { } /** - * Gets the S3Object key for the current object. + * Gets the S3Object key for the current object. + * * @return the S3ObjectKey. */ public String getKey() { @@ -144,6 +154,7 @@ public String getTopic() { /** * Gets the S3 bucket for the current object. + * * @return the S3 Bucket for the current object. */ public String getBucket() { @@ -163,16 +174,31 @@ public Map getProperties() { /** * Returns the OffsetManagerKey for this Entry. + * * @return the OffsetManagerKey for this Entry. */ @Override public OffsetManager.OffsetManagerKey getManagerKey() { - return () -> Map.of(BUCKET, data.get(BUCKET), TOPIC, data.get(TOPIC), PARTITION, data.get(PARTITION), OBJECT_KEY, data.get(OBJECT_KEY)); + return () -> Map.of(BUCKET, data.get(BUCKET), TOPIC, data.get(TOPIC), PARTITION, data.get(PARTITION), + OBJECT_KEY, data.get(OBJECT_KEY)); + } + + @Override + public boolean equals(final Object other) { + if (other instanceof S3OffsetManagerEntry) { + return compareTo((S3OffsetManagerEntry) other) == 0; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(getBucket(), getTopic(), getPartition()); } @Override public int compareTo(final S3OffsetManagerEntry other) { - if (this == other) { // NOPMD comparing instance + if (this == other) { // NOPMD comparing instance return 0; } int result = ((String) getProperty(BUCKET)).compareTo((String) other.getProperty(BUCKET)); @@ -180,7 +206,7 @@ public int compareTo(final S3OffsetManagerEntry other) { result = getTopic().compareTo(other.getTopic()); if (result == 0) { result = getPartition().compareTo(other.getPartition()); - if (result == 0) { // NOPMD deeply nested if. + if (result == 0) { // NOPMD deeply nested if. result = getKey().compareTo(other.getKey()); if (result == 0) { result = Long.compare(getRecordCount(), other.getRecordCount()); @@ -191,4 +217,3 @@ public int compareTo(final S3OffsetManagerEntry other) { return result; } } - diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java index 71dd8591..0856145e 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java @@ -29,10 +29,8 @@ public final class S3SourceRecord { private final Optional recordKey; private final SchemaAndValue recordValue; - - - public S3SourceRecord(final S3OffsetManagerEntry offsetManagerEntry, final Optional keyData, - final SchemaAndValue valueData) { + public S3SourceRecord(final S3OffsetManagerEntry offsetManagerEntry, final Optional keyData, + final SchemaAndValue valueData) { this.offsetManagerEntry = offsetManagerEntry.fromProperties(offsetManagerEntry.getProperties()); this.recordKey = keyData; this.recordValue = valueData; @@ -57,7 +55,6 @@ public String getObjectKey() { public SourceRecord getSourceRecord() { return new SourceRecord(offsetManagerEntry.getManagerKey().getPartitionMap(), offsetManagerEntry.getProperties(), offsetManagerEntry.getTopic(), offsetManagerEntry.getPartition(), - recordKey.map(SchemaAndValue::schema).orElse(null), key(), - recordValue.schema(), recordValue.value()); + recordKey.map(SchemaAndValue::schema).orElse(null), key(), recordValue.schema(), recordValue.value()); } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index fe5782f7..579e302f 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -24,7 +24,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.commons.io.function.IOSupplier; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.SchemaAndValue; @@ -35,25 +34,27 @@ import com.amazonaws.services.s3.model.S3Object; import org.apache.commons.collections4.IteratorUtils; +import org.apache.commons.io.function.IOSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Iterator that reads from an S3Object iterator and processes each S3Object into one or more records to be returned - * as S3SourceRecords. + * Iterator that reads from an S3Object iterator and processes each S3Object into one or more records to be returned as + * S3SourceRecords. */ public final class SourceRecordIterator implements Iterator { /** The logger to write to */ private static final Logger LOGGER = LoggerFactory.getLogger(SourceRecordIterator.class); - /** The file name pattern topic key. TODO move this to a file matching class */ + /** The file name pattern topic key. TODO move this to a file matching class */ public static final String PATTERN_TOPIC_KEY = "topicName"; - /** The file name pattern partition key. TODO move this to a file matching class */ + /** The file name pattern partition key. TODO move this to a file matching class */ public static final String PATTERN_PARTITION_KEY = "partitionId"; - /** The file name matching pattern. TODO move this to a file matching class */ + /** The file name matching pattern. TODO move this to a file matching class */ public static final Pattern FILE_DEFAULT_PATTERN = Pattern.compile("(?[^/]+?)-" - + "(?\\d{5})-" + "(?[a-zA-Z0-9]+)" + "\\.(?[^.]+)$"); // e.g. topic-00001.txt - /** Maximum number of records that the Byte transformer returns. This should be handled by the Transformer itself */ + + "(?\\d{5})-" + "(?[a-zA-Z0-9]+)" + "\\.(?[^.]+)$"); // e.g. + // topic-00001.txt + /** Maximum number of records that the Byte transformer returns. This should be handled by the Transformer itself */ public static final long BYTES_TRANSFORMATION_NUM_OF_RECS = 1L; /** The OffsetManager that we are using */ private final OffsetManager offsetManager; @@ -72,24 +73,32 @@ public final class SourceRecordIterator implements Iterator { /** * Constructor. - * @param s3SourceConfig The configuration. - * @param offsetManager the offset manager. - * @param transformer the transformer to sue. - * @param sourceClient the source client to read from. + * + * @param s3SourceConfig + * The configuration. + * @param offsetManager + * the offset manager. + * @param transformer + * the transformer to sue. + * @param sourceClient + * the source client to read from. */ - public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager, - final Transformer transformer, final AWSV2SourceClient sourceClient) { + public SourceRecordIterator(final S3SourceConfig s3SourceConfig, + final OffsetManager offsetManager, final Transformer transformer, + final AWSV2SourceClient sourceClient) { this.s3SourceConfig = s3SourceConfig; this.offsetManager = offsetManager; this.transformer = transformer; this.sourceClient = sourceClient; - this.s3ObjectIterator = IteratorUtils.filteredIterator(sourceClient.getIteratorOfObjects(null), s3Object -> extractOffsetManagerEntry(s3Object)); + this.s3ObjectIterator = IteratorUtils.filteredIterator(sourceClient.getIteratorOfObjects(null), + s3Object -> extractOffsetManagerEntry(s3Object)); this.outerIterator = Collections.emptyIterator(); } /** - * Construct the OffsetManagerEntry from the S3Object file name. This probalby should occur earlier in the chain. - * If this method returns false the object will be skipped. + * Construct the OffsetManagerEntry from the S3Object file name. This probalby should occur earlier in the chain. If + * this method returns false the object will be skipped. + * * @param s3Object * @return true if the offset can be extracted, false otherwise. */ @@ -97,8 +106,9 @@ private boolean extractOffsetManagerEntry(final S3Object s3Object) { final Matcher fileMatcher = FILE_DEFAULT_PATTERN.matcher(s3Object.getKey()); if (fileMatcher.find()) { // TODO move this from the SourceRecordIterator so that we can decouple it from S3 and make it API agnostic - final S3OffsetManagerEntry keyEntry = new S3OffsetManagerEntry(s3SourceConfig.getAwsS3BucketName(), s3Object.getKey(), fileMatcher.group(PATTERN_TOPIC_KEY), - Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY))); + final S3OffsetManagerEntry keyEntry = new S3OffsetManagerEntry(s3SourceConfig.getAwsS3BucketName(), + s3Object.getKey(), fileMatcher.group(PATTERN_TOPIC_KEY), + Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY))); offsetManagerEntry = offsetManager.getEntry(keyEntry.getManagerKey(), keyEntry::fromProperties); return !checkBytesTransformation(transformer, offsetManagerEntry.getRecordCount()); } @@ -108,29 +118,33 @@ private boolean extractOffsetManagerEntry(final S3Object s3Object) { } /** - * Checks if the transformer should be skipped becasue the number of records exceeds what it returns. - * This check should probalby be converted into a S3OffsetManager entry call for "isComplete" and converted into - * a predicate check. + * Checks if the transformer should be skipped becasue the number of records exceeds what it returns. This check + * should probalby be converted into a S3OffsetManager entry call for "isComplete" and converted into a predicate + * check. * - * @param transformer The transformer we are using. - * @param numberOfRecsAlreadyProcessed the number of records processed. + * @param transformer + * The transformer we are using. + * @param numberOfRecsAlreadyProcessed + * the number of records processed. * @return true if we should skip the object. */ - private boolean checkBytesTransformation(final Transformer transformer, - final long numberOfRecsAlreadyProcessed) { + private boolean checkBytesTransformation(final Transformer transformer, final long numberOfRecsAlreadyProcessed) { return transformer instanceof ByteArrayTransformer && numberOfRecsAlreadyProcessed == BYTES_TRANSFORMATION_NUM_OF_RECS; } /** - * Get the S3SourceRecord iterator that reads from a single object. - * This method applies the transformer to the object and returns an iterator based on the stream - * returned from {@link Transformer#getRecords(IOSupplier, OffsetManager.OffsetManagerEntry, AbstractConfig)}. - * @param s3Object the object to get S3Source records from. + * Get the S3SourceRecord iterator that reads from a single object. This method applies the transformer to the + * object and returns an iterator based on the stream returned from + * {@link Transformer#getRecords(IOSupplier, OffsetManager.OffsetManagerEntry, AbstractConfig)}. + * + * @param s3Object + * the object to get S3Source records from. * @return An iterator over the S3SourceRecords from the Object. */ private Iterator getS3SourceRecordIterator(final S3Object s3Object) { - final Optional key = Optional.of(new SchemaAndValue(transformer.getKeySchema(), s3Object.getKey().getBytes(StandardCharsets.UTF_8))); + final Optional key = Optional + .of(new SchemaAndValue(transformer.getKeySchema(), s3Object.getKey().getBytes(StandardCharsets.UTF_8))); return transformer.getRecords(s3Object::getObjectContent, offsetManagerEntry, s3SourceConfig) .map(value -> new S3SourceRecord(offsetManagerEntry, key, value)) .iterator(); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index dff5ffaf..9b03e7be 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -33,8 +33,6 @@ import java.util.Optional; import java.util.Random; -import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTaskContext; @@ -44,7 +42,9 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; +import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; +import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry; import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; @@ -60,7 +60,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; - final class S3SourceTaskTest { private static final Random RANDOM = new Random(); @@ -230,15 +229,14 @@ private Optional keyOf(final Object value) { return value == null ? Optional.empty() : Optional.of(new SchemaAndValue(null, value)); } - @Test - void testExtractSourceRecordsWithRecords() { + void testExtractSourceRecordsWithRecords() { final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class); when(s3SourceConfig.getMaxPollRecords()).thenReturn(5); final List lst = new ArrayList<>(); S3OffsetManagerEntry offsetManagerEntry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); lst.add(new S3SourceRecord(offsetManagerEntry, keyOf("Hello"), valueOf("Hello World"))); - offsetManagerEntry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY+"a", TOPIC, PARTITION); + offsetManagerEntry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY + "a", TOPIC, PARTITION); lst.add(new S3SourceRecord(offsetManagerEntry, keyOf("Goodbye"), valueOf("Goodbye cruel World"))); final Iterator sourceRecordIterator = lst.iterator(); @@ -259,7 +257,7 @@ void testExtractSourceRecordsWhenConnectorStopped() { final List lst = new ArrayList<>(); S3OffsetManagerEntry offsetManagerEntry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); lst.add(new S3SourceRecord(offsetManagerEntry, keyOf("Hello"), valueOf("Hello World"))); - offsetManagerEntry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY+"a", TOPIC, PARTITION); + offsetManagerEntry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY + "a", TOPIC, PARTITION); lst.add(new S3SourceRecord(offsetManagerEntry, keyOf("Goodbye"), valueOf("Goodbye cruel World"))); final Iterator sourceRecordIterator = lst.iterator(); @@ -268,7 +266,6 @@ void testExtractSourceRecordsWhenConnectorStopped() { startSourceTask(s3SourceTask); s3SourceTask.stop(); - final List results = s3SourceTask.extractSourceRecords(new ArrayList<>()); assertThat(results).isEmpty(); } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3ObjectsUtils.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3ObjectsUtils.java index 55bf5625..a1b666b6 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3ObjectsUtils.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3ObjectsUtils.java @@ -30,8 +30,8 @@ * Standard utilities to create objects from S3 for testing. */ public final class S3ObjectsUtils { - /** A moarker that makes the ObjectSummaryIterator return false for {@code hasNext()}.*/ - public static final ListObjectsV2Result LAST_RESULT = createListObjectsV2Result(Collections.EMPTY_LIST, null); + /** A moarker that makes the ObjectSummaryIterator return false for {@code hasNext()}. */ + public static final ListObjectsV2Result LAST_RESULT = createListObjectsV2Result(Collections.emptyList(), null); private S3ObjectsUtils() { // do not instantiate. @@ -47,7 +47,7 @@ private S3ObjectsUtils() { * @return the ListObjectV2Result from a list of summaries and an next token. */ public static ListObjectsV2Result createListObjectsV2Result(final List summaries, - final String nextToken) { + final String nextToken) { final ListObjectsV2Result result = new ListObjectsV2Result() { @Override public List getObjectSummaries() { @@ -84,7 +84,7 @@ public static S3ObjectSummary createObjectSummary(final String bucket, final Str * @return an S3ObjectSummary with the specified size and object key. */ public static S3ObjectSummary createObjectSummary(final long sizeOfObject, final String bucket, - final String objectKey) { + final String objectKey) { final S3ObjectSummary summary = new S3ObjectSummary(); summary.setSize(sizeOfObject); summary.setKey(objectKey); @@ -146,4 +146,4 @@ public static void populateS3Client(final AmazonS3 s3Client, final S3ObjectSumma when(s3Client.listObjectsV2(summary.getBucketName())).thenReturn(result).thenReturn(LAST_RESULT); populateS3Client(s3Client, result); } -} \ No newline at end of file +} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java index e3987c61..2eb07f9c 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java @@ -19,8 +19,8 @@ import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG; import static io.aiven.kafka.connect.s3.source.testutils.S3ObjectsUtils.LAST_RESULT; -import static io.aiven.kafka.connect.s3.source.testutils.S3ObjectsUtils.createObjectSummary; import static io.aiven.kafka.connect.s3.source.testutils.S3ObjectsUtils.createListObjectsV2Result; +import static io.aiven.kafka.connect.s3.source.testutils.S3ObjectsUtils.createObjectSummary; import static io.aiven.kafka.connect.s3.source.testutils.S3ObjectsUtils.populateS3Client; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -37,12 +37,12 @@ import java.util.List; import java.util.Map; -import com.amazonaws.services.s3.model.S3Object; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -87,11 +87,12 @@ public void initializeSourceClient() { } @ParameterizedTest - @CsvSource({"3, 1", "1, 0"}) + @CsvSource({ "3, 1", "1, 0" }) void testFetchObjectsWithNoObjects(final int maxTasks, final int taskId) { initializeSourceClient(maxTasks, taskId); final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result(Collections.emptyList(), null); - when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result).thenReturn(LAST_RESULT); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) + .thenReturn(LAST_RESULT); final Iterator objects = awsv2SourceClient.getIteratorOfObjects(null); assertThat(objects).isExhausted(); @@ -101,20 +102,20 @@ void testFetchObjectsWithNoObjects(final int maxTasks, final int taskId) { void testFetchOneObjectWithBasicConfig() { final String objectKey = "any-key"; initializeSourceClient(1, 0); - final ListObjectsV2Result result = createListObjectsV2Result(List.of(createObjectSummary(BUCKET_NAME, objectKey)), null); + final ListObjectsV2Result result = createListObjectsV2Result( + List.of(createObjectSummary(BUCKET_NAME, objectKey)), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(result).thenReturn(LAST_RESULT); populateS3Client(s3Client, result); - final Iterator objects = awsv2SourceClient.getIteratorOfObjects(null); assertThat(objects).hasNext(); - final S3Object object = objects.next(); // NOPMD object clsoed in exhausted check. + final S3Object object = objects.next(); // NOPMD object clsoed in exhausted check. assertThat(object.getKey()).isEqualTo(objectKey); assertThat(objects).isExhausted(); } @ParameterizedTest - @CsvSource({"key1", "key2", "key3", "key4"}) + @CsvSource({ "key1", "key2", "key3", "key4" }) void testFetchObjectsWithWithTaskIdAssigned(final String objectKey) { initializeSourceClient(4, keyTaskMap.get(objectKey)); final List lst = new ArrayList<>(); @@ -130,17 +131,20 @@ void testFetchObjectsWithWithTaskIdAssigned(final String objectKey) { final Iterator objects = awsv2SourceClient.getIteratorOfObjects(null); assertThat(objects).hasNext(); - final S3Object object = objects.next(); // NOPMD object clsoed in exhausted check. + final S3Object object = objects.next(); // NOPMD object clsoed in exhausted check. assertThat(object.getKey()).isEqualTo(objectKey); assertThat(objects).isExhausted(); } @ParameterizedTest - @CsvSource({"key1", "key2", "key3", "key4"}) + @CsvSource({ "key1", "key2", "key3", "key4" }) void testFetchObjectWithTaskIdUnassigned(final String objectKey) { initializeSourceClient(4, keyTaskMap.get(objectKey)); final List lst = new ArrayList<>(); - keyTaskMap.keySet().stream().filter(k -> !objectKey.equals(k)).forEach(key -> lst.add(createObjectSummary(BUCKET_NAME, key))); + keyTaskMap.keySet() + .stream() + .filter(k -> !objectKey.equals(k)) + .forEach(key -> lst.add(createObjectSummary(BUCKET_NAME, key))); final ListObjectsV2Result result = createListObjectsV2Result(lst, null); populateS3Client(s3Client, result); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(result).thenReturn(LAST_RESULT); @@ -150,14 +154,14 @@ void testFetchObjectWithTaskIdUnassigned(final String objectKey) { } @ParameterizedTest - @CsvSource({"4, 3", "4, 0"}) + @CsvSource({ "4, 3", "4, 0" }) void testFetchObjectsFiltersOutZeroByteObject(final int maxTasks, final int taskId) { initializeSourceClient(maxTasks, taskId); final List lst = new ArrayList<>(); lst.add(createObjectSummary(0, BUCKET_NAME, "key1")); lst.add(createObjectSummary(BUCKET_NAME, "key2")); lst.add(createObjectSummary(BUCKET_NAME, "key3")); - final ListObjectsV2Result result = getListObjectsV2Result(); + final ListObjectsV2Result result = createListObjectsV2Result(lst, null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(result).thenReturn(LAST_RESULT); populateS3Client(s3Client, result); @@ -183,7 +187,9 @@ void testFetchObjectWithPrefix() { final ListObjectsV2Result firstResult = createListObjectsV2Result(List.of(object1), "nextToken"); final ListObjectsV2Result secondResult = createListObjectsV2Result(List.of(object2), null); - when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult).thenReturn(LAST_RESULT); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult) + .thenReturn(secondResult) + .thenReturn(LAST_RESULT); final Iterator objects = awsv2SourceClient.getIteratorOfObjects(null); while (objects.hasNext()) { @@ -210,7 +216,9 @@ void testFetchObjectWithInitialStartAfter() { final ListObjectsV2Result firstResult = createListObjectsV2Result(List.of(object1), "nextToken"); final ListObjectsV2Result secondResult = createListObjectsV2Result(List.of(object2), null); - when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult).thenReturn(LAST_RESULT); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult) + .thenReturn(secondResult) + .thenReturn(LAST_RESULT); populateS3Client(s3Client, firstResult); populateS3Client(s3Client, secondResult); @@ -222,7 +230,6 @@ void testFetchObjectWithInitialStartAfter() { assertThat(request.getStartAfter()).isEqualTo(startAfter); objects.next(); - assertThat(objects).hasNext(); verify(s3Client, times(2)).listObjectsV2(requestCaptor.capture()); request = requestCaptor.getValue(); @@ -237,19 +244,13 @@ void testFetchObjectWithInitialStartAfter() { assertThat(request.getContinuationToken()).isNull(); } - private ListObjectsV2Result getListObjectsV2Result() { - final S3ObjectSummary zeroByteObject = createObjectSummary(0, BUCKET_NAME, "key1"); - final S3ObjectSummary nonZeroByteObject1 = createObjectSummary(BUCKET_NAME, "key2"); - final S3ObjectSummary nonZeroByteObject2 = createObjectSummary(BUCKET_NAME, "key3"); - return createListObjectsV2Result(List.of(zeroByteObject, nonZeroByteObject1, nonZeroByteObject2), null); - } - @Test void testFetchObjectsWithOneObject() throws IOException { final String objectKey = "any-key"; initializeSourceClient(); final S3ObjectSummary objectSummary = createObjectSummary(BUCKET_NAME, objectKey); - final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result(Collections.singletonList(objectSummary), null); + final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result( + Collections.singletonList(objectSummary), null); populateS3Client(s3Client, listObjectsV2Result); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) .thenReturn(LAST_RESULT); @@ -270,7 +271,8 @@ void testFetchObjectsFiltersOutFailedObject() throws IOException { final S3ObjectSummary zeroByteObject = createObjectSummary(BUCKET_NAME, "key1"); final S3ObjectSummary nonZeroByteObject1 = createObjectSummary(BUCKET_NAME, "key2"); final S3ObjectSummary nonZeroByteObject2 = createObjectSummary(BUCKET_NAME, "key3"); - final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result(List.of(zeroByteObject, nonZeroByteObject1, nonZeroByteObject2), null); + final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result( + List.of(zeroByteObject, nonZeroByteObject1, nonZeroByteObject2), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) .thenReturn(LAST_RESULT); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java index d44d0a2a..8176676b 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java @@ -26,10 +26,10 @@ import java.util.HashMap; import java.util.Map; -import io.aiven.kafka.connect.common.OffsetManager; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; +import io.aiven.kafka.connect.common.OffsetManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -89,7 +89,6 @@ void testGetEntry() { assertThat(entry.getProperty("random_entry")).isEqualTo(5L); verify(sourceTaskContext, times(1)).offsetStorageReader(); - // verify second read reads from local data final S3OffsetManagerEntry entry2 = offsetManager.getEntry(entry.getManagerKey(), entry::fromProperties); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index 91e90163..8199e3ed 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -29,6 +29,12 @@ import java.util.Collections; import java.util.function.Consumer; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; + import io.aiven.kafka.connect.common.ClosableIterator; import io.aiven.kafka.connect.common.OffsetManager; import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer; @@ -39,17 +45,11 @@ import com.amazonaws.services.s3.model.S3ObjectInputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.io.function.IOSupplier; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.source.SourceTaskContext; -import org.apache.kafka.connect.storage.OffsetStorageReader; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - final class SourceRecordIteratorTest { private S3SourceConfig mockConfig; @@ -80,18 +80,21 @@ void testIteratorProcessesS3Objects() { final String key = "topic-00001-abc123.txt"; when(offsetStorageReader.offset(any())).thenReturn(null); - when(mockSourceApiClient.getIteratorOfObjects(any())).thenReturn(ClosableIterator.wrap(Collections.emptyIterator())); + when(mockSourceApiClient.getIteratorOfObjects(any())) + .thenReturn(ClosableIterator.wrap(Collections.emptyIterator())); SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, offsetManager, transformer, mockSourceApiClient); assertThat(iterator).isExhausted(); - final S3Object result = new S3Object(); // NOPMD closed during testing below. + final S3Object result = new S3Object(); // NOPMD closed during testing below. result.setKey(key); result.setObjectContent(new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))); - when(mockSourceApiClient.getIteratorOfObjects(any())).thenReturn(Collections.singletonList(result).listIterator()).thenReturn(Collections.emptyIterator()); + when(mockSourceApiClient.getIteratorOfObjects(any())) + .thenReturn(Collections.singletonList(result).listIterator()) + .thenReturn(Collections.emptyIterator()); iterator = new SourceRecordIterator(mockConfig, offsetManager, transformer, mockSourceApiClient); @@ -111,22 +114,26 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws IOException transformer = new ByteArrayTransformer(); try (S3Object mockS3Object = mock(S3Object.class)) { - when(mockS3Object.getObjectContent()).thenReturn(new S3ObjectInputStream(new ByteArrayInputStream("This is a test".getBytes(StandardCharsets.UTF_8)), null)); + when(mockS3Object.getObjectContent()).thenReturn(new S3ObjectInputStream( + new ByteArrayInputStream("This is a test".getBytes(StandardCharsets.UTF_8)), null)); when(mockSourceApiClient.getIteratorOfObjects(any())).thenReturn(Collections.emptyIterator()); final S3OffsetManagerEntry entry = new S3OffsetManagerEntry("BUCKET", key, "topic", 1); entry.incrementRecordCount(); when(offsetStorageReader.offset(any())).thenReturn(entry.getProperties()); final S3Object s3Object = new S3Object(); // NOPMD object closed below s3Object.setKey(key); - when(mockSourceApiClient.getIteratorOfObjects(any())).thenReturn(Collections.singletonList(s3Object).listIterator()); + when(mockSourceApiClient.getIteratorOfObjects(any())) + .thenReturn(Collections.singletonList(s3Object).listIterator()); - final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, offsetManager, transformer, mockSourceApiClient); + final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, offsetManager, transformer, + mockSourceApiClient); assertThat(iterator).isExhausted(); } } - @SuppressWarnings("TestClassWithoutTestCases") - private static class TestingTransformer extends Transformer { + @SuppressWarnings("PMD.TestClassWithoutTestCases") // TODO figure out why this fails. + private static class TestingTransformer extends Transformer { // NOPMD because the above supress warnings does not + // work. private final static Logger LOGGER = LoggerFactory.getLogger(TestingTransformer.class); @Override @@ -135,38 +142,39 @@ public Schema getKeySchema() { } @Override - protected StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final OffsetManager.OffsetManagerEntry offsetManagerEntry, final AbstractConfig sourceConfig) { - - return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) { - private boolean wasRead; - @Override - protected InputStream inputOpened(final InputStream input) { - return input; - } - - @Override - protected void doClose() { - // nothing to do. + protected StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final OffsetManager.OffsetManagerEntry offsetManagerEntry, final AbstractConfig sourceConfig) { + + return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) { + private boolean wasRead; + @Override + protected InputStream inputOpened(final InputStream input) { + return input; + } + + @Override + protected void doClose() { + // nothing to do. + } + + @Override + protected boolean doAdvance(final Consumer action) { + if (wasRead) { + return false; } - - @Override - protected boolean doAdvance(final Consumer action) { - if (wasRead) { - return false; - } - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - IOUtils.copy(inputStream, baos); - final String result = "Transformed: " + baos; - action.accept(new SchemaAndValue(null, result)); - wasRead = true; - return true; - } catch (RuntimeException | IOException e) { // NOPMD must catch runtime exception here. - LOGGER.error("Error trying to advance inputStream: {}", e.getMessage(), e); - wasRead = true; - return false; - } + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + IOUtils.copy(inputStream, baos); + final String result = "Transformed: " + baos; + action.accept(new SchemaAndValue(null, result)); + wasRead = true; + return true; + } catch (RuntimeException | IOException e) { // NOPMD must catch runtime exception here. + LOGGER.error("Error trying to advance inputStream: {}", e.getMessage(), e); + wasRead = true; + return false; } - }; + } + }; } } }