Allow users to vary the size of the byteArrayBuffer in source connectors (#386)

This allows users to decide how large the chunks of the byte stream
should be in the source connectors.
One change was to use SourceCommonConfig instead of AbstractConfig in
the transformers so that the new call to get the max buffer size is
available.

It also adds the stream length to the Transformer, which should lead to
improvements, most notably in the ByteArrayTransformer.

---------

Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
aindriu-aiven authored Jan 29, 2025
1 parent b3e1312 commit 1d74693
Showing 24 changed files with 422 additions and 193 deletions.
SourceCommonConfig.java
@@ -28,15 +28,15 @@

public class SourceCommonConfig extends CommonConfig {

private final SchemaRegistryFragment schemaRegistryFragment;
private final TransformerFragment transformerFragment;
private final SourceConfigFragment sourceConfigFragment;
private final FileNameFragment fileNameFragment;
private final OutputFormatFragment outputFormatFragment;

public SourceCommonConfig(ConfigDef definition, Map<?, ?> originals) {// NOPMD
super(definition, originals);
// Construct Fragments
schemaRegistryFragment = new SchemaRegistryFragment(this);
transformerFragment = new TransformerFragment(this);
sourceConfigFragment = new SourceConfigFragment(this);
fileNameFragment = new FileNameFragment(this);
outputFormatFragment = new OutputFormatFragment(this);
@@ -45,18 +45,18 @@ public SourceCommonConfig(ConfigDef definition, Map<?, ?> originals) {// NOPMD
}

private void validate() {
schemaRegistryFragment.validate();
transformerFragment.validate();
sourceConfigFragment.validate();
fileNameFragment.validate();
outputFormatFragment.validate();
}

public InputFormat getInputFormat() {
return schemaRegistryFragment.getInputFormat();
return transformerFragment.getInputFormat();
}

public String getSchemaRegistryUrl() {
return schemaRegistryFragment.getSchemaRegistryUrl();
return transformerFragment.getSchemaRegistryUrl();
}

public String getTargetTopics() {
@@ -76,7 +76,11 @@ public int getMaxPollRecords() {
}

public Transformer getTransformer() {
return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat());
return TransformerFactory.getTransformer(transformerFragment.getInputFormat());
}

public int getTransformerMaxBufferSize() {
return transformerFragment.getTransformerMaxBufferSize();
}

}
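
A hedged sketch of the call site this enables. The helper class is hypothetical and the import paths are assumed from the package layout visible in this diff; the two getters are the ones now routed through TransformerFragment.

import io.aiven.kafka.connect.common.config.SourceCommonConfig;
import io.aiven.kafka.connect.common.source.input.Transformer;

public final class TransformerSetupSketch {
    // Hypothetical helper; a real task would build `config` from its start(Map) properties.
    static Transformer setUp(final SourceCommonConfig config) {
        // 4096 unless transformer.max.buffer.size is overridden.
        final int maxBufferSize = config.getTransformerMaxBufferSize();
        System.out.println("chunking byte streams at " + maxBufferSize + " bytes");
        return config.getTransformer(); // selected from input.format
    }
}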
SchemaRegistryFragment.java → TransformerFragment.java
@@ -20,42 +20,48 @@

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import io.aiven.kafka.connect.common.source.input.InputFormat;

public final class SchemaRegistryFragment extends ConfigFragment {
private static final String SCHEMAREGISTRY_GROUP = "Schema registry group";
public final class TransformerFragment extends ConfigFragment {
private static final String TRANSFORMER_GROUP = "Transformer group";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String VALUE_CONVERTER_SCHEMA_REGISTRY_URL = "value.converter.schema.registry.url";
public static final String AVRO_VALUE_SERIALIZER = "value.serializer";
public static final String INPUT_FORMAT_KEY = "input.format";
public static final String SCHEMAS_ENABLE = "schemas.enable";
public static final String TRANSFORMER_MAX_BUFFER_SIZE = "transformer.max.buffer.size";
private static final int DEFAULT_MAX_BUFFER_SIZE = 4096;

/**
* Construct the ConfigFragment.
*
* @param cfg
* the configuration that this fragment is associated with.
*/
public SchemaRegistryFragment(final AbstractConfig cfg) {
public TransformerFragment(final AbstractConfig cfg) {
super(cfg);
}

public static ConfigDef update(final ConfigDef configDef) {
int srCounter = 0;
int transformerCounter = 0;
configDef.define(SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", SCHEMAREGISTRY_GROUP, srCounter++,
ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP, transformerCounter++,
ConfigDef.Width.NONE, SCHEMA_REGISTRY_URL);
configDef.define(VALUE_CONVERTER_SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL",
SCHEMAREGISTRY_GROUP, srCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP,
transformerCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
configDef.define(INPUT_FORMAT_KEY, ConfigDef.Type.STRING, InputFormat.BYTES.getValue(),
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
"Input format of messages read from source avro/json/parquet/bytes", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
"Input format of messages read from source avro/json/parquet/bytes", TRANSFORMER_GROUP,
transformerCounter++, ConfigDef.Width.NONE, INPUT_FORMAT_KEY);
configDef.define(TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, DEFAULT_MAX_BUFFER_SIZE,
new ByteArrayTransformerMaxBufferSizeValidator(), ConfigDef.Importance.MEDIUM,
"Max size of the byte buffer when using the BYTE Transformer", TRANSFORMER_GROUP, transformerCounter++,
ConfigDef.Width.NONE, TRANSFORMER_MAX_BUFFER_SIZE);

configDef.define(AVRO_VALUE_SERIALIZER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM,
"Avro value serializer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
"Avro value serializer", TRANSFORMER_GROUP, transformerCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.NONE, AVRO_VALUE_SERIALIZER);
return configDef;
@@ -73,4 +79,21 @@ public Class<?> getAvroValueSerializer() {
return cfg.getClass(AVRO_VALUE_SERIALIZER);
}

public int getTransformerMaxBufferSize() {
return cfg.getInt(TRANSFORMER_MAX_BUFFER_SIZE);
}

private static class ByteArrayTransformerMaxBufferSizeValidator implements ConfigDef.Validator {
@Override
public void ensureValid(final String name, final Object value) {

// ConfigDef will already have thrown an error if the supplied value is not an int
if ((int) value <= 0) {
throw new ConfigException(
String.format("%s must be larger than 0 and less than %s", name, Integer.MAX_VALUE));
}

}
}

}
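
A hedged sketch of the fragment in use, assuming the code sits in (or imports from) io.aiven.kafka.connect.common.config and that the ConfigDef contains only this fragment's definitions. It shows the getter returning an override and the new validator rejecting a non-positive size at parse time.

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public final class TransformerFragmentSketch {
    public static void main(final String[] args) {
        final ConfigDef definition = TransformerFragment.update(new ConfigDef());

        // Valid: overrides the 4096-byte default.
        final AbstractConfig valid = new AbstractConfig(definition,
                Map.of("transformer.max.buffer.size", "8192"));
        System.out.println(new TransformerFragment(valid).getTransformerMaxBufferSize()); // 8192

        // Invalid: AbstractConfig applies the validator while parsing, so this throws.
        try {
            new AbstractConfig(definition, Map.of("transformer.max.buffer.size", "0"));
        } catch (final ConfigException e) {
            System.out.println(e.getMessage());
        }
    }
}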
AvroTransformer.java
@@ -16,18 +16,17 @@

package io.aiven.kafka.connect.common.source.input;

import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

import io.aiven.kafka.connect.common.config.SourceCommonConfig;
import io.aiven.kafka.connect.common.source.task.Context;

import io.confluent.connect.avro.AvroData;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
@@ -49,21 +48,15 @@ public class AvroTransformer extends Transformer {
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final Integer topicPartition, final AbstractConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final Context<?> context, final SourceCommonConfig sourceConfig) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
private DataFileStream<GenericRecord> dataFileStream;
private final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();

@Override
protected InputStream inputOpened(final InputStream input) throws IOException {
protected void inputOpened(final InputStream input) throws IOException {
dataFileStream = new DataFileStream<>(input, datumReader);
return input;
}

@Override
@@ -91,7 +84,7 @@ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {

@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final AbstractConfig sourceConfig) {
final SourceCommonConfig sourceConfig) {
return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA,
((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
}
ByteArrayTransformer.java
@@ -20,34 +20,42 @@
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.SchemaAndValue;

import io.aiven.kafka.connect.common.config.SourceCommonConfig;
import io.aiven.kafka.connect.common.source.task.Context;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.function.IOSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* ByteArrayTransformer chunks an entire object into records no larger than the size specified by the
* {@link io.aiven.kafka.connect.common.config.TransformerFragment#TRANSFORMER_MAX_BUFFER_SIZE} configuration option.
* This will split large files into multiple records, each of which will have the same key.
*/
public class ByteArrayTransformer extends Transformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class);

private static final int MAX_BUFFER_SIZE = 4096;

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
// For byte array transformations, ByteArrayConverter is the converter which is the default config.
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final Integer topicPartition, final AbstractConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final Context<?> context, final SourceCommonConfig sourceConfig) {
if (streamLength == 0) {
LOGGER.warn(
"Object sent for processing has an invalid streamLength of {}, object is empty returning an empty spliterator.",
streamLength);
return emptySpliterator(inputStreamIOSupplier);
}
// The max buffer size for the byte array; the default is 4096 if not set by the user.
final int maxBufferSize = sourceConfig.getTransformerMaxBufferSize();
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {

@Override
protected InputStream inputOpened(final InputStream input) {
return input;
protected void inputOpened(final InputStream input) {

}

@Override
@@ -57,18 +65,16 @@ protected void doClose() {

@Override
protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
final byte[] buffer = new byte[MAX_BUFFER_SIZE];

try {
final int bytesRead = IOUtils.read(inputStream, buffer);
if (bytesRead == 0) {
return false;
final byte[] buffer = new byte[maxBufferSize];
final byte[] chunk = Arrays.copyOf(buffer, IOUtils.read(inputStream, buffer));
if (chunk.length > 0) {
action.accept(new SchemaAndValue(null, chunk));
return true;
}
if (bytesRead < MAX_BUFFER_SIZE) {
action.accept(new SchemaAndValue(null, Arrays.copyOf(buffer, bytesRead)));
} else {
action.accept(new SchemaAndValue(null, buffer));
}
return true;

return false;
} catch (IOException e) {
LOGGER.error("Error trying to advance inputStream: {}", e.getMessage(), e);
return false;
@@ -77,9 +83,35 @@ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
};
}

/**
* This method returns an empty spliterator when an empty input stream is supplied to be split.
*
* @param inputStreamIOSupplier
* The empty input stream that was supplied
* @return an empty spliterator to return to the calling method.
*/
private static StreamSpliterator emptySpliterator(final IOSupplier<InputStream> inputStreamIOSupplier) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
@Override
protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
return false;
}

@Override
protected void doClose() {
// nothing to do
}

@Override
protected void inputOpened(final InputStream input) {
}

};
}

@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final AbstractConfig sourceConfig) {
final SourceCommonConfig sourceConfig) {
return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
}
}
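
The chunking rule is easiest to see with concrete numbers. Below is a hedged, self-contained sketch of the same read loop over an in-memory stream; it mirrors the doAdvance logic above rather than invoking the connector classes.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

import org.apache.commons.io.IOUtils;

public final class ChunkingSketch {
    public static void main(final String[] args) throws IOException {
        final int maxBufferSize = 4096; // the TRANSFORMER_MAX_BUFFER_SIZE default
        final InputStream inputStream = new ByteArrayInputStream(new byte[10_000]);

        // As in doAdvance: read up to maxBufferSize bytes, trimming the final short read.
        while (true) {
            final byte[] buffer = new byte[maxBufferSize];
            final byte[] chunk = Arrays.copyOf(buffer, IOUtils.read(inputStream, buffer));
            if (chunk.length == 0) {
                break; // end of stream, the `return false` branch above
            }
            // A 10_000-byte object yields three records: 4096, 4096, and 1808 bytes.
            System.out.println("record payload of " + chunk.length + " bytes");
        }
    }
}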
JsonTransformer.java
@@ -21,13 +21,14 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

import io.aiven.kafka.connect.common.config.SourceCommonConfig;
import io.aiven.kafka.connect.common.source.task.Context;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.function.IOSupplier;
import org.apache.commons.lang3.StringUtils;
@@ -48,19 +49,15 @@ public class JsonTransformer extends Transformer {
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final Integer topicPartition, final AbstractConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final Context<?> context, final SourceCommonConfig sourceConfig) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
BufferedReader reader;

@Override
protected InputStream inputOpened(final InputStream input) throws IOException {
protected void inputOpened(final InputStream input) {
reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
return input;

}

@Override
@@ -87,7 +84,9 @@ public boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
}
}
line = line.trim();
action.accept(jsonConverter.toConnectData(topic, line.getBytes(StandardCharsets.UTF_8)));
// toConnectData does not actually use the topic in the conversion, so it's fine if it is null.
action.accept(jsonConverter.toConnectData(context.getTopic().orElse(null),
line.getBytes(StandardCharsets.UTF_8)));
return true;
} catch (IOException e) {
LOGGER.error("Error reading input stream: {}", e.getMessage(), e);
@@ -99,7 +98,7 @@

@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final AbstractConfig sourceConfig) {
final SourceCommonConfig sourceConfig) {
return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
}
}
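
A hedged sketch backing up the comment about the topic parameter: standalone JsonConverter usage, assuming schemaless JSON ("schemas.enable" set to false), not connector code.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public final class JsonTopicSketch {
    public static void main(final String[] args) {
        final JsonConverter jsonConverter = new JsonConverter();
        jsonConverter.configure(Map.of("schemas.enable", "false"), false);

        // toConnectData ignores the topic argument for plain JSON, so null is acceptable.
        final SchemaAndValue record = jsonConverter.toConnectData(null,
                "{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(record.value()); // {id=1}
    }
}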
