Skip to content

Commit

Permalink
Update Transformer interafce
Browse files Browse the repository at this point in the history
Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
  • Loading branch information
aindriu-aiven committed Jan 28, 2025
1 parent 4240f6c commit 376255d
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputSt
private final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();

@Override
protected InputStream inputOpened(final InputStream input) throws IOException {
protected void inputOpened(final InputStream input) throws IOException {
dataFileStream = new DataFileStream<>(input, datumReader);
return input;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputSt
// The max buffer size for the byte array the default is 4096 if not set by the user.
final int maxBufferSize = sourceConfig.getTransformerMaxBufferSize();
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {

@Override
protected InputStream inputOpened(final InputStream input) {
return input;
protected void inputOpened(final InputStream input) {

}

@Override
Expand Down Expand Up @@ -102,9 +103,9 @@ protected void doClose() {
}

@Override
protected InputStream inputOpened(final InputStream input) throws IOException {
return InputStream.nullInputStream();
protected void inputOpened(final InputStream input) {
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputSt
BufferedReader reader;

@Override
protected InputStream inputOpened(final InputStream input) throws IOException {
protected void inputOpened(final InputStream input) {
reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
return input;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputSt
private File parquetFile;

@Override
protected InputStream inputOpened(final InputStream input) throws IOException {
protected void inputOpened(final InputStream input) throws IOException {
final String timestamp = String.valueOf(Instant.now().toEpochMilli());

try {
Expand All @@ -84,7 +84,7 @@ protected InputStream inputOpened(final InputStream input) throws IOException {
IOUtils.copy(input, outputStream); // Copy input stream to temporary file
}
reader = AvroParquetReader.<GenericRecord>builder(new LocalInputFile(parquetFile.toPath())).build();
return input;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected abstract static class StreamSpliterator implements Spliterator<SchemaA
*/
protected final Logger logger;
/**
* The input stream. Will be null until {@link #inputOpened} has completed. May be used for reading but should
* * The input stream. Will be null until {@link #inputOpened} has completed. May be used for reading but should
* not be closed or otherwise made unreadable.
*/
protected InputStream inputStream;
Expand Down Expand Up @@ -131,15 +131,16 @@ public final void close() {
* Allows modification of input stream. Called immediatly after the input stream is opened. Implementations may
* modify the type of input stream by wrapping it with a specific implementation, or may create Readers from the
* input stream. The modified input stream must be returned. If a Reader or similar class is created from the
* input stream the input stream must be returned.
* input stream the input stream must be returned. The input stream will be null until {@link #inputOpened} has
* completed. The implementation of the interface is responsible for closing any newly constructed readers or
* input streams in the doClose() method.
*
* @param input
* the input stream that was just opened.
* @return the input stream or modified input stream.
* @throws IOException
* on IO error.
*/
abstract protected InputStream inputOpened(InputStream input) throws IOException;
abstract protected void inputOpened(InputStream input) throws IOException;

@Override
public final boolean tryAdvance(final Consumer<? super SchemaAndValue> action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.kafka.common.config.ConfigException;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

class TransformerFragmentTest {

Expand All @@ -41,21 +43,22 @@ void validateCorrectBufferSizeIsAccepted() {
assertThat(schemaReg.getTransformerMaxBufferSize()).isEqualTo(bufferSize);
}

@Test
void validateInvalidBufferSizeThrowsConfigException() {
@ParameterizedTest
@CsvSource({
"21474836471,Invalid value 21474836471 for configuration transformer.max.buffer.size: Not a number of type INT",
"-1,transformer.max.buffer.size must be larger then 0 and less then 2147483647",
"MAX,Invalid value MAX for configuration transformer.max.buffer.size: Not a number of type INT",
"0,transformer.max.buffer.size must be larger then 0 and less then 2147483647",
"-9000,transformer.max.buffer.size must be larger then 0 and less then 2147483647",
"MTA=,Invalid value MTA= for configuration transformer.max.buffer.size: Not a number of type INT" })
void validateInvalidBufferSizeThrowsConfigException(final String value, final String expectedMessage) {
final ConfigDef configDef = TransformerFragment.update(new ConfigDef());
final Map<String, Object> props = new HashMap<>();
// Too small
props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, 0);

assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props)))
.isInstanceOf(ConfigException.class);
// Too large
props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, Integer.MAX_VALUE + "1");
props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, value);
assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props)))
.isInstanceOf(ConfigException.class)
.hasMessage(
"Invalid value 21474836471 for configuration transformer.max.buffer.size: Not a number of type INT");
.hasMessage(expectedMessage);
}

}

0 comments on commit 376255d

Please sign in to comment.