diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java index 94bf12c7..3647ca12 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java @@ -55,9 +55,8 @@ public StreamSpliterator createSpliterator(final IOSupplier inputSt private final DatumReader datumReader = new GenericDatumReader<>(); @Override - protected InputStream inputOpened(final InputStream input) throws IOException { + protected void inputOpened(final InputStream input) throws IOException { dataFileStream = new DataFileStream<>(input, datumReader); - return input; } @Override diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java index 43059cc0..b33cb127 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java @@ -52,9 +52,10 @@ public StreamSpliterator createSpliterator(final IOSupplier inputSt // The max buffer size for the byte array the default is 4096 if not set by the user. final int maxBufferSize = sourceConfig.getTransformerMaxBufferSize(); return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { + @Override - protected InputStream inputOpened(final InputStream input) { - return input; + protected void inputOpened(final InputStream input) { + } @Override @@ -102,9 +103,9 @@ protected void doClose() { } @Override - protected InputStream inputOpened(final InputStream input) throws IOException { - return InputStream.nullInputStream(); + protected void inputOpened(final InputStream input) { } + }; } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java index 56eb8e81..7b39552a 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java @@ -55,9 +55,9 @@ public StreamSpliterator createSpliterator(final IOSupplier inputSt BufferedReader reader; @Override - protected InputStream inputOpened(final InputStream input) throws IOException { + protected void inputOpened(final InputStream input) { reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); - return input; + } @Override diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java index 6a4d5d1b..679d70b3 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java @@ -68,7 +68,7 @@ public StreamSpliterator createSpliterator(final IOSupplier inputSt private File parquetFile; @Override - protected InputStream inputOpened(final InputStream input) throws IOException { + protected void inputOpened(final InputStream input) throws IOException { final String timestamp = String.valueOf(Instant.now().toEpochMilli()); try { @@ -84,7 +84,7 @@ protected InputStream inputOpened(final InputStream input) throws IOException { IOUtils.copy(input, outputStream); // Copy input stream to temporary file } reader = AvroParquetReader.builder(new LocalInputFile(parquetFile.toPath())).build(); - return input; + } @Override diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java index 0bee0218..77156d8a 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java @@ -76,7 +76,7 @@ protected abstract static class StreamSpliterator implements Spliterator action) { diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java index 1daeb3c8..cb6df54f 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java @@ -27,6 +27,8 @@ import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; class TransformerFragmentTest { @@ -41,21 +43,22 @@ void validateCorrectBufferSizeIsAccepted() { assertThat(schemaReg.getTransformerMaxBufferSize()).isEqualTo(bufferSize); } - @Test - void validateInvalidBufferSizeThrowsConfigException() { + @ParameterizedTest + @CsvSource({ + "21474836471,Invalid value 21474836471 for configuration transformer.max.buffer.size: Not a number of type INT", + "-1,transformer.max.buffer.size must be larger then 0 and less then 2147483647", + "MAX,Invalid value MAX for configuration transformer.max.buffer.size: Not a number of type INT", + "0,transformer.max.buffer.size must be larger then 0 and less then 2147483647", + "-9000,transformer.max.buffer.size must be larger then 0 and less then 2147483647", + "MTA=,Invalid value MTA= for configuration transformer.max.buffer.size: Not a number of type INT" }) + void validateInvalidBufferSizeThrowsConfigException(final String value, final String expectedMessage) { final ConfigDef configDef = TransformerFragment.update(new ConfigDef()); final Map props = new HashMap<>(); - // Too small - props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, 0); - assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props))) - .isInstanceOf(ConfigException.class); - // Too large - props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, Integer.MAX_VALUE + "1"); + props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, value); assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props))) .isInstanceOf(ConfigException.class) - .hasMessage( - "Invalid value 21474836471 for configuration transformer.max.buffer.size: Not a number of type INT"); + .hasMessage(expectedMessage); } }