diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java index 805e37f2..6e1e8319 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java @@ -53,21 +53,8 @@ public class OffsetManager> { * the context for this instance to use. */ public OffsetManager(final SourceTaskContext context) { - this(context, new ConcurrentHashMap<>()); - } - - /** - * Package private for testing. - * - * @param context - * the context for this instance to use. - * @param offsets - * the offsets - */ - protected OffsetManager(final SourceTaskContext context, - final ConcurrentMap, Map> offsets) { this.context = context; - this.offsets = offsets; + this.offsets = new ConcurrentHashMap<>(); } /** diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java index 6fd037b1..53e7baa7 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java @@ -71,11 +71,11 @@ public void setValueData(final SchemaAndValue valueData) { } public String getTopic() { - return context.getTopic().isPresent() ? context.getTopic().get() : null; + return context.getTopic().orElse(null); } public Integer getPartition() { - return context.getPartition().isPresent() ? context.getPartition().get() : null; + return context.getPartition().orElse(null); } public String getObjectKey() {