diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java index d527db411..7d653903a 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java @@ -28,7 +28,7 @@ public class SourceCommonConfig extends CommonConfig { - private final SchemaRegistryFragment schemaRegistryFragment; + private final TransformerFragment transformerFragment; private final SourceConfigFragment sourceConfigFragment; private final FileNameFragment fileNameFragment; private final OutputFormatFragment outputFormatFragment; @@ -36,7 +36,7 @@ public class SourceCommonConfig extends CommonConfig { public SourceCommonConfig(ConfigDef definition, Map originals) {// NOPMD super(definition, originals); // Construct Fragments - schemaRegistryFragment = new SchemaRegistryFragment(this); + transformerFragment = new TransformerFragment(this); sourceConfigFragment = new SourceConfigFragment(this); fileNameFragment = new FileNameFragment(this); outputFormatFragment = new OutputFormatFragment(this); @@ -45,18 +45,18 @@ public SourceCommonConfig(ConfigDef definition, Map originals) {// NOPMD } private void validate() { - schemaRegistryFragment.validate(); + transformerFragment.validate(); sourceConfigFragment.validate(); fileNameFragment.validate(); outputFormatFragment.validate(); } public InputFormat getInputFormat() { - return schemaRegistryFragment.getInputFormat(); + return transformerFragment.getInputFormat(); } public String getSchemaRegistryUrl() { - return schemaRegistryFragment.getSchemaRegistryUrl(); + return transformerFragment.getSchemaRegistryUrl(); } public String getTargetTopics() { @@ -76,7 +76,11 @@ public int getMaxPollRecords() { } public Transformer getTransformer() { - return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat()); + return TransformerFactory.getTransformer(transformerFragment.getInputFormat()); + } + + public int getTransformerMaxBufferSize() { + return transformerFragment.getTransformerMaxBufferSize(); } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java similarity index 60% rename from commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java rename to commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java index 8ea7b7f95..4651244b1 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java @@ -20,16 +20,19 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import io.aiven.kafka.connect.common.source.input.InputFormat; -public final class SchemaRegistryFragment extends ConfigFragment { - private static final String SCHEMAREGISTRY_GROUP = "Schema registry group"; +public final class TransformerFragment extends ConfigFragment { + private static final String TRANSFORMER_GROUP = "Transformer group"; public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String VALUE_CONVERTER_SCHEMA_REGISTRY_URL = "value.converter.schema.registry.url"; public static final String 
AVRO_VALUE_SERIALIZER = "value.serializer"; public static final String INPUT_FORMAT_KEY = "input.format"; public static final String SCHEMAS_ENABLE = "schemas.enable"; + public static final String TRANSFORMER_MAX_BUFFER_SIZE = "transformer.max.buffer.size"; + private static final int DEFAULT_MAX_BUFFER_SIZE = 4096; /** * Construct the ConfigFragment.. @@ -37,25 +40,28 @@ public final class SchemaRegistryFragment extends ConfigFragment { * @param cfg * the configuration that this fragment is associated with. */ - public SchemaRegistryFragment(final AbstractConfig cfg) { + public TransformerFragment(final AbstractConfig cfg) { super(cfg); } public static ConfigDef update(final ConfigDef configDef) { - int srCounter = 0; + int transformerCounter = 0; configDef.define(SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), - ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", SCHEMAREGISTRY_GROUP, srCounter++, + ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP, transformerCounter++, ConfigDef.Width.NONE, SCHEMA_REGISTRY_URL); configDef.define(VALUE_CONVERTER_SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, - new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", - SCHEMAREGISTRY_GROUP, srCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL); + new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP, + transformerCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL); configDef.define(INPUT_FORMAT_KEY, ConfigDef.Type.STRING, InputFormat.BYTES.getValue(), new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, - "Input format of messages read from source avro/json/parquet/bytes", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD + "Input format of messages read from source avro/json/parquet/bytes", TRANSFORMER_GROUP, + transformerCounter++, ConfigDef.Width.NONE, INPUT_FORMAT_KEY); + configDef.define(TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, DEFAULT_MAX_BUFFER_SIZE, + new ByteArrayTransformerMaxBufferSizeValidator(), ConfigDef.Importance.MEDIUM, + "Max Size of the byte buffer when using the BYTE Transformer", TRANSFORMER_GROUP, transformerCounter++, ConfigDef.Width.NONE, INPUT_FORMAT_KEY); - configDef.define(AVRO_VALUE_SERIALIZER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, - "Avro value serializer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD + "Avro value serializer", TRANSFORMER_GROUP, transformerCounter++, // NOPMD // UnusedAssignment ConfigDef.Width.NONE, AVRO_VALUE_SERIALIZER); return configDef; @@ -73,4 +79,21 @@ public Class getAvroValueSerializer() { return cfg.getClass(AVRO_VALUE_SERIALIZER); } + public int getTransformerMaxBufferSize() { + return cfg.getInt(TRANSFORMER_MAX_BUFFER_SIZE); + } + + private static class ByteArrayTransformerMaxBufferSizeValidator implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + + // ConfigDef will throw an error if this is not an int that is supplied + if ((int) value <= 0) { + throw new ConfigException( + String.format("%s must be larger then 0 and less then %s", name, Integer.MAX_VALUE)); + } + + } + } + } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java index ba46df870..3647ca129 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java +++ 
b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java @@ -16,18 +16,17 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; - import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; -import java.util.Map; import java.util.function.Consumer; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.source.task.Context; + import io.confluent.connect.avro.AvroData; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; @@ -49,21 +48,15 @@ public class AvroTransformer extends Transformer { } @Override - public void configureValueConverter(final Map config, final AbstractConfig sourceConfig) { - config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL)); - } - - @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final Integer topicPartition, final AbstractConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final Context context, final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { private DataFileStream dataFileStream; private final DatumReader datumReader = new GenericDatumReader<>(); @Override - protected InputStream inputOpened(final InputStream input) throws IOException { + protected void inputOpened(final InputStream input) throws IOException { dataFileStream = new DataFileStream<>(input, datumReader); - return input; } @Override @@ -91,7 +84,7 @@ protected boolean doAdvance(final Consumer action) { @Override public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, - final AbstractConfig sourceConfig) { + final SourceCommonConfig sourceConfig) { return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8)); } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java index 5b38bfef0..b33cb1272 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java @@ -20,34 +20,42 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Map; import java.util.function.Consumer; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.SchemaAndValue; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.source.task.Context; + import org.apache.commons.io.IOUtils; import org.apache.commons.io.function.IOSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * ByteArrayTransformer chunks an entire object into a maximum size specified by the + * {@link io.aiven.kafka.connect.common.config.TransformerFragment#TRANSFORMER_MAX_BUFFER_SIZE} configuration option. + * This will split large files into multiple records and each record will have the same key. 
+ */ public class ByteArrayTransformer extends Transformer { private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class); - private static final int MAX_BUFFER_SIZE = 4096; - @Override - public void configureValueConverter(final Map config, final AbstractConfig sourceConfig) { - // For byte array transformations, ByteArrayConverter is the converter which is the default config. - } - - @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final Integer topicPartition, final AbstractConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final Context context, final SourceCommonConfig sourceConfig) { + if (streamLength == 0) { + LOGGER.warn( + "Object sent for processing has an invalid streamLength of {}, object is empty returning an empty spliterator.", + streamLength); + return emptySpliterator(inputStreamIOSupplier); + } + // The max buffer size for the byte array the default is 4096 if not set by the user. + final int maxBufferSize = sourceConfig.getTransformerMaxBufferSize(); return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { + @Override - protected InputStream inputOpened(final InputStream input) { - return input; + protected void inputOpened(final InputStream input) { + } @Override @@ -57,18 +65,16 @@ protected void doClose() { @Override protected boolean doAdvance(final Consumer action) { - final byte[] buffer = new byte[MAX_BUFFER_SIZE]; + try { - final int bytesRead = IOUtils.read(inputStream, buffer); - if (bytesRead == 0) { - return false; + final byte[] buffer = new byte[maxBufferSize]; + final byte[] chunk = Arrays.copyOf(buffer, IOUtils.read(inputStream, buffer)); + if (chunk.length > 0) { + action.accept(new SchemaAndValue(null, chunk)); + return true; } - if (bytesRead < MAX_BUFFER_SIZE) { - action.accept(new SchemaAndValue(null, Arrays.copyOf(buffer, bytesRead))); - } else { - action.accept(new SchemaAndValue(null, buffer)); - } - return true; + + return false; } catch (IOException e) { LOGGER.error("Error trying to advance inputStream: {}", e.getMessage(), e); return false; @@ -77,9 +83,35 @@ protected boolean doAdvance(final Consumer action) { }; } + /** + * This method returns an empty spliterator when an empty input stream is supplied to be split + * + * @param inputStreamIOSupplier + * The empty input stream that was supplied + * @return an Empty spliterator to return to the calling method. 
+ */ + private static StreamSpliterator emptySpliterator(final IOSupplier inputStreamIOSupplier) { + return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { + @Override + protected boolean doAdvance(final Consumer action) { + return false; + } + + @Override + protected void doClose() { + // nothing to do + } + + @Override + protected void inputOpened(final InputStream input) { + } + + }; + } + @Override public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, - final AbstractConfig sourceConfig) { + final SourceCommonConfig sourceConfig) { return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8)); } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java index aeea9700c..7b39552a7 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java @@ -21,13 +21,14 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.util.Map; import java.util.function.Consumer; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.json.JsonConverter; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.source.task.Context; + import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.function.IOSupplier; import org.apache.commons.lang3.StringUtils; @@ -48,19 +49,15 @@ public class JsonTransformer extends Transformer { } @Override - public void configureValueConverter(final Map config, final AbstractConfig sourceConfig) { - } - - @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final Integer topicPartition, final AbstractConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final Context context, final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { BufferedReader reader; @Override - protected InputStream inputOpened(final InputStream input) throws IOException { + protected void inputOpened(final InputStream input) { reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); - return input; + } @Override @@ -87,7 +84,9 @@ public boolean doAdvance(final Consumer action) { } } line = line.trim(); - action.accept(jsonConverter.toConnectData(topic, line.getBytes(StandardCharsets.UTF_8))); + // toConnectData does not actually use topic in the conversion so its fine if it is null. 
+ action.accept(jsonConverter.toConnectData(context.getTopic().orElse(null), + line.getBytes(StandardCharsets.UTF_8))); return true; } catch (IOException e) { LOGGER.error("Error reading input stream: {}", e.getMessage(), e); @@ -99,7 +98,7 @@ public boolean doAdvance(final Consumer action) { @Override public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, - final AbstractConfig sourceConfig) { + final SourceCommonConfig sourceConfig) { return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8)); } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java index 9acea1a41..679d70b3d 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java @@ -16,8 +16,6 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; - import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -26,13 +24,13 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; -import java.util.Map; import java.util.function.Consumer; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.SchemaAndValue; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.source.input.parquet.LocalInputFile; +import io.aiven.kafka.connect.common.source.task.Context; import io.confluent.connect.avro.AvroData; import org.apache.avro.generic.GenericRecord; @@ -54,20 +52,15 @@ public class ParquetTransformer extends Transformer { this.avroData = avroData; } - @Override - public void configureValueConverter(final Map config, final AbstractConfig sourceConfig) { - config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL)); - } - @Override public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, - final AbstractConfig sourceConfig) { + final SourceCommonConfig sourceConfig) { return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8)); } @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final Integer topicPartition, final AbstractConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final Context context, final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { @@ -75,12 +68,13 @@ public StreamSpliterator createSpliterator(final IOSupplier inputSt private File parquetFile; @Override - protected InputStream inputOpened(final InputStream input) throws IOException { + protected void inputOpened(final InputStream input) throws IOException { final String timestamp = String.valueOf(Instant.now().toEpochMilli()); try { // Create a temporary file for the Parquet data - parquetFile = File.createTempFile(topic + "_" + topicPartition + "_" + timestamp, ".parquet"); + parquetFile = File.createTempFile(context.getTopic().orElse("topic") + "_" + + context.getPartition().orElse(null) + "_" + timestamp, ".parquet"); } catch (IOException e) { LOGGER.error("Error creating temp file for Parquet data: {}", e.getMessage(), e); throw e; @@ -90,7 +84,7 
@@ protected InputStream inputOpened(final InputStream input) throws IOException { IOUtils.copy(input, outputStream); // Copy input stream to temporary file } reader = AvroParquetReader.builder(new LocalInputFile(parquetFile.toPath())).build(); - return input; + } @Override diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java index 274494f52..77156d8a5 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java @@ -18,27 +18,28 @@ import java.io.IOException; import java.io.InputStream; -import java.util.Map; import java.util.Spliterator; import java.util.function.Consumer; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.SchemaAndValue; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.source.task.Context; + import org.apache.commons.io.function.IOSupplier; import org.slf4j.Logger; public abstract class Transformer { - public abstract void configureValueConverter(Map config, AbstractConfig sourceConfig); + public final static long UNKNOWN_STREAM_LENGTH = -1; public final Stream getRecords(final IOSupplier inputStreamIOSupplier, - final String topic, final Integer topicPartition, final AbstractConfig sourceConfig, + final long streamLength, final Context context, final SourceCommonConfig sourceConfig, final long skipRecords) { - final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, topic, topicPartition, + final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, streamLength, context, sourceConfig); return StreamSupport.stream(spliterator, false).onClose(spliterator::close).skip(skipRecords); } @@ -48,18 +49,19 @@ public final Stream getRecords(final IOSupplier inp * * @param inputStreamIOSupplier * the input stream supplier. - * @param topic - * the topic. - * @param topicPartition - * the partition. + * @param streamLength + * the length of the input stream, {@link #UNKNOWN_STREAM_LENGTH} may be used to specify a stream with an + * unknown length, streams of length zero will log an error and return an empty stream + * @param context + * the context * @param sourceConfig - * the source configuraiton. + * the source configuration. * @return a StreamSpliterator instance. */ - protected abstract StreamSpliterator createSpliterator(IOSupplier inputStreamIOSupplier, String topic, - Integer topicPartition, AbstractConfig sourceConfig); + protected abstract StreamSpliterator createSpliterator(IOSupplier inputStreamIOSupplier, + long streamLength, Context context, SourceCommonConfig sourceConfig); - public abstract SchemaAndValue getKeyData(Object cloudStorageKey, String topic, AbstractConfig sourceConfig); + public abstract SchemaAndValue getKeyData(Object cloudStorageKey, String topic, SourceCommonConfig sourceConfig); /** * A Spliterator that performs various checks on the opening/closing of the input stream. 
@@ -74,7 +76,7 @@ protected abstract static class StreamSpliterator implements Spliterator action) { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java index 574604306..06d872be2 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java @@ -16,7 +16,7 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMAS_ENABLE; import java.util.Map; diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java new file mode 100644 index 000000000..cb6df54f8 --- /dev/null +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java @@ -0,0 +1,64 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class TransformerFragmentTest { + + @Test + void validateCorrectBufferSizeIsAccepted() { + final int bufferSize = 50; + final ConfigDef configDef = TransformerFragment.update(new ConfigDef()); + final Map props = new HashMap<>(); + props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, bufferSize); + + final TransformerFragment schemaReg = new TransformerFragment(new AbstractConfig(configDef, props)); + assertThat(schemaReg.getTransformerMaxBufferSize()).isEqualTo(bufferSize); + } + + @ParameterizedTest + @CsvSource({ + "21474836471,Invalid value 21474836471 for configuration transformer.max.buffer.size: Not a number of type INT", + "-1,transformer.max.buffer.size must be larger then 0 and less then 2147483647", + "MAX,Invalid value MAX for configuration transformer.max.buffer.size: Not a number of type INT", + "0,transformer.max.buffer.size must be larger then 0 and less then 2147483647", + "-9000,transformer.max.buffer.size must be larger then 0 and less then 2147483647", + "MTA=,Invalid value MTA= for configuration transformer.max.buffer.size: Not a number of type INT" }) + void validateInvalidBufferSizeThrowsConfigException(final String value, final String expectedMessage) { + final ConfigDef configDef = 
TransformerFragment.update(new ConfigDef()); + final Map props = new HashMap<>(); + + props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, value); + assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props))) + .isInstanceOf(ConfigException.class) + .hasMessage(expectedMessage); + } + +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java index 617dd290a..92c47278d 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java @@ -16,9 +16,8 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -26,9 +25,7 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -36,6 +33,7 @@ import org.apache.kafka.connect.data.Struct; import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.source.task.Context; import io.confluent.connect.avro.AvroData; import org.apache.avro.Schema; @@ -57,29 +55,18 @@ final class AvroTransformerTest { private SourceCommonConfig sourceCommonConfig; private AvroTransformer avroTransformer; - private Map config; @BeforeEach void setUp() { avroTransformer = new AvroTransformer(new AvroData(100)); - config = new HashMap<>(); - } - - @Test - void testConfigureValueConverter() { - final String value = "http://localhost:8081"; - when(sourceCommonConfig.getString(SCHEMA_REGISTRY_URL)).thenReturn(value); - avroTransformer.configureValueConverter(config, sourceCommonConfig); - assertThat(config.get(SCHEMA_REGISTRY_URL)).isEqualTo("http://localhost:8081") - .describedAs("The schema registry URL should be correctly set in the config."); } @Test void testReadAvroRecordsInvalidData() { final InputStream inputStream = new ByteArrayInputStream("mock-avro-data".getBytes(StandardCharsets.UTF_8)); - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 0); + final Stream records = avroTransformer.getRecords(() -> inputStream, UNKNOWN_STREAM_LENGTH, + new Context<>("storage-key"), sourceCommonConfig, 0); final List recs = records.collect(Collectors.toList()); assertThat(recs).isEmpty(); @@ -95,8 +82,8 @@ void testReadAvroRecords() throws Exception { expected.add("Hello, Kafka Connect S3 Source! object " + i); } - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 0); + final Stream records = avroTransformer.getRecords(() -> inputStream, avroData.size(), + new Context<>("storage-key"), sourceCommonConfig, 0); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Struct) sv).getString("message")) @@ -112,8 +99,8 @@ void testReadAvroRecordsSkipFew() throws Exception { for (int i = 5; i < 20; i++) { expected.add("Hello, Kafka Connect S3 Source! 
object " + i); } - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 5); + final Stream records = avroTransformer.getRecords(() -> inputStream, avroData.size(), + new Context<>("storage-key"), sourceCommonConfig, 5); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Struct) sv).getString("message")) @@ -125,8 +112,8 @@ void testReadAvroRecordsSkipMoreRecordsThanExist() throws Exception { final ByteArrayOutputStream avroData = generateMockAvroData(20); final InputStream inputStream = new ByteArrayInputStream(avroData.toByteArray()); - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 25); + final Stream records = avroTransformer.getRecords(() -> inputStream, avroData.size(), + new Context<>("storage-key"), sourceCommonConfig, 25); assertThat(records).isEmpty(); } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java index 80820e13b..3c7eb0edd 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java @@ -17,6 +17,9 @@ package io.aiven.kafka.connect.common.source.input; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -27,11 +30,15 @@ import org.apache.kafka.connect.data.SchemaAndValue; import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.source.task.Context; import org.apache.commons.io.function.IOSupplier; +import org.apache.http.util.ByteArrayBuffer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -44,9 +51,13 @@ final class ByteArrayTransformerTest { @Mock private SourceCommonConfig sourceCommonConfig; + private Context context; + @BeforeEach void setUp() { byteArrayTransformer = new ByteArrayTransformer(); + context = new Context<>("storage-key"); + context.setTopic(TEST_TOPIC); } @Test @@ -54,9 +65,9 @@ void testGetRecordsSingleChunk() { final byte[] data = { 1, 2, 3, 4, 5 }; final InputStream inputStream = new ByteArrayInputStream(data); final IOSupplier inputStreamIOSupplier = () -> inputStream; - - final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0, - sourceCommonConfig, 0); + when(sourceCommonConfig.getTransformerMaxBufferSize()).thenReturn(4096); + final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, + (long) data.length, context, sourceCommonConfig, 0); final List recs = records.collect(Collectors.toList()); assertThat(recs).hasSize(1); @@ -69,9 +80,39 @@ void testGetRecordsEmptyInputStream() { final IOSupplier inputStreamIOSupplier = () -> inputStream; - final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0, + final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, 0, context, sourceCommonConfig, 0); assertThat(records).hasSize(0); } + + 
/** + * @param maxBufferSize + * the maximum buffer size + * @param numberOfExpectedRecords + * the number of records the byte array is split into based off the max buffer size + */ + @ParameterizedTest + @CsvSource({ "1,10", "2,5", "3,4", "4,3", "5,2", "6,2", "7,2", "8,2", "9,2", "10,1", "11,1", "12,1" }) + void testGetRecordsWithVariableMaxBufferSize(final int maxBufferSize, final int numberOfExpectedRecords) { + final byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + final InputStream inputStream = new ByteArrayInputStream(data); + final IOSupplier inputStreamIOSupplier = () -> inputStream; + when(sourceCommonConfig.getTransformerMaxBufferSize()).thenReturn(maxBufferSize); + + final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, + (long) data.length, context, sourceCommonConfig, 0); + + final List recs = records.collect(Collectors.toList()); + assertThat(recs).hasSize(numberOfExpectedRecords); + final ByteArrayBuffer processedData = new ByteArrayBuffer(10); + + recs.forEach(rec -> { + final byte[] val = (byte[]) rec.value(); + processedData.append(val, 0, val.length); + }); + assertThat(processedData.buffer()).isEqualTo(data); + // Should only get called once per splitIterator + verify(sourceCommonConfig, times(1)).getTransformerMaxBufferSize(); + } } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java index e482fd61c..be2ad6edf 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java @@ -16,7 +16,8 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMAS_ENABLE; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -35,6 +36,7 @@ import org.apache.kafka.connect.json.JsonConverter; import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.source.task.Context; import org.apache.commons.io.function.IOSupplier; import org.junit.jupiter.api.AfterEach; @@ -56,6 +58,7 @@ final class JsonTransformerTest { private IOSupplier inputStreamIOSupplierMock; JsonConverter jsonConverter; + private Context context; @BeforeEach void setUp() { @@ -63,6 +66,9 @@ void setUp() { final Map config = new HashMap<>(); config.put(SCHEMAS_ENABLE, "false"); jsonConverter.configure(config, false); + context = new Context<>("storage-key"); + context.setTopic(TESTTOPIC); + context.setPartition(1); jsonTransformer = new JsonTransformer(jsonConverter); sourceCommonConfig = mock(SourceCommonConfig.class); @@ -74,7 +80,7 @@ void destroy() { } @Test - void testHandleValueDataWithValidJson() { + void testHandleValueDataWithValidJson() throws IOException { final InputStream validJsonInputStream = new ByteArrayInputStream( getJsonRecs(100).getBytes(StandardCharsets.UTF_8)); @@ -83,8 +89,8 @@ void testHandleValueDataWithValidJson() { expected.add("value" + i); } - final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, TESTTOPIC, 1, - sourceCommonConfig, 0); + final Stream records = jsonTransformer.getRecords(() -> 
validJsonInputStream, + UNKNOWN_STREAM_LENGTH, context, sourceCommonConfig, 0); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Map) sv).get("key")) @@ -92,7 +98,7 @@ void testHandleValueDataWithValidJson() { } @Test - void testHandleValueDataWithValidJsonSkipFew() { + void testHandleValueDataWithValidJsonSkipFew() throws IOException { final InputStream validJsonInputStream = new ByteArrayInputStream( getJsonRecs(100).getBytes(StandardCharsets.UTF_8)); @@ -101,8 +107,8 @@ void testHandleValueDataWithValidJsonSkipFew() { expected.add("value" + i); } - final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, TESTTOPIC, 1, - sourceCommonConfig, 25L); + final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, + UNKNOWN_STREAM_LENGTH, context, sourceCommonConfig, 25L); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Map) sv).get("key")) @@ -111,13 +117,13 @@ void testHandleValueDataWithValidJsonSkipFew() { } @Test - void testHandleValueDataWithInvalidJson() { + void testHandleValueDataWithInvalidJson() throws IOException { final InputStream invalidJsonInputStream = new ByteArrayInputStream( "invalid-json".getBytes(StandardCharsets.UTF_8)); final IOSupplier inputStreamIOSupplier = () -> invalidJsonInputStream; - final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1, - sourceCommonConfig, 0); + final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, + UNKNOWN_STREAM_LENGTH, context, sourceCommonConfig, 0); assertThat(jsonNodes).isEmpty(); @@ -126,7 +132,8 @@ void testHandleValueDataWithInvalidJson() { @Test void testGetRecordsWithIOException() throws IOException { when(inputStreamIOSupplierMock.get()).thenThrow(new IOException("Test IOException")); - final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null, 0); + context.setPartition(0); + final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, UNKNOWN_STREAM_LENGTH, context, null, 0); assertThat(resultStream).isEmpty(); } @@ -134,7 +141,8 @@ void testGetRecordsWithIOException() throws IOException { @Test void testCustomSpliteratorWithIOExceptionDuringInitialization() throws IOException { when(inputStreamIOSupplierMock.get()).thenThrow(new IOException("Test IOException during initialization")); - final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null, 0); + context.setPartition(0); + final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, UNKNOWN_STREAM_LENGTH, context, null, 0); assertThat(resultStream).isEmpty(); } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java index 2f7a405fe..f4f7ec7dd 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.connect.data.Struct; import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.source.task.Context; import io.confluent.connect.avro.AvroData; import org.apache.commons.io.IOUtils; @@ -51,23 +52,24 @@ @ExtendWith(MockitoExtension.class) final class ParquetTransformerTest { private ParquetTransformer parquetTransformer; - + private Context 
context; @BeforeEach public void setUp() { parquetTransformer = new ParquetTransformer(new AvroData(100)); + context = new Context<>("storage-key"); + context.setTopic("test-topic"); + context.setPartition(0); } @Test - void testHandleValueDataWithZeroBytes() { + void testHandleValueDataWithZeroBytes() throws IOException { final byte[] mockParquetData = new byte[0]; final InputStream inputStream = new ByteArrayInputStream(mockParquetData); final IOSupplier inputStreamIOSupplier = () -> inputStream; final SourceCommonConfig s3SourceConfig = mock(SourceCommonConfig.class); - final String topic = "test-topic"; - final int topicPartition = 0; - final Stream recs = parquetTransformer.getRecords(inputStreamIOSupplier, topic, topicPartition, - s3SourceConfig, 0L); + final Stream recs = parquetTransformer.getRecords(inputStreamIOSupplier, mockParquetData.length, + context, s3SourceConfig, 0L); assertThat(recs).isEmpty(); } @@ -79,14 +81,12 @@ void testGetRecordsWithValidData() throws Exception { final IOSupplier inputStreamIOSupplier = () -> inputStream; final SourceCommonConfig s3SourceConfig = mock(SourceCommonConfig.class); - final String topic = "test-topic"; - final int topicPartition = 0; final List expected = new ArrayList<>(); for (int i = 0; i < 100; i++) { expected.add("name" + i); } final List records = parquetTransformer - .getRecords(inputStreamIOSupplier, topic, topicPartition, s3SourceConfig, 0L) + .getRecords(inputStreamIOSupplier, mockParquetData.length, context, s3SourceConfig, 0L) .collect(Collectors.toList()); assertThat(records).extracting(SchemaAndValue::value) @@ -101,16 +101,13 @@ void testGetRecordsWithValidDataSkipFew() throws Exception { final IOSupplier inputStreamIOSupplier = () -> inputStream; final SourceCommonConfig s3SourceConfig = mock(SourceCommonConfig.class); - final String topic = "test-topic"; - final int topicPartition = 0; - final List expected = new ArrayList<>(); for (int i = 25; i < 100; i++) { expected.add("name" + i); } final List records = parquetTransformer - .getRecords(inputStreamIOSupplier, topic, topicPartition, s3SourceConfig, 25L) + .getRecords(inputStreamIOSupplier, mockParquetData.length, context, s3SourceConfig, 25L) .collect(Collectors.toList()); assertThat(records).extracting(SchemaAndValue::value) @@ -126,11 +123,8 @@ void testGetRecordsWithInvalidData() { final SourceCommonConfig s3SourceConfig = mock(SourceCommonConfig.class); - final String topic = "test-topic"; - final int topicPartition = 0; - - final Stream records = parquetTransformer.getRecords(inputStreamIOSupplier, topic, - topicPartition, s3SourceConfig, 0L); + final Stream records = parquetTransformer.getRecords(inputStreamIOSupplier, invalidData.length, + context, s3SourceConfig, 0L); assertThat(records).isEmpty(); } @@ -153,10 +147,10 @@ void testIOExceptionCreatingTempFile() { try (var mockStatic = Mockito.mockStatic(File.class)) { mockStatic.when(() -> File.createTempFile(anyString(), anyString())) .thenThrow(new IOException("Test IOException for temp file")); - + context.setPartition(1); final IOSupplier inputStreamSupplier = mock(IOSupplier.class); - final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, "test-topic", - 1, null, 0L); + final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, 0L, context, + null, 0L); assertThat(resultStream).isEmpty(); } @@ -166,10 +160,10 @@ void testIOExceptionCreatingTempFile() { void testIOExceptionDuringDataCopy() throws IOException { try (InputStream inputStreamMock = 
mock(InputStream.class)) { when(inputStreamMock.read(any(byte[].class))).thenThrow(new IOException("Test IOException during copy")); - + context.setPartition(1); final IOSupplier inputStreamSupplier = () -> inputStreamMock; - final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, "test-topic", - 1, null, 0L); + final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, 0L, context, + null, 0L); assertThat(resultStream).isEmpty(); } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java index 73b27b01f..b631ff5c1 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java @@ -16,6 +16,8 @@ package io.aiven.kafka.connect.common.source.input; +import static io.aiven.kafka.connect.common.config.OutputFormatFragmentFixture.OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -32,13 +34,16 @@ import java.util.List; import java.util.stream.Stream; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.data.SchemaAndValue; -import io.aiven.kafka.connect.common.config.CommonConfig; +import io.aiven.kafka.connect.common.config.OutputFormatFragment; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.config.TransformerFragment; +import io.aiven.kafka.connect.common.source.task.Context; import org.apache.commons.io.function.IOSupplier; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -48,20 +53,29 @@ */ class TransformerStreamingTest { + private Context context; + + @BeforeEach + public void setup() { + context = new Context<>("storage-key"); + context.setTopic("topic"); + context.setPartition(1); + } + @ParameterizedTest @MethodSource("testData") - void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] testData, final AbstractConfig config, - final int expectedCount) throws IOException { + void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] testData, + final SourceCommonConfig config, final int expectedCount) throws IOException { final IOSupplier ioSupplier = mock(IOSupplier.class); when(ioSupplier.get()).thenThrow(new IOException("Test IOException during initialization")); - final Stream objStream = transformer.getRecords(ioSupplier, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(ioSupplier, UNKNOWN_STREAM_LENGTH, context, config, 0); assertThat(objStream).isEmpty(); } @ParameterizedTest @MethodSource("testData") - void verifyExceptionDuringRead(final Transformer transformer, final byte[] testData, final AbstractConfig config, - final int expectedCount) throws IOException { + void verifyExceptionDuringRead(final Transformer transformer, final byte[] testData, + final SourceCommonConfig config, final int expectedCount) throws IOException { try (InputStream inputStream = 
mock(InputStream.class)) { when(inputStream.read()).thenThrow(new IOException("Test IOException during read")); when(inputStream.read(any())).thenThrow(new IOException("Test IOException during read")); @@ -72,7 +86,8 @@ void verifyExceptionDuringRead(final Transformer transformer, final byte[] testD when(inputStream.readNBytes(anyInt())).thenThrow(new IOException("Test IOException during read")); when(inputStream.readAllBytes()).thenThrow(new IOException("Test IOException during read")); try (CloseTrackingStream stream = new CloseTrackingStream(inputStream)) { - final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(() -> stream, UNKNOWN_STREAM_LENGTH, context, config, + 0); assertThat(objStream).isEmpty(); assertThat(stream.closeCount).isGreaterThan(0); } @@ -81,10 +96,10 @@ void verifyExceptionDuringRead(final Transformer transformer, final byte[] testD @ParameterizedTest @MethodSource("testData") - void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData, final AbstractConfig config, + void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData, final SourceCommonConfig config, final int expectedCount) throws IOException { final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData)); - final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(() -> stream, UNKNOWN_STREAM_LENGTH, context, config, 0); final long count = objStream.count(); assertThat(count).isEqualTo(expectedCount); assertThat(stream.closeCount).isGreaterThan(0); @@ -93,9 +108,10 @@ void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData @ParameterizedTest @MethodSource("testData") void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[] testData, - final AbstractConfig config, final int expectedCount) throws IOException { + final SourceCommonConfig config, final int expectedCount) throws IOException { final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData)); - final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(() -> stream, UNKNOWN_STREAM_LENGTH, context, + config, 0); final Iterator iter = objStream.iterator(); long count = 0L; while (iter.hasNext()) { @@ -108,19 +124,23 @@ void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[] static Stream testData() throws IOException { final List lst = new ArrayList<>(); + final var props = new HashMap<>(); + props.put(FORMAT_OUTPUT_TYPE_CONFIG.key(), "avro"); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.AVRO), AvroTransformerTest.generateMockAvroData(100).toByteArray(), - new CommonConfig(new ConfigDef(), new HashMap<>()) { + new SourceCommonConfig(OutputFormatFragment.update(new ConfigDef(), null), props) { }, 100)); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.BYTES), - "Hello World".getBytes(StandardCharsets.UTF_8), new CommonConfig(new ConfigDef(), new HashMap<>()) { + "Hello World".getBytes(StandardCharsets.UTF_8), new SourceCommonConfig( + TransformerFragment.update(OutputFormatFragment.update(new ConfigDef(), null)), props) { }, 1)); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.JSONL), JsonTransformerTest.getJsonRecs(100).getBytes(StandardCharsets.UTF_8), - new CommonConfig(new 
ConfigDef(), new HashMap<>()) { }, 100)); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.PARQUET), - ParquetTransformerTest.generateMockParquetData(), new CommonConfig(new ConfigDef(), new HashMap<>()) { + ParquetTransformerTest.generateMockParquetData(), + new SourceCommonConfig(OutputFormatFragment.update(new ConfigDef(), null), props) { }, 100)); return lst.stream(); } diff --git a/s3-source-connector/README.md b/s3-source-connector/README.md index 3c236d4d0..a5952008d 100644 --- a/s3-source-connector/README.md +++ b/s3-source-connector/README.md @@ -201,6 +201,9 @@ List of new configuration parameters: - `aws.sts.role.session.name` - AWS session name for cross-account access role - `aws.sts.role.session.duration` - Session duration for cross-account access role in Seconds. Minimum value - 900. - `aws.sts.config.endpoint` - AWS STS endpoint for cross-account access role. +- `transformer.max.buffer.size` - [Optional] When using the ByteArrayTransformer you can alter the buffer size; valid values are 1 to 2147483647, and the default is 4096. +- `input.format` - Specify the format of the files being read from S3. Supported values are avro, parquet, jsonl, and bytes; bytes is the default. +- `schema.registry.url` - [Optional] The URL of the schema registry you want to use. ## Configuration diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java index b57fe6b72..5f3e0b6d6 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java @@ -19,9 +19,9 @@ import static io.aiven.kafka.connect.common.config.CommonConfig.MAX_TASKS; import static io.aiven.kafka.connect.common.config.CommonConfig.TASK_ID; import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.AVRO_VALUE_SERIALIZER; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index aec5e7a79..83e54de84 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -253,6 +253,20 @@ static List consumeByteMessages(final String topic, final int expectedMe return objects.stream().map(String::new).collect(Collectors.toList()); } + static List consumeRawByteMessages(final String topic,
final int expectedMessageCount, + String bootstrapServers) { + final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class, + ByteArrayDeserializer.class); + final List objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(60), + consumerProperties); + return objects.stream().map(obj -> { + final byte[] byteArray = new byte[obj.length]; + System.arraycopy(obj, 0, byteArray, 0, obj.length); + return byteArray; + }).collect(Collectors.toList()); + + } + static List consumeAvroMessages(final String topic, final int expectedMessageCount, final Duration expectedMaxDuration, final String bootstrapServers, final String schemaRegistryUrl) { final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class, diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index e2a79defd..9753b6acd 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -19,12 +19,12 @@ import static io.aiven.kafka.connect.common.config.CommonConfig.MAX_TASKS; import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG; import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_PATH_PREFIX_TEMPLATE_CONFIG; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.DISTRIBUTION_TYPE; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.AVRO_VALUE_SERIALIZER; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.config.TransformerFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; @@ -42,6 +42,7 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -211,12 +212,53 @@ void bytesTest(final boolean addPrefix) { .stream() .collect(Collectors.toMap(Function.identity(), s -> 1L)); verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers()); - // add keys we haent processed before + // add keys we haven't processed before offsetKeys.add("s3-object-key-from-bucket"); offsetKeys.add("topic-one/s3-object-key-from-bucket"); verifyOffsetsConsumeableByS3OffsetMgr(connectorConfig, offsetKeys, expectedOffsetRecords); } + @Test + void 
bytesDefaultBufferTest() { + final int maxBufferSize = 4096; + final var topic = IntegrationBase.getTopic(testInfo); + final DistributionType distributionType; + final String prefixPattern = "topics/{{topic}}/partition={{partition}}/"; + distributionType = DistributionType.PARTITION; + + final Map connectorConfig = getConfig(CONNECTOR_NAME, topic, 1, distributionType, false, null, + prefixPattern, "-"); + + connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue()); + connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); + + final int byteArraySize = 6000; + final byte[] testData1 = new byte[byteArraySize]; + for (int i = 0; i < byteArraySize; i++) { + testData1[i] = ((Integer) i).byteValue(); + } + final List offsetKeys = new ArrayList<>(); + + offsetKeys.add(writeToS3(topic, testData1, "0", s3Prefix)); + + assertThat(testBucketAccessor.listObjects()).hasSize(1); + + // Poll messages from the Kafka topic and verify the consumed data + final List records = IntegrationBase.consumeRawByteMessages(topic, 2, + connectRunner.getBootstrapServers()); + + assertThat(records.get(0)).hasSize(maxBufferSize); + assertThat(records.get(1)).hasSize(byteArraySize - maxBufferSize); + + assertThat(records.get(0)).isEqualTo(Arrays.copyOfRange(testData1, 0, maxBufferSize)); + assertThat(records.get(1)).isEqualTo(Arrays.copyOfRange(testData1, maxBufferSize, testData1.length)); + + // Verify offset positions + final Map expectedOffsetRecords = offsetKeys.stream() + .collect(Collectors.toMap(Function.identity(), s -> 2L)); + verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers()); + } + @Test void avroTest(final TestInfo testInfo) throws IOException { final var topic = IntegrationBase.getTopic(testInfo); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java index ebcffdba5..dae9b6b06 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java @@ -25,9 +25,9 @@ import io.aiven.kafka.connect.common.config.FileNameFragment; import io.aiven.kafka.connect.common.config.OutputFieldType; import io.aiven.kafka.connect.common.config.OutputFormatFragment; -import io.aiven.kafka.connect.common.config.SchemaRegistryFragment; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.config.SourceConfigFragment; +import io.aiven.kafka.connect.common.config.TransformerFragment; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; import io.aiven.kafka.connect.iam.AwsStsEndpointConfig; import io.aiven.kafka.connect.iam.AwsStsRole; @@ -56,7 +56,7 @@ public static ConfigDef configDef() { S3ConfigFragment.update(configDef); SourceConfigFragment.update(configDef); FileNameFragment.update(configDef); - SchemaRegistryFragment.update(configDef); + TransformerFragment.update(configDef); OutputFormatFragment.update(configDef, OutputFieldType.VALUE); return configDef; @@ -66,7 +66,7 @@ private void validate() { // s3ConfigFragment is validated in this method as it is created here. // Other Fragments created in the ConfigDef are validated in the parent classes their instances are created in. - // e.g. SourceConfigFragment, FileNameFragment, SchemaRegistryFragment and OutputFormatFragment are all + // e.g. 
SourceConfigFragment, FileNameFragment, TransformerFragment and OutputFormatFragment are all // validated in SourceCommonConfig. s3ConfigFragment.validate(); } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java index 53e7baa7d..5bf799d30 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java @@ -94,6 +94,10 @@ public S3OffsetManagerEntry getOffsetManagerEntry() { return offsetManagerEntry.fromProperties(offsetManagerEntry.getProperties()); // return a defensive copy } + public long getS3ObjectSize() { + return s3Object.size(); + } + public Context getContext() { return new Context<>(context) { }; diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index 35056125d..086879731 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -139,8 +139,8 @@ private Stream convert(final S3SourceRecord s3SourceRecord) { lastSeenObjectKey = s3SourceRecord.getObjectKey(); return transformer - .getRecords(sourceClient.getObject(s3SourceRecord.getObjectKey()), s3SourceRecord.getTopic(), - s3SourceRecord.getPartition(), s3SourceConfig, s3SourceRecord.getRecordCount()) + .getRecords(sourceClient.getObject(s3SourceRecord.getObjectKey()), s3SourceRecord.getS3ObjectSize(), + s3SourceRecord.getContext(), s3SourceConfig, s3SourceRecord.getRecordCount()) .map(new Mapper(s3SourceRecord)); } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index 78f8d00d1..1c832216d 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -18,8 +18,8 @@ import static io.aiven.kafka.connect.common.config.CommonConfig.MAX_TASKS; import static io.aiven.kafka.connect.common.config.CommonConfig.TASK_ID; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.BUCKET; import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.OBJECT_KEY; import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.RECORD_COUNT; diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java index 3d02a30c5..9df8e9588 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java @@ -16,9 +16,9 @@ package io.aiven.kafka.connect.s3.source.config; -import static 
io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; import static org.assertj.core.api.Assertions.assertThat; import java.util.HashMap; diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index a512a00c0..4a6e25ec6 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -88,6 +88,7 @@ private void mockSourceConfig(final S3SourceConfig s3SourceConfig, final String when(s3SourceConfig.getS3FileNameFragment()).thenReturn(mockFileNameFrag); when(mockFileNameFrag.getFilenameTemplate()).thenReturn(Template.of(filePattern)); when(mockConfig.getTargetTopics()).thenReturn(targetTopic); + when(mockConfig.getTransformerMaxBufferSize()).thenReturn(4096); } @Test @@ -128,6 +129,7 @@ void testIteratorExpectExceptionWhenGetsContextWithNoTopic() throws Exception { sourceApiClient = new AWSV2SourceClient(builder.build(), config); mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); + when(mockConfig.getTransformerMaxBufferSize()).thenReturn(4096); mockSourceConfig(mockConfig, filePattern, 0, 1, null); @@ -160,6 +162,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { sourceApiClient = new AWSV2SourceClient(builder.build(), config); Transformer transformer = TransformerFactory.getTransformer(InputFormat.BYTES); + when(mockConfig.getTransformerMaxBufferSize()).thenReturn(4096); mockSourceConfig(mockConfig, filePattern, 0, 1, null);