
Commit de0c91e
Remove unused configuration, updates to improve code
Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
aindriu-aiven committed Jan 22, 2025
1 parent 54851fe commit de0c91e
Showing 15 changed files with 127 additions and 131 deletions.
==== changed file ====
@@ -72,8 +72,7 @@ public static ConfigDef update(final ConfigDef configDef) {
      + "Only some combinations of variables are valid, which currently are:\n"
      + "- `topic`, `partition`, `start_offset`."
      + "There is also `key` only variable {{key}} for grouping by keys",
- GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
- ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG);
+ GROUP_FILE, fileGroupCounter++, ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG);

  final String supportedCompressionTypes = CompressionType.names()
      .stream()
@@ -84,8 +83,7 @@ public static ConfigDef update(final ConfigDef configDef) {
  ConfigDef.Importance.MEDIUM,
  "The compression type used for files put on S3. " + "The supported values are: "
      + supportedCompressionTypes + ".",
- GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
- ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG,
+ GROUP_FILE, fileGroupCounter++, ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG,
  FixedSetRecommender.ofSupportedValues(CompressionType.names()));

  configDef.define(FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, new ConfigDef.Validator() {
@@ -99,8 +97,7 @@ public void ensureValid(final String name, final Object value) {
  }, ConfigDef.Importance.MEDIUM,
  "The maximum number of records to put in a single file. " + "Must be a non-negative integer number. "
      + "0 is interpreted as \"unlimited\", which is the default.",
- GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
- ConfigDef.Width.SHORT, FILE_MAX_RECORDS);
+ GROUP_FILE, fileGroupCounter++, ConfigDef.Width.SHORT, FILE_MAX_RECORDS);

  configDef.define(FILE_NAME_TIMESTAMP_TIMEZONE, ConfigDef.Type.STRING, ZoneOffset.UTC.toString(),
      new TimeZoneValidator(), ConfigDef.Importance.LOW,
@@ -114,16 +111,6 @@ public void ensureValid(final String name, final Object value) {
  "Specifies the timestamp variable source. Default is wall-clock.", GROUP_FILE, fileGroupCounter++, // NOPMD
  ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);

- configDef.define(FILE_PATH_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PATH_PREFIX_TEMPLATE,
-     new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
-     "The template for file prefix on S3. "
-         + "Supports `{{ variable }}` placeholders for substituting variables. "
-         + "Currently supported variables are `topic` and `partition` "
-         + "and are mandatory to have these in the directory structure."
-         + "Example prefix : topics/{{topic}}/partition/{{partition}}/",
-     GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
-     ConfigDef.Width.LONG, FILE_PATH_PREFIX_TEMPLATE_CONFIG);

  return configDef;
  }

@@ -197,8 +184,4 @@ public int getMaxRecordsPerFile() {
      return cfg.getInt(FILE_MAX_RECORDS);
  }

- public String getFilePathPrefixTemplateConfig() {
-     return cfg.getString(FILE_PATH_PREFIX_TEMPLATE_CONFIG);
- }

  }
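For readers skimming the hunks above: Kafka's ConfigDef.define() takes an order-in-group argument, which is why fileGroupCounter is post-incremented inline on every call. A minimal sketch of the pattern, using Kafka's ConfigDef API (the key names here are invented for illustration):

    import org.apache.kafka.common.config.ConfigDef;

    public final class GroupOrderingSketch {
        private static final String GROUP_FILE = "File";

        public static ConfigDef update(final ConfigDef configDef) {
            int fileGroupCounter = 0;
            // Each define() reads the counter as the key's position in the "File" group,
            // so the inline post-increment keeps later definitions ordered automatically.
            configDef.define("example.first", ConfigDef.Type.STRING, null,
                    new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
                    "First key in the group.", GROUP_FILE, fileGroupCounter++,
                    ConfigDef.Width.LONG, "example.first");
            // Only the final increment's value is never read, so only the last define()
            // in the group still warrants a suppression.
            configDef.define("example.last", ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM,
                    "Last key in the group.", GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
                    ConfigDef.Width.SHORT, "example.last");
            return configDef;
        }
    }

That is why the intermediate suppressions could be dropped above: once FILE_PATH_PREFIX_TEMPLATE_CONFIG was removed, only the group's final increment is a genuinely unused assignment.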
==== changed file ====
@@ -62,9 +62,6 @@ public String getSchemaRegistryUrl() {
  public String getTargetTopics() {
      return sourceConfigFragment.getTargetTopics();
  }
- public String getTargetTopicPartitions() {
-     return sourceConfigFragment.getTargetTopicPartitions();
- }

  public ErrorsTolerance getErrorsTolerance() {
      return sourceConfigFragment.getErrorsTolerance();
==== changed file ====
@@ -34,7 +34,6 @@ public final class SourceConfigFragment extends ConfigFragment {
  public static final String MAX_POLL_RECORDS = "max.poll.records";
  public static final String EXPECTED_MAX_MESSAGE_BYTES = "expected.max.message.bytes";
  private static final String GROUP_OFFSET_TOPIC = "OFFSET_TOPIC";
- public static final String TARGET_TOPIC_PARTITIONS = "topic.partitions";
  public static final String TARGET_TOPICS = "topics";
  public static final String ERRORS_TOLERANCE = "errors.tolerance";

@@ -70,9 +69,6 @@ public static ConfigDef update(final ConfigDef configDef) {

  // Offset Storage config group includes target topics
  int offsetStorageGroupCounter = 0;
- configDef.define(TARGET_TOPIC_PARTITIONS, ConfigDef.Type.STRING, "0", new ConfigDef.NonEmptyString(),
-     ConfigDef.Importance.MEDIUM, "eg : 0,1", GROUP_OFFSET_TOPIC, offsetStorageGroupCounter++,
-     ConfigDef.Width.NONE, TARGET_TOPIC_PARTITIONS);
  configDef.define(TARGET_TOPICS, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
      ConfigDef.Importance.MEDIUM, "eg : connect-storage-offsets", GROUP_OFFSET_TOPIC,
      offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS);
@@ -93,10 +89,6 @@ public String getTargetTopics() {
      return cfg.getString(TARGET_TOPICS);
  }

- public String getTargetTopicPartitions() {
-     return cfg.getString(TARGET_TOPIC_PARTITIONS);
- }

  public int getMaxPollRecords() {
      return cfg.getInt(MAX_POLL_RECORDS);
  }
==== changed file ====
@@ -17,12 +17,12 @@
  package io.aiven.kafka.connect.common.source;

  import java.util.Collection;
- import java.util.HashMap;
  import java.util.Map;
  import java.util.Optional;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
  import java.util.function.Function;
+ import java.util.stream.Collectors;

  import org.apache.kafka.connect.source.SourceRecord;
  import org.apache.kafka.connect.source.SourceTaskContext;
@@ -94,33 +94,19 @@ public Optional<E> getEntry(final OffsetManagerKey key, final Function<Map<Strin
      return data == null ? Optional.empty() : Optional.of(creator.apply(data));
  }

- /**
-  * Copies the entry into the offset manager data.
-  *
-  * @param entry
-  *            the entry to update.
-  */
- public void updateCurrentOffsets(final E entry) {
-     LOGGER.debug("Updating current offsets: {}", entry.getManagerKey().getPartitionMap());
-     offsets.compute(entry.getManagerKey().getPartitionMap(), (k, v) -> {
-         if (v == null) {
-             return new HashMap<>(entry.getProperties());
-         } else {
-             v.putAll(entry.getProperties());
-             return v;
-         }
-     });
- }

  /**
   * Gets any offset information stored in the offsetStorageReader and adds it to the local offsets map. This
   * provides a performance improvement over checking whether each offset exists individually.
   *
-  * @param partitionMaps
-  *            A Collection of partition maps which identify individual offset entries
+  * @param offsetManagerKeys
+  *            A Collection of OffsetManagerKey which identify individual offset entries
   */
- public void hydrateOffsetManager(final Collection<Map<String, Object>> partitionMaps) {
-     offsets.putAll(context.offsetStorageReader().offsets(partitionMaps));
+ public void populateOffsetManager(final Collection<OffsetManager.OffsetManagerKey> offsetManagerKeys) {
+     offsets.putAll(context.offsetStorageReader()
+             .offsets(offsetManagerKeys.stream()
+                     .map(OffsetManagerKey::getPartitionMap)
+                     .collect(Collectors.toList())));
  }
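A sketch of how a caller might use the renamed bulk method. It assumes OffsetManagerKey is a functional interface exposing getPartitionMap() (as the lambda usage in the tests below suggests) and a hypothetical entry type MyEntry whose constructor accepts the stored properties map:

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import java.util.stream.Collectors;

    void primeOffsets(final OffsetManager<MyEntry> offsetManager, final List<String> objectKeys) {
        final List<OffsetManager.OffsetManagerKey> keys = objectKeys.stream()
                .map(objectKey -> (OffsetManager.OffsetManagerKey) () -> Map.of("bucket", "my-bucket",
                        "objectKey", objectKey))
                .collect(Collectors.toList());

        // One offsetStorageReader.offsets(...) round trip hydrates the local map for
        // every key, instead of one reader call per individual getEntry(...) miss.
        offsetManager.populateOffsetManager(keys);

        // Later look-ups are served from the local offsets map when an entry exists.
        final Optional<MyEntry> first = offsetManager.getEntry(keys.get(0), MyEntry::new);
    }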
==== changed file ====
@@ -37,35 +37,48 @@ public Context(final K storageKey) {
      this.storageKey = storageKey;
  }

- public Optional<String> getTopic() {
+ /**
+  * Creates a defensive copy of the Context for use internally by the S3SourceRecord
+  *
+  * @param anotherContext
+  *            The Context which needs to be copied
+  */
+ protected Context(final Context<K> anotherContext) {
+     this.storageKey = anotherContext.storageKey;
+     this.partition = anotherContext.partition;
+     this.topic = anotherContext.topic;
+     this.offset = anotherContext.offset;
+ }
+
+ public final Optional<String> getTopic() {
      return Optional.ofNullable(topic);
  }

- public void setTopic(final String topic) {
+ public final void setTopic(final String topic) {
      this.topic = topic;
  }

- public Optional<Integer> getPartition() {
+ public final Optional<Integer> getPartition() {
      return Optional.ofNullable(partition);
  }

- public void setPartition(final Integer partition) {
+ public final void setPartition(final Integer partition) {
      this.partition = partition;
  }

- public Optional<K> getStorageKey() {
+ public final Optional<K> getStorageKey() {
      return Optional.ofNullable(storageKey);
  }

- public void setStorageKey(final K storageKey) {
+ public final void setStorageKey(final K storageKey) {
      this.storageKey = storageKey;
  }

- public Optional<Integer> getOffset() {
+ public final Optional<Integer> getOffset() {
      return Optional.ofNullable(offset);
  }

- public void setOffset(final Integer offset) {
+ public final void setOffset(final Integer offset) {
      this.offset = offset;
  }
  }
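The intent of this change is easy to miss: because the copy constructor is protected, callers outside the package can reach it only through a subclass, and Java's access rules for protected constructors permit exactly that via an anonymous class creation expression; making the accessors final keeps such a subclass from altering their behaviour. A small sketch (the storage key and values are made up; imports of Context and java.util.Optional assumed):

    final Context<String> original = new Context<>("bucket/key-0001");
    original.setTopic("events");
    original.setPartition(3);

    // An anonymous subclass is how callers outside the package invoke the
    // protected copy constructor; S3SourceRecord uses the same trick below.
    final Context<String> snapshot = new Context<>(original) {
    };

    original.setTopic("changed-later");
    // The snapshot keeps the values from copy time; later mutation does not leak in.
    assert snapshot.getTopic().equals(Optional.of("events"));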
==== changed file ====
@@ -17,6 +17,8 @@
  package io.aiven.kafka.connect.common.source;

  import static org.assertj.core.api.Assertions.assertThat;
+ import static org.mockito.ArgumentMatchers.anyCollection;
+ import static org.mockito.ArgumentMatchers.anyList;
  import static org.mockito.ArgumentMatchers.eq;
  import static org.mockito.Mockito.mock;
  import static org.mockito.Mockito.times;
@@ -80,52 +82,49 @@ void testNewEntryWithoutDataFromContext() {
  }

  @Test
- void testHydrateOffsetManagerWithPartitionMapsNoExistingEntries() {
-     final List<Map<String, Object>> partitionMaps = new ArrayList<>();
+ void testPopulateOffsetManagerWithPartitionMapsNoExistingEntries() {
+     final List<OffsetManager.OffsetManagerKey> partitionMaps = new ArrayList<>();

      for (int i = 0; i < 10; i++) {
-         final Map<String, Object> partitionKey = new HashMap<>(); // NOPMD avoid instantiating objects in loops.
-         partitionKey.put("segment1", "topic" + i);
-         partitionKey.put("segment2", String.valueOf(i));
-         partitionKey.put("segment3", "something else" + i);
-         partitionMaps.add(partitionKey);
+         partitionMaps.add(new TestingOffsetManagerEntry("topic" + i, String.valueOf(i), "something else " + i)
+                 .getManagerKey()); // NOPMD avoid instantiating objects in loops.
      }

- when(offsetStorageReader.offsets(eq(partitionMaps))).thenReturn(Map.of());
+ when(offsetStorageReader.offsets(anyCollection())).thenReturn(Map.of());

- offsetManager.hydrateOffsetManager(partitionMaps);
- verify(offsetStorageReader, times(1)).offsets(eq(partitionMaps));
+ offsetManager.populateOffsetManager(partitionMaps);
+ verify(offsetStorageReader, times(1)).offsets(anyList());

  // No existing entries, so we expect nothing to be found and the reader to be queried again.
- final Optional<TestingOffsetManagerEntry> result = offsetManager.getEntry(() -> partitionMaps.get(0),
-         TestingOffsetManagerEntry::new);
+ final Optional<TestingOffsetManagerEntry> result = offsetManager
+         .getEntry(() -> partitionMaps.get(0).getPartitionMap(), TestingOffsetManagerEntry::new);
  assertThat(result).isEmpty();
- verify(offsetStorageReader, times(1)).offset(eq(partitionMaps.get(0)));
+ verify(offsetStorageReader, times(1)).offset(eq(partitionMaps.get(0).getPartitionMap()));

  }

  @Test
- void testHydrateOffsetManagerWithPartitionMapsAllEntriesExist() {
-     final List<Map<String, Object>> partitionMaps = new ArrayList<>();
+ void testPopulateOffsetManagerWithPartitionMapsAllEntriesExist() {
+     final List<OffsetManager.OffsetManagerKey> partitionMaps = new ArrayList<>();
  final Map<Map<String, Object>, Map<String, Object>> offsetReaderMap = new HashMap<>();

      for (int i = 0; i < 10; i++) {
-         final Map<String, Object> partitionKey = Map.of("segment1", "topic" + 1, "segment2", String.valueOf(i),
-                 "segment3", "somethingelse" + i); // NOPMD avoid instantiating objects in loops.
-         partitionMaps.add(partitionKey);
-         offsetReaderMap.put(partitionKey, Map.of("recordCount", (long) i));
+         final OffsetManager.OffsetManagerKey key = new TestingOffsetManagerEntry("topic" + i, String.valueOf(i),
+                 "something else " + i).getManagerKey(); // NOPMD avoid instantiating objects in loops.
+         partitionMaps.add(key);
+         offsetReaderMap.put(key.getPartitionMap(), Map.of("recordCount", (long) i));
      }

- when(offsetStorageReader.offsets(eq(partitionMaps))).thenReturn(offsetReaderMap);
+ when(offsetStorageReader.offsets(anyList())).thenReturn(offsetReaderMap);

- offsetManager.hydrateOffsetManager(partitionMaps);
- verify(offsetStorageReader, times(1)).offsets(eq(partitionMaps));
+ offsetManager.populateOffsetManager(partitionMaps);
+ verify(offsetStorageReader, times(1)).offsets(anyList());

  // All entries exist, so the look-up is served from the local map and the reader is not queried again.
- final Optional<TestingOffsetManagerEntry> result = offsetManager.getEntry(() -> partitionMaps.get(0),
-         TestingOffsetManagerEntry::new);
+ final Optional<TestingOffsetManagerEntry> result = offsetManager
+         .getEntry(() -> partitionMaps.get(0).getPartitionMap(), TestingOffsetManagerEntry::new);
  assertThat(result).isPresent();
- verify(offsetStorageReader, times(0)).offset(eq(partitionMaps.get(0)));
+ verify(offsetStorageReader, times(0)).offset(eq(partitionMaps.get(0).getPartitionMap()));

  }
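One reading of the matcher changes above: populateOffsetManager() now derives the List of partition maps internally, so the old eq(partitionMaps) identity match can no longer succeed, and anyList()/anyCollection() match on shape instead. If the contents still needed asserting, a captor-based sketch such as this should work (org.mockito.ArgumentCaptor assumed imported; the unchecked cast is the usual ArgumentCaptor generics caveat):

    @SuppressWarnings("unchecked")
    final ArgumentCaptor<List<Map<String, Object>>> captor = ArgumentCaptor.forClass(List.class);
    verify(offsetStorageReader).offsets(captor.capture());
    assertThat(captor.getValue()).containsExactlyInAnyOrderElementsOf(partitionMaps.stream()
            .map(OffsetManager.OffsetManagerKey::getPartitionMap)
            .collect(Collectors.toList()));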
==== changed file ====
@@ -22,7 +22,6 @@
  import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER;
  import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
  import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS;
- import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS;
  import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG;
  import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
  import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG;
@@ -123,7 +122,6 @@ private Map<String, String> getConfig(final String topics, final int maxTasks) {
  config.put(AWS_S3_ENDPOINT_CONFIG, s3Endpoint);
  config.put(AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET_NAME);
  config.put(AWS_S3_PREFIX_CONFIG, getS3Prefix());
- config.put(TARGET_TOPIC_PARTITIONS, "0,1");
  config.put(TARGET_TOPICS, topics);
  config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
  config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter");
==== changed file ====
@@ -25,7 +25,6 @@
  import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL;
  import static io.aiven.kafka.connect.common.config.SourceConfigFragment.DISTRIBUTION_TYPE;
  import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS;
- import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS;
  import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG;
  import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
  import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG;
@@ -378,7 +377,6 @@ private static Map<String, String> basicS3ConnectorConfig(final boolean addPrefi
  if (addPrefix) {
      config.put(AWS_S3_PREFIX_CONFIG, s3Prefix);
  }
- config.put(TARGET_TOPIC_PARTITIONS, "0,1");
  return config;
  }
==== changed file ====
@@ -30,8 +30,6 @@ public final class S3OffsetManagerEntry implements OffsetManager.OffsetManagerEn
  // TODO make this package private after values in S3SourceTask are no longer needed
  public static final String BUCKET = "bucket";
  public static final String OBJECT_KEY = "objectKey";
- public static final String TOPIC = "topic";
- public static final String PARTITION = "partition";
  public static final String RECORD_COUNT = "recordCount";
==== changed file ====
@@ -24,7 +24,6 @@
  import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
  import io.aiven.kafka.connect.common.source.task.Context;

- import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import software.amazon.awssdk.services.s3.model.S3Object;
@@ -50,9 +49,9 @@ public S3SourceRecord(final S3SourceRecord s3SourceRecord) {
      this.valueData = s3SourceRecord.valueData;
      this.context = s3SourceRecord.context;
  }
- @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "offsetManagerEntry is essentially read only")
+
  public void setOffsetManagerEntry(final S3OffsetManagerEntry offsetManagerEntry) {
-     this.offsetManagerEntry = offsetManagerEntry;
+     this.offsetManagerEntry = offsetManagerEntry.fromProperties(offsetManagerEntry.getProperties());
  }

  public long getRecordCount() {
@@ -96,11 +95,13 @@ public S3OffsetManagerEntry getOffsetManagerEntry() {
  }

  public Context<String> getContext() {
-     return defensiveContextCopy(context);
+     return new Context<>(context) {
+     };

  }

  public void setContext(final Context<String> context) {
-     this.context = defensiveContextCopy(context);
+     this.context = new Context<>(context) {
+     };
  }

  /**
@@ -125,20 +126,4 @@ public SourceRecord getSourceRecord(final ErrorsTolerance tolerance) {
      }
  }

- /**
-  * Creates a defensive copy of the Context for use internally by the S3SourceRecord
-  *
-  * @param context
-  *            The Context which needs to be copied
-  * @return The new Context object which has been created from the original context
-  */
- private Context<String> defensiveContextCopy(final Context<String> context) {
-     final Context<String> returnValue = new Context<>(
-             context.getStorageKey().isPresent() ? context.getStorageKey().get() : null);
-     context.getTopic().ifPresent(returnValue::setTopic);
-     context.getPartition().ifPresent(returnValue::setPartition);
-     context.getOffset().ifPresent(returnValue::setOffset);
-     return returnValue;
- }

  }
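The setter change earlier in this file swaps the SpotBugs EI_EXPOSE_REP2 suppression for an actual defensive copy: fromProperties(getProperties()) stores a rebuilt entry rather than the caller's mutable reference. A sketch of the behaviour this buys, assuming getRecordCount() reflects the stored entry (the constructor shapes and the incrementRecordCount() mutator are assumptions for illustration):

    final S3OffsetManagerEntry entry = new S3OffsetManagerEntry("my-bucket", "topics/t/0/file.bin");
    final S3SourceRecord record = new S3SourceRecord(s3Object);

    record.setOffsetManagerEntry(entry); // stores a fromProperties(getProperties()) copy

    entry.incrementRecordCount(); // the caller keeps mutating its own instance

    // The record's private copy is unaffected, so the suppression is no longer needed.
    assert record.getRecordCount() == 0;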
(5 more changed files not shown)
