From c40997c3bb74397678412caa2e136d23ba07db92 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Sat, 11 Jan 2025 03:01:33 +0000 Subject: [PATCH] Add length to the Transformer to allow improvements to the byte array in the future. Signed-off-by: Aindriu Lavelle --- .../common/config/SourceCommonConfig.java | 16 +++---- ...Fragment.java => TransformerFragment.java} | 46 +++++++++++-------- .../common/source/input/AvroTransformer.java | 7 +-- .../source/input/ByteArrayTransformer.java | 37 +++++++++++++-- .../common/source/input/JsonTransformer.java | 5 +- .../source/input/ParquetTransformer.java | 7 +-- .../common/source/input/Transformer.java | 19 +++++--- .../source/input/TransformerFactory.java | 2 +- ...Test.java => TransformerFragmentTest.java} | 21 +++++---- .../source/input/AvroTransformerTest.java | 21 +++++---- .../input/ByteArrayTransformerTest.java | 16 +++---- .../source/input/JsonTransformerTest.java | 25 +++++----- .../source/input/ParquetTransformerTest.java | 24 +++++----- .../input/TransformerStreamingTest.java | 15 +++--- .../connect/s3/source/AwsIntegrationTest.java | 4 +- .../connect/s3/source/IntegrationTest.java | 8 ++-- .../s3/source/config/S3SourceConfig.java | 6 +-- .../s3/source/utils/SourceRecordIterator.java | 5 +- .../connect/s3/source/S3SourceTaskTest.java | 2 +- .../s3/source/config/S3SourceConfigTest.java | 4 +- .../utils/SourceRecordIteratorTest.java | 15 +++--- 21 files changed, 182 insertions(+), 123 deletions(-) rename commons/src/main/java/io/aiven/kafka/connect/common/config/{SchemaRegistryFragment.java => TransformerFragment.java} (68%) rename commons/src/test/java/io/aiven/kafka/connect/common/config/{SchemaRegistryFragmentTest.java => TransformerFragmentTest.java} (59%) diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java index 7e49f748..275562a7 100644 --- 
a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java @@ -27,7 +27,7 @@ public class SourceCommonConfig extends CommonConfig { - private final SchemaRegistryFragment schemaRegistryFragment; + private final TransformerFragment transformerFragment; private final SourceConfigFragment sourceConfigFragment; private final FileNameFragment fileNameFragment; private final OutputFormatFragment outputFormatFragment; @@ -35,7 +35,7 @@ public class SourceCommonConfig extends CommonConfig { public SourceCommonConfig(ConfigDef definition, Map originals) {// NOPMD super(definition, originals); // Construct Fragments - schemaRegistryFragment = new SchemaRegistryFragment(this); + transformerFragment = new TransformerFragment(this); sourceConfigFragment = new SourceConfigFragment(this); fileNameFragment = new FileNameFragment(this); outputFormatFragment = new OutputFormatFragment(this); @@ -44,18 +44,18 @@ public SourceCommonConfig(ConfigDef definition, Map originals) {// NOPMD } private void validate() { - schemaRegistryFragment.validate(); + transformerFragment.validate(); sourceConfigFragment.validate(); fileNameFragment.validate(); outputFormatFragment.validate(); } public InputFormat getInputFormat() { - return schemaRegistryFragment.getInputFormat(); + return transformerFragment.getInputFormat(); } public String getSchemaRegistryUrl() { - return schemaRegistryFragment.getSchemaRegistryUrl(); + return transformerFragment.getSchemaRegistryUrl(); } public String getTargetTopics() { @@ -74,11 +74,11 @@ public int getMaxPollRecords() { } public Transformer getTransformer() { - return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat()); + return TransformerFactory.getTransformer(transformerFragment.getInputFormat()); } - public int getByteArrayTransformerMaxBufferSize() { - return schemaRegistryFragment.getByteArrayTransformerMaxBufferSize(); 
+ public int getTransformerMaxBufferSize() { + return transformerFragment.getTransformerMaxBufferSize(); } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java similarity index 68% rename from commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java rename to commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java index 48d5e139..db5a5a20 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java @@ -24,15 +24,15 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; -public final class SchemaRegistryFragment extends ConfigFragment { - private static final String SCHEMAREGISTRY_GROUP = "Schema registry group"; +public final class TransformerFragment extends ConfigFragment { + private static final String TRANSFORMER_GROUP = "Transformer group"; public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String VALUE_CONVERTER_SCHEMA_REGISTRY_URL = "value.converter.schema.registry.url"; public static final String AVRO_VALUE_SERIALIZER = "value.serializer"; public static final String INPUT_FORMAT_KEY = "input.format"; public static final String SCHEMAS_ENABLE = "schemas.enable"; - public static final String BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE = "byte.array.transformer.max.buffer.size"; - private static final int MAX_BUFFER_SIZE = 4096; + public static final String TRANSFORMER_MAX_BUFFER_SIZE = "transformer.max.buffer.size"; + private static final int DEFAULT_MAX_BUFFER_SIZE = 4096; /** * Construct the ConfigFragment.. @@ -40,29 +40,30 @@ public final class SchemaRegistryFragment extends ConfigFragment { * @param cfg * the configuration that this fragment is associated with. 
*/ - public SchemaRegistryFragment(final AbstractConfig cfg) { + public TransformerFragment(final AbstractConfig cfg) { super(cfg); } public static ConfigDef update(final ConfigDef configDef) { - int srCounter = 0; + int transformerCounter = 0; configDef.define(SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), - ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", SCHEMAREGISTRY_GROUP, srCounter++, + ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP, transformerCounter++, ConfigDef.Width.NONE, SCHEMA_REGISTRY_URL); configDef.define(VALUE_CONVERTER_SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, - new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", - SCHEMAREGISTRY_GROUP, srCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL); + new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP, + transformerCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL); configDef.define(INPUT_FORMAT_KEY, ConfigDef.Type.STRING, InputFormat.BYTES.getValue(), new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, - "Input format of messages read from source avro/json/parquet/bytes", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD + "Input format of messages read from source avro/json/parquet/bytes", TRANSFORMER_GROUP, + transformerCounter++, // NOPMD ConfigDef.Width.NONE, INPUT_FORMAT_KEY); - configDef.define(BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, MAX_BUFFER_SIZE, + configDef.define(TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, DEFAULT_MAX_BUFFER_SIZE, new ByteArrayTransformerMaxBufferSizeValidator(), ConfigDef.Importance.MEDIUM, - "Max Size of the byte buffer when using the BYTE Transformer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD + "Max Size of the byte buffer when using the BYTE Transformer", TRANSFORMER_GROUP, transformerCounter++, // NOPMD ConfigDef.Width.NONE, INPUT_FORMAT_KEY); 
configDef.define(AVRO_VALUE_SERIALIZER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, - "Avro value serializer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD + "Avro value serializer", TRANSFORMER_GROUP, transformerCounter++, // NOPMD // UnusedAssignment ConfigDef.Width.NONE, AVRO_VALUE_SERIALIZER); return configDef; @@ -80,17 +81,24 @@ public Class getAvroValueSerializer() { return cfg.getClass(AVRO_VALUE_SERIALIZER); } - public int getByteArrayTransformerMaxBufferSize() { - return cfg.getInt(BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE); + public int getTransformerMaxBufferSize() { + return cfg.getInt(TRANSFORMER_MAX_BUFFER_SIZE); } private static class ByteArrayTransformerMaxBufferSizeValidator implements ConfigDef.Validator { @Override public void ensureValid(final String name, final Object value) { - - final int size = (int) value; - if (size <= 0) { - throw new ConfigException(String.format("%s must be larger then 0", name)); + try { + if (value instanceof Integer) { + final int size = (int) value; + if (size <= 0) { + throw new ConfigException( + String.format("%s must be larger than 0 and less than %s", name, Integer.MAX_VALUE)); + } + } + } catch (ClassCastException cce) { + throw new ConfigException(String.format("%s must be an integer greater than zero and less than %s.", + name, Integer.MAX_VALUE)); } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java index 35039c08..2125acfe 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java @@ -16,7 +16,7 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; +import static 
io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; import java.io.IOException; import java.io.InputStream; @@ -55,8 +55,9 @@ public void configureValueConverter(final Map config, final Sour } @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final SourceCommonConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final String topic, final int topicPartition, + final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { private DataFileStream dataFileStream; private final DatumReader datumReader = new GenericDatumReader<>(); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java index 98e01b15..1ba0ae59 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java @@ -32,6 +32,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * ByteArrayTransformer chunks an entire object into a maximum size specified by the + * {@link io.aiven.kafka.connect.common.config.TransformerFragment#TRANSFORMER_MAX_BUFFER_SIZE} configuration option. + * This will split large files into multiple records and each record will have the same key. 
+ */ public class ByteArrayTransformer extends Transformer { private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class); @@ -42,10 +47,17 @@ public void configureValueConverter(final Map config, final Sour } @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final SourceCommonConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final String topic, final int topicPartition, + final SourceCommonConfig sourceConfig) { + if (streamLength == 0) { + LOGGER.warn( + "Object sent for processing has an invalid streamLength of {}, object is empty returning an empty spliterator.", + streamLength); + return emptySpliterator(inputStreamIOSupplier); + } // The max buffer size for the byte array the default is 4096 if not set by the user. - final int maxBufferSize = sourceConfig.getByteArrayTransformerMaxBufferSize(); + final int maxBufferSize = sourceConfig.getTransformerMaxBufferSize(); return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { @Override protected InputStream inputOpened(final InputStream input) { @@ -79,6 +91,25 @@ protected boolean doAdvance(final Consumer action) { }; } + private static StreamSpliterator emptySpliterator(final IOSupplier inputStreamIOSupplier) { + return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { + @Override + protected boolean doAdvance(final Consumer action) { + return false; + } + + @Override + protected void doClose() { + + } + + @Override + protected InputStream inputOpened(final InputStream input) throws IOException { + return InputStream.nullInputStream(); + } + }; + } + @Override public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, final SourceCommonConfig sourceConfig) { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java 
b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java index 7555397e..aa5451e7 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java @@ -53,8 +53,9 @@ public void configureValueConverter(final Map config, final Sour } @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final SourceCommonConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final String topic, final int topicPartition, + final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { BufferedReader reader; diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java index eb4b1abf..05512e9e 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java @@ -16,7 +16,7 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; import java.io.File; import java.io.IOException; @@ -66,8 +66,9 @@ public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topi } @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final SourceCommonConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final String topic, 
final int topicPartition, + final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java index 7dfc5f32..64e8a90c 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java @@ -33,14 +33,16 @@ public abstract class Transformer { + public final static long UNKNOWN_STREAM_LENGTH = -1; + public abstract void configureValueConverter(Map config, SourceCommonConfig sourceConfig); public final Stream getRecords(final IOSupplier inputStreamIOSupplier, - final String topic, final int topicPartition, final SourceCommonConfig sourceConfig, - final long skipRecords) { + final long streamLength, final String topic, final int topicPartition, + final SourceCommonConfig sourceConfig, final long skipRecords) { - final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, topic, topicPartition, - sourceConfig); + final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, streamLength, topic, + topicPartition, sourceConfig); return StreamSupport.stream(spliterator, false).onClose(spliterator::close).skip(skipRecords); } @@ -49,16 +51,19 @@ public final Stream getRecords(final IOSupplier inp * * @param inputStreamIOSupplier * the input stream supplier. + * @param streamLength + * the length of the input stream, {@link #UNKNOWN_STREAM_LENGTH} may be used to specify a stream with an + * unknown length, streams of length zero will log a warning and return an empty stream * @param topic * the topic. * @param topicPartition * the partition. * @param sourceConfig - * the source configuraiton. + * the source configuration. * @return a StreamSpliterator instance. 
*/ - protected abstract StreamSpliterator createSpliterator(IOSupplier inputStreamIOSupplier, String topic, - int topicPartition, SourceCommonConfig sourceConfig); + protected abstract StreamSpliterator createSpliterator(IOSupplier inputStreamIOSupplier, + long streamLength, String topic, int topicPartition, SourceCommonConfig sourceConfig); public abstract SchemaAndValue getKeyData(Object cloudStorageKey, String topic, SourceCommonConfig sourceConfig); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java index 57460430..06d872be 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java @@ -16,7 +16,7 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMAS_ENABLE; import java.util.Map; diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java similarity index 59% rename from commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java rename to commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java index 6c12a3b9..ec70883f 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java @@ -28,26 +28,31 @@ import org.junit.jupiter.api.Test; -public class SchemaRegistryFragmentTest {// NOPMD +class TransformerFragmentTest { @Test void validateCorrectBufferSizeIsAccepted() { final int bufferSize = 50; 
- final ConfigDef configDef = SchemaRegistryFragment.update(new ConfigDef()); + final ConfigDef configDef = TransformerFragment.update(new ConfigDef()); final Map props = new HashMap<>(); - props.put(SchemaRegistryFragment.BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, bufferSize); + props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, bufferSize); - final SchemaRegistryFragment schemaReg = new SchemaRegistryFragment(new AbstractConfig(configDef, props)); - assertThat(schemaReg.getByteArrayTransformerMaxBufferSize()).isEqualTo(bufferSize); + final TransformerFragment schemaReg = new TransformerFragment(new AbstractConfig(configDef, props)); + assertThat(schemaReg.getTransformerMaxBufferSize()).isEqualTo(bufferSize); } @Test void validateInvalidBufferSizeThrowsConfigException() { - final ConfigDef configDef = SchemaRegistryFragment.update(new ConfigDef()); + final ConfigDef configDef = TransformerFragment.update(new ConfigDef()); final Map props = new HashMap<>(); - props.put(SchemaRegistryFragment.BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, 0); + // Too small + props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, 0); - assertThatThrownBy(() -> new SchemaRegistryFragment(new AbstractConfig(configDef, props))) + assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props))) + .isInstanceOf(ConfigException.class); + // Too large + props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, Integer.MAX_VALUE + "1"); + assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props))) .isInstanceOf(ConfigException.class); } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java index 617dd290..55aa1232 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java +++ 
b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java @@ -16,7 +16,8 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; @@ -75,11 +76,11 @@ void testConfigureValueConverter() { } @Test - void testReadAvroRecordsInvalidData() { + void testReadAvroRecordsInvalidData() throws IOException { final InputStream inputStream = new ByteArrayInputStream("mock-avro-data".getBytes(StandardCharsets.UTF_8)); - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 0); + final Stream records = avroTransformer.getRecords(() -> inputStream, UNKNOWN_STREAM_LENGTH, "", + 0, sourceCommonConfig, 0); final List recs = records.collect(Collectors.toList()); assertThat(recs).isEmpty(); @@ -95,8 +96,8 @@ void testReadAvroRecords() throws Exception { expected.add("Hello, Kafka Connect S3 Source! object " + i); } - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 0); + final Stream records = avroTransformer.getRecords(() -> inputStream, avroData.size(), "", 0, + sourceCommonConfig, 0); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Struct) sv).getString("message")) @@ -112,8 +113,8 @@ void testReadAvroRecordsSkipFew() throws Exception { for (int i = 5; i < 20; i++) { expected.add("Hello, Kafka Connect S3 Source! 
object " + i); } - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 5); + final Stream records = avroTransformer.getRecords(() -> inputStream, avroData.size(), "", 0, + sourceCommonConfig, 5); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Struct) sv).getString("message")) @@ -125,8 +126,8 @@ void testReadAvroRecordsSkipMoreRecordsThanExist() throws Exception { final ByteArrayOutputStream avroData = generateMockAvroData(20); final InputStream inputStream = new ByteArrayInputStream(avroData.toByteArray()); - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 25); + final Stream records = avroTransformer.getRecords(() -> inputStream, avroData.size(), "", 0, + sourceCommonConfig, 25); assertThat(records).isEmpty(); } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java index 4d8aed15..93c0b5f2 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java @@ -60,9 +60,9 @@ void testGetRecordsSingleChunk() { final byte[] data = { 1, 2, 3, 4, 5 }; final InputStream inputStream = new ByteArrayInputStream(data); final IOSupplier inputStreamIOSupplier = () -> inputStream; - when(sourceCommonConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(4096); - final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0, - sourceCommonConfig, 0); + when(sourceCommonConfig.getTransformerMaxBufferSize()).thenReturn(4096); + final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, + (long) data.length, TEST_TOPIC, 0, sourceCommonConfig, 0); final List recs = records.collect(Collectors.toList()); 
assertThat(recs).hasSize(1); @@ -75,7 +75,7 @@ void testGetRecordsEmptyInputStream() { final IOSupplier inputStreamIOSupplier = () -> inputStream; - final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0, + final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, 0, TEST_TOPIC, 0, sourceCommonConfig, 0); assertThat(records).hasSize(0); @@ -93,10 +93,10 @@ void testGetRecordsWithVariableMaxBufferSize(final int maxBufferSize, final int final byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; final InputStream inputStream = new ByteArrayInputStream(data); final IOSupplier inputStreamIOSupplier = () -> inputStream; - when(sourceCommonConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(maxBufferSize); + when(sourceCommonConfig.getTransformerMaxBufferSize()).thenReturn(maxBufferSize); - final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0, - sourceCommonConfig, 0); + final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, + (long) data.length, TEST_TOPIC, 0, sourceCommonConfig, 0); final List recs = records.collect(Collectors.toList()); assertThat(recs).hasSize(numberOfExpectedRecords); @@ -108,6 +108,6 @@ void testGetRecordsWithVariableMaxBufferSize(final int maxBufferSize, final int }); assertThat(processedData.buffer()).isEqualTo(data); // Should only get called once per splitIterator - verify(sourceCommonConfig, times(1)).getByteArrayTransformerMaxBufferSize(); + verify(sourceCommonConfig, times(1)).getTransformerMaxBufferSize(); } } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java index e482fd61..7b3dde58 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java 
@@ -16,7 +16,8 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMAS_ENABLE; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -74,7 +75,7 @@ void destroy() { } @Test - void testHandleValueDataWithValidJson() { + void testHandleValueDataWithValidJson() throws IOException { final InputStream validJsonInputStream = new ByteArrayInputStream( getJsonRecs(100).getBytes(StandardCharsets.UTF_8)); @@ -83,8 +84,8 @@ void testHandleValueDataWithValidJson() { expected.add("value" + i); } - final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, TESTTOPIC, 1, - sourceCommonConfig, 0); + final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, + UNKNOWN_STREAM_LENGTH, TESTTOPIC, 1, sourceCommonConfig, 0); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Map) sv).get("key")) @@ -92,7 +93,7 @@ void testHandleValueDataWithValidJson() { } @Test - void testHandleValueDataWithValidJsonSkipFew() { + void testHandleValueDataWithValidJsonSkipFew() throws IOException { final InputStream validJsonInputStream = new ByteArrayInputStream( getJsonRecs(100).getBytes(StandardCharsets.UTF_8)); @@ -101,8 +102,8 @@ void testHandleValueDataWithValidJsonSkipFew() { expected.add("value" + i); } - final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, TESTTOPIC, 1, - sourceCommonConfig, 25L); + final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, + UNKNOWN_STREAM_LENGTH, TESTTOPIC, 1, sourceCommonConfig, 25L); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Map) sv).get("key")) @@ -111,13 +112,13 @@ void 
testHandleValueDataWithValidJsonSkipFew() { } @Test - void testHandleValueDataWithInvalidJson() { + void testHandleValueDataWithInvalidJson() throws IOException { final InputStream invalidJsonInputStream = new ByteArrayInputStream( "invalid-json".getBytes(StandardCharsets.UTF_8)); final IOSupplier inputStreamIOSupplier = () -> invalidJsonInputStream; - final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1, - sourceCommonConfig, 0); + final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, + UNKNOWN_STREAM_LENGTH, TESTTOPIC, 1, sourceCommonConfig, 0); assertThat(jsonNodes).isEmpty(); @@ -126,7 +127,7 @@ void testHandleValueDataWithInvalidJson() { @Test void testGetRecordsWithIOException() throws IOException { when(inputStreamIOSupplierMock.get()).thenThrow(new IOException("Test IOException")); - final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null, 0); + final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, 0L, "topic", 0, null, 0); assertThat(resultStream).isEmpty(); } @@ -134,7 +135,7 @@ void testGetRecordsWithIOException() throws IOException { @Test void testCustomSpliteratorWithIOExceptionDuringInitialization() throws IOException { when(inputStreamIOSupplierMock.get()).thenThrow(new IOException("Test IOException during initialization")); - final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null, 0); + final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, 0L, "topic", 0, null, 0); assertThat(resultStream).isEmpty(); } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java index 2f7a405f..449240a6 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java +++ 
b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java @@ -58,7 +58,7 @@ public void setUp() { } @Test - void testHandleValueDataWithZeroBytes() { + void testHandleValueDataWithZeroBytes() throws IOException { final byte[] mockParquetData = new byte[0]; final InputStream inputStream = new ByteArrayInputStream(mockParquetData); final IOSupplier inputStreamIOSupplier = () -> inputStream; @@ -66,8 +66,8 @@ void testHandleValueDataWithZeroBytes() { final String topic = "test-topic"; final int topicPartition = 0; - final Stream recs = parquetTransformer.getRecords(inputStreamIOSupplier, topic, topicPartition, - s3SourceConfig, 0L); + final Stream recs = parquetTransformer.getRecords(inputStreamIOSupplier, mockParquetData.length, + topic, topicPartition, s3SourceConfig, 0L); assertThat(recs).isEmpty(); } @@ -86,7 +86,7 @@ void testGetRecordsWithValidData() throws Exception { expected.add("name" + i); } final List records = parquetTransformer - .getRecords(inputStreamIOSupplier, topic, topicPartition, s3SourceConfig, 0L) + .getRecords(inputStreamIOSupplier, mockParquetData.length, topic, topicPartition, s3SourceConfig, 0L) .collect(Collectors.toList()); assertThat(records).extracting(SchemaAndValue::value) @@ -110,7 +110,7 @@ void testGetRecordsWithValidDataSkipFew() throws Exception { } final List records = parquetTransformer - .getRecords(inputStreamIOSupplier, topic, topicPartition, s3SourceConfig, 25L) + .getRecords(inputStreamIOSupplier, mockParquetData.length, topic, topicPartition, s3SourceConfig, 25L) .collect(Collectors.toList()); assertThat(records).extracting(SchemaAndValue::value) @@ -119,7 +119,7 @@ void testGetRecordsWithValidDataSkipFew() throws Exception { } @Test - void testGetRecordsWithInvalidData() { + void testGetRecordsWithInvalidData() throws IOException { final byte[] invalidData = "invalid data".getBytes(StandardCharsets.UTF_8); final InputStream inputStream = new ByteArrayInputStream(invalidData); final 
IOSupplier inputStreamIOSupplier = () -> inputStream; @@ -129,8 +129,8 @@ void testGetRecordsWithInvalidData() { final String topic = "test-topic"; final int topicPartition = 0; - final Stream records = parquetTransformer.getRecords(inputStreamIOSupplier, topic, - topicPartition, s3SourceConfig, 0L); + final Stream records = parquetTransformer.getRecords(inputStreamIOSupplier, invalidData.length, + topic, topicPartition, s3SourceConfig, 0L); assertThat(records).isEmpty(); } @@ -155,8 +155,8 @@ void testIOExceptionCreatingTempFile() { .thenThrow(new IOException("Test IOException for temp file")); final IOSupplier inputStreamSupplier = mock(IOSupplier.class); - final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, "test-topic", - 1, null, 0L); + final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, 0L, + "test-topic", 1, null, 0L); assertThat(resultStream).isEmpty(); } @@ -168,8 +168,8 @@ void testIOExceptionDuringDataCopy() throws IOException { when(inputStreamMock.read(any(byte[].class))).thenThrow(new IOException("Test IOException during copy")); final IOSupplier inputStreamSupplier = () -> inputStreamMock; - final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, "test-topic", - 1, null, 0L); + final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, 0L, + "test-topic", 1, null, 0L); assertThat(resultStream).isEmpty(); } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java index a4b3c522..8895f4d2 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java @@ -17,6 +17,7 @@ package io.aiven.kafka.connect.common.source.input; import static 
io.aiven.kafka.connect.common.config.OutputFormatFragmentFixture.OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -37,8 +38,8 @@ import org.apache.kafka.connect.data.SchemaAndValue; import io.aiven.kafka.connect.common.config.OutputFormatFragment; -import io.aiven.kafka.connect.common.config.SchemaRegistryFragment; import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.config.TransformerFragment; import org.apache.commons.io.function.IOSupplier; import org.junit.jupiter.params.ParameterizedTest; @@ -56,7 +57,7 @@ void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] tes final SourceCommonConfig config, final int expectedCount) throws IOException { final IOSupplier ioSupplier = mock(IOSupplier.class); when(ioSupplier.get()).thenThrow(new IOException("Test IOException during initialization")); - final Stream objStream = transformer.getRecords(ioSupplier, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(ioSupplier, UNKNOWN_STREAM_LENGTH, "topic", 1, config, 0); assertThat(objStream).isEmpty(); } @@ -74,7 +75,8 @@ void verifyExceptionDuringRead(final Transformer transformer, final byte[] testD when(inputStream.readNBytes(anyInt())).thenThrow(new IOException("Test IOException during read")); when(inputStream.readAllBytes()).thenThrow(new IOException("Test IOException during read")); try (CloseTrackingStream stream = new CloseTrackingStream(inputStream)) { - final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(() -> stream, UNKNOWN_STREAM_LENGTH, "topic", 1, + config, 0); assertThat(objStream).isEmpty(); assertThat(stream.closeCount).isGreaterThan(0); } 
@@ -86,7 +88,7 @@ void verifyExceptionDuringRead(final Transformer transformer, final byte[] testD void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData, final SourceCommonConfig config, final int expectedCount) throws IOException { final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData)); - final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(() -> stream, UNKNOWN_STREAM_LENGTH, "topic", 1, config, 0); final long count = objStream.count(); assertThat(count).isEqualTo(expectedCount); assertThat(stream.closeCount).isGreaterThan(0); @@ -97,7 +99,8 @@ void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[] testData, final SourceCommonConfig config, final int expectedCount) throws IOException { final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData)); - final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(() -> stream, UNKNOWN_STREAM_LENGTH, "topic", 1, + config, 0); final Iterator iter = objStream.iterator(); long count = 0L; while (iter.hasNext()) { @@ -118,7 +121,7 @@ static Stream testData() throws IOException { }, 100)); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.BYTES), "Hello World".getBytes(StandardCharsets.UTF_8), new SourceCommonConfig( - SchemaRegistryFragment.update(OutputFormatFragment.update(new ConfigDef(), null)), props) { + TransformerFragment.update(OutputFormatFragment.update(new ConfigDef(), null)), props) { }, 1)); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.JSONL), JsonTransformerTest.getJsonRecs(100).getBytes(StandardCharsets.UTF_8), diff --git 
a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java index 42d10aad..e3ce92f8 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java @@ -16,10 +16,10 @@ package io.aiven.kafka.connect.s3.source; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.AVRO_VALUE_SERIALIZER; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 083d8627..96bfe908 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -16,12 +16,12 @@ package io.aiven.kafka.connect.s3.source; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; -import static 
io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.AVRO_VALUE_SERIALIZER; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.config.TransformerFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java index 23dc69e9..ee50160b 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java @@ -25,9 +25,9 @@ import io.aiven.kafka.connect.common.config.FileNameFragment; import io.aiven.kafka.connect.common.config.OutputFieldType; import io.aiven.kafka.connect.common.config.OutputFormatFragment; -import io.aiven.kafka.connect.common.config.SchemaRegistryFragment; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.config.SourceConfigFragment; +import 
io.aiven.kafka.connect.common.config.TransformerFragment; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; import io.aiven.kafka.connect.iam.AwsStsEndpointConfig; import io.aiven.kafka.connect.iam.AwsStsRole; @@ -54,7 +54,7 @@ public static ConfigDef configDef() { S3ConfigFragment.update(configDef); SourceConfigFragment.update(configDef); FileNameFragment.update(configDef); - SchemaRegistryFragment.update(configDef); + TransformerFragment.update(configDef); OutputFormatFragment.update(configDef, OutputFieldType.VALUE); return configDef; @@ -64,7 +64,7 @@ private void validate() { // s3ConfigFragment is validated in this method as it is created here. // Other Fragments created in the ConfigDef are validated in the parent classes their instances are created in. - // e.g. SourceConfigFragment, FileNameFragment, SchemaRegistryFragment and OutputFormatFragment are all + // e.g. SourceConfigFragment, FileNameFragment, TransformerFragment and OutputFormatFragment are all // validated in SourceCommonConfig. 
s3ConfigFragment.validate(); } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index bded51d1..9ba7d3e1 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -129,8 +129,9 @@ private Stream convert(final S3Object s3Object) { final SchemaAndValue keyData = transformer.getKeyData(s3Object.key(), topic, s3SourceConfig); - return transformer - .getRecords(sourceClient.getObject(s3Object.key()), topic, partitionId, s3SourceConfig, recordCount) + return transformer // s3Object.size() returns the object length in bytes + .getRecords(sourceClient.getObject(s3Object.key()), s3Object.size(), topic, partitionId, s3SourceConfig, + recordCount) .map(new Mapper(partitionMap, recordCount, keyData, s3Object.key())); } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index 944ccbfd..ae672411 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -16,9 +16,9 @@ package io.aiven.kafka.connect.s3.source; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static
org.mockito.Mockito.when; diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java index 10939c51..031b4d1c 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java @@ -16,10 +16,10 @@ package io.aiven.kafka.connect.s3.source.config; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; import static org.assertj.core.api.Assertions.assertThat; import java.util.HashMap; diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index 2cb1a624..8b0eb97c 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -16,6 +16,7 @@ package io.aiven.kafka.connect.s3.source.utils; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator.BYTES_TRANSFORMATION_NUM_OF_RECS; import static org.assertj.core.api.Assertions.assertThat; import static 
org.mockito.ArgumentMatchers.anyMap; @@ -76,7 +77,7 @@ void testIteratorProcessesS3Objects() throws Exception { when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream); mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); - when(mockConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(4096); + when(mockConfig.getTransformerMaxBufferSize()).thenReturn(4096); when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); @@ -86,7 +87,7 @@ void testIteratorProcessesS3Objects() throws Exception { assertThat(iterator.hasNext()).isFalse(); - final S3Object obj = S3Object.builder().key(key).build(); + final S3Object obj = S3Object.builder().key(key).size(UNKNOWN_STREAM_LENGTH).build(); final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); @@ -103,7 +104,7 @@ void testIteratorProcessesS3Objects() throws Exception { void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { final String key = "topic-00001-abc123.txt"; - final S3Object s3Object = S3Object.builder().key(key).build(); + final S3Object s3Object = S3Object.builder().key(key).size(UNKNOWN_STREAM_LENGTH).build(); // With ByteArrayTransformer try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) { @@ -112,7 +113,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator()); mockTransformer = mock(ByteArrayTransformer.class); - when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) + when(mockTransformer.getRecords(any(), anyLong(), anyString(), anyInt(), any(), anyLong())) .thenReturn(Stream.of(SchemaAndValue.NULL)); when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); @@ 
-127,7 +128,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { mockTransformer, mockSourceApiClient); assertThat(iterator.hasNext()).isFalse(); verify(mockSourceApiClient, never()).getObject(any()); - verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong()); + verify(mockTransformer, never()).getRecords(any(), anyLong(), anyString(), anyInt(), any(), anyLong()); } // With AvroTransformer @@ -142,7 +143,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); when(mockTransformer.getKeyData(anyString(), anyString(), any())).thenReturn(SchemaAndValue.NULL); - when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) + when(mockTransformer.getRecords(any(), anyLong(), anyString(), anyInt(), any(), anyLong())) .thenReturn(Arrays.asList(SchemaAndValue.NULL).stream()); final Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, @@ -150,7 +151,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { assertThat(iterator.hasNext()).isTrue(); iterator.next(); - verify(mockTransformer, times(1)).getRecords(any(), anyString(), anyInt(), any(), anyLong()); + verify(mockTransformer, times(1)).getRecords(any(), anyLong(), anyString(), anyInt(), any(), anyLong()); } }