diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/GroupedSinkRecord.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/GroupedSinkRecord.java new file mode 100644 index 000000000..0c358cc96 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/GroupedSinkRecord.java @@ -0,0 +1,77 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.connect.sink.SinkRecord; + +public class GroupedSinkRecord { + + private int numberOfRecords; + final private List sinkRecords; + final private String filename; + final private long recordCreationDate = System.currentTimeMillis(); + + public GroupedSinkRecord(final String filename) { + this.filename = filename; + sinkRecords = new ArrayList<>(); + numberOfRecords = 0; + } + + public GroupedSinkRecord(final String filename, final List sinkRecords) { + this.filename = filename; + this.sinkRecords = new ArrayList<>(sinkRecords); + numberOfRecords = sinkRecords.size(); + } + public GroupedSinkRecord(final String filename, final SinkRecord sinkRecord) { + this.filename = filename; + this.sinkRecords = new ArrayList<>(); + this.sinkRecords.add(sinkRecord); + numberOfRecords = 1; + } + + public void addSinkRecord(final SinkRecord sinkRecord) { + this.sinkRecords.add(sinkRecord); 
+ this.numberOfRecords++; + } + + public List getSinkRecords() { + // Return a read-only view so the sink records can only be modified through this class's API and not + // accidentally by another caller. + return Collections.unmodifiableList(sinkRecords); + } + + public void removeSinkRecords(final List sinkRecords) { + this.sinkRecords.removeAll(sinkRecords); + } + + public int getNumberOfRecords() { + return numberOfRecords; + } + + public String getFilename() { + return filename; + } + + public long getRecordCreationDate() { + return recordCreationDate; + } + +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java index 43ce6c2f9..b2402385a 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java @@ -103,6 +103,12 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + // One entry per file, so the entire file can be removed to reduce memory overhead. 
+ fileBuffers.remove(identifier); + } + @Override public Map> records() { return Collections.unmodifiableMap(fileBuffers); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java index b9af899e0..5ba409b81 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java @@ -90,6 +90,12 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + // One record per file, so remove the entry to reduce memory + fileBuffers.remove(identifier); + } + @Override public Map> records() { return Collections.unmodifiableMap(fileBuffers); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java index 1e0e6c188..2126e0a66 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java @@ -38,10 +38,18 @@ public interface RecordGrouper { */ void clear(); + /** + * Clear processed records from memory + * + * @param records + * all records already processed to Sink + */ + void clearProcessedRecords(String identifier, List records); + /** * Get all records associated with files, grouped by the file name. 
* - * @return map of records assotiated with files + * @return map of records associated with files */ Map> records(); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java index 55e097633..872ce00d9 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java @@ -17,13 +17,13 @@ package io.aiven.kafka.connect.common.grouper; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; @@ -34,6 +34,8 @@ import io.aiven.kafka.connect.common.templating.Template; import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter; +import org.apache.commons.lang3.tuple.Pair; + public class TopicPartitionKeyRecordGrouper implements RecordGrouper { private static final Map TIMESTAMP_FORMATTERS = Map.of("yyyy", @@ -42,13 +44,13 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper { private final Template filenameTemplate; - private final Map currentHeadRecords = new HashMap<>(); + private final Map> currentHeadRecords = new HashMap<>(); - private final Map> fileBuffers = new HashMap<>(); + private final Map fileBuffers = new HashMap<>(); private final Function> setTimestampBasedOnRecord; - private final Rotator> rotator; + private final Rotator rotator; TopicPartitionKeyRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile, final TimestampSource tsSource) { @@ -64,7 +66,7 @@ public class TopicPartitionKeyRecordGrouper 
implements RecordGrouper { if (unlimited) { return false; } else { - return buffer == null || buffer.size() >= maxRecordsPerFile; + return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile; } }; } @@ -73,7 +75,7 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper { public void put(final SinkRecord record) { Objects.requireNonNull(record, "record cannot be null"); final String recordKey = resolveRecordKeyFor(record); - fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record); + fileBuffers.computeIfAbsent(recordKey, ignored -> new GroupedSinkRecord(recordKey)).addSinkRecord(record); } protected String resolveRecordKeyFor(final SinkRecord record) { @@ -81,7 +83,8 @@ protected String resolveRecordKeyFor(final SinkRecord record) { final TopicPartitionKey tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); - final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record); + final Pair currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, + ignored -> Pair.of(record.kafkaOffset(), record.kafkaPartition())); String objectKey = generateObjectKey(tpk, currentHeadRecord, record); if (rotator.rotate(fileBuffers.get(objectKey))) { // Create new file using this record as the head record. @@ -102,14 +105,14 @@ private String recordKey(final SinkRecord record) { return key; } - public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord headRecord, + public String generateObjectKey(final TopicPartitionKey tpk, final Pair headRecord, final SinkRecord currentRecord) { final Function setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%020d", headRecord.kafkaOffset()) - : Long.toString(headRecord.kafkaOffset()); + ? 
String.format("%020d", headRecord.getLeft()) + : Long.toString(headRecord.getLeft()); final Function setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%010d", headRecord.kafkaPartition()) - : Long.toString(headRecord.kafkaPartition()); + ? String.format("%010d", headRecord.getRight()) + : Long.toString(headRecord.getRight()); return filenameTemplate.instance() .bindVariable(FilenameTemplateVariable.TOPIC.name, tpk.topicPartition::topic) @@ -123,8 +126,8 @@ public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord he protected String generateNewRecordKey(final SinkRecord record) { final var key = recordKey(record); final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); - currentHeadRecords.put(tpk, record); - return generateObjectKey(tpk, record, record); + currentHeadRecords.put(tpk, Pair.of(record.kafkaOffset(), record.kafkaPartition())); + return generateObjectKey(tpk, Pair.of(record.kafkaOffset(), record.kafkaPartition()), record); } @Override @@ -133,9 +136,20 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + final GroupedSinkRecord grouperRecord = fileBuffers.getOrDefault(identifier, null); + if (Objects.isNull(grouperRecord)) { + return; + } + grouperRecord.removeSinkRecords(records); + } + @Override public Map> records() { - return Collections.unmodifiableMap(fileBuffers); + return Collections.unmodifiableMap(fileBuffers.values() + .stream() + .collect(Collectors.toMap(GroupedSinkRecord::getFilename, GroupedSinkRecord::getSinkRecords))); } public static class TopicPartitionKey { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java index 5a080e0a9..6fb22e7af 100644 --- 
a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java @@ -17,13 +17,13 @@ package io.aiven.kafka.connect.common.grouper; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; @@ -33,6 +33,8 @@ import io.aiven.kafka.connect.common.templating.Template; import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter; +import org.apache.commons.lang3.tuple.Pair; + /** * A {@link RecordGrouper} that groups records by topic and partition. * @@ -50,14 +52,14 @@ class TopicPartitionRecordGrouper implements RecordGrouper { DateTimeFormatter.ofPattern("dd"), "HH", DateTimeFormatter.ofPattern("HH")); private final Template filenameTemplate; + // Offsets are a Long and Partitions are an Integer + private final Map> currentHeadRecords = new HashMap<>(); - private final Map currentHeadRecords = new HashMap<>(); - - private final Map> fileBuffers = new HashMap<>(); + private final Map fileBuffers = new HashMap<>(); private final Function> setTimestampBasedOnRecord; - private final Rotator> rotator; + private final Rotator rotator; /** * A constructor. 
@@ -83,7 +85,7 @@ class TopicPartitionRecordGrouper implements RecordGrouper { if (unlimited) { return false; } else { - return buffer == null || buffer.size() >= maxRecordsPerFile; + return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile; } }; } @@ -92,28 +94,30 @@ class TopicPartitionRecordGrouper implements RecordGrouper { public void put(final SinkRecord record) { Objects.requireNonNull(record, "record cannot be null"); final String recordKey = resolveRecordKeyFor(record); - fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record); + fileBuffers.computeIfAbsent(recordKey, ignored -> new GroupedSinkRecord(recordKey)).addSinkRecord(record); } protected String resolveRecordKeyFor(final SinkRecord record) { final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); - final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, ignored -> record); + final Pair currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, + ignored -> Pair.of(record.kafkaOffset(), record.kafkaPartition())); String recordKey = generateRecordKey(topicPartition, currentHeadRecord, record); if (rotator.rotate(fileBuffers.get(recordKey))) { // Create new file using this record as the head record. recordKey = generateNewRecordKey(record); } + return recordKey; } - private String generateRecordKey(final TopicPartition topicPartition, final SinkRecord headRecord, + private String generateRecordKey(final TopicPartition topicPartition, final Pair headRecord, final SinkRecord currentRecord) { final Function setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%020d", headRecord.kafkaOffset()) - : Long.toString(headRecord.kafkaOffset()); + ? String.format("%020d", headRecord.getLeft()) + : Long.toString(headRecord.getLeft()); final Function setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean() - ? 
String.format("%010d", headRecord.kafkaPartition()) - : Long.toString(headRecord.kafkaPartition()); + ? String.format("%010d", headRecord.getRight()) + : Long.toString(headRecord.getRight()); return filenameTemplate.instance() .bindVariable(FilenameTemplateVariable.TOPIC.name, topicPartition::topic) @@ -125,8 +129,8 @@ private String generateRecordKey(final TopicPartition topicPartition, final Sink protected String generateNewRecordKey(final SinkRecord record) { final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); - currentHeadRecords.put(topicPartition, record); - return generateRecordKey(topicPartition, record, record); + currentHeadRecords.put(topicPartition, Pair.of(record.kafkaOffset(), record.kafkaPartition())); + return generateRecordKey(topicPartition, Pair.of(record.kafkaOffset(), record.kafkaPartition()), record); } @Override @@ -135,9 +139,20 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + final GroupedSinkRecord grouperRecord = fileBuffers.getOrDefault(identifier, null); + if (Objects.isNull(grouperRecord)) { + return; + } + grouperRecord.removeSinkRecords(records); + } + @Override public Map> records() { - return Collections.unmodifiableMap(fileBuffers); + return Collections.unmodifiableMap(fileBuffers.values() + .stream() + .collect(Collectors.toMap(GroupedSinkRecord::getFilename, GroupedSinkRecord::getSinkRecords))); } } diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java index 6dc7560e2..8ba4f9d38 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; import 
org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -134,46 +133,25 @@ public void put(final Collection records) { LOGGER.info("Processing {} records", records.size()); records.forEach(recordGrouper::put); if (!isKeyRecordGrouper) { - recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records)); + recordGrouper.records().forEach(this::writeToS3); } } /** - * Flush is used alongside the KeyRecordGroupers to initate and complete file writes to S3. When not using a key + * Flush is used alongside the KeyRecordGroupers to initiate and complete file writes to S3. When not using a key * record grouper, the S3 upload will be initiated by the put command and flush will be used to write the files and - * roll over the files/ + * roll over the files if any records remain in the record grouper for completion. * * @param offsets * the latest offset sent to put and that is now ready to be flushed. */ @Override public void flush(final Map offsets) { - if (isKeyRecordGrouper) { - try { - recordGrouper.records().forEach(this::flushToS3); - } finally { - recordGrouper.clear(); - } - } else { - // On Flush Get Active writers - final Collection activeWriters = writers.values(); - // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created. 
+ try { + recordGrouper.records().forEach(this::flushToS3); + } finally { recordGrouper.clear(); - // Close - activeWriters.forEach(writer -> { - try { - // Close active writers && remove from writers Map - // Calling close will write anything in the buffer before closing and complete the S3 multi part - // upload - writer.close(); - // Remove once closed - writers.remove(writer); - } catch (IOException e) { - throw new ConnectException(e); - } - }); } - } /** @@ -212,19 +190,20 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin * @param records * all records in this record grouping, including those already written to S3 */ - private void writeToS3(final String filename, final List records, - final Collection recordToBeWritten) { + private void writeToS3(final String filename, final List records) { + // If no new records are supplied in this put operation return immediately + if (records.isEmpty()) { + return; + } final SinkRecord sinkRecord = records.get(0); - // This writer is being left open until a flush occurs. - final OutputWriter writer; // NOPMD CloseResource + // The record grouper only retains records not yet written for this filename (see clearProcessedRecords), + // so the whole list is the new batch to add to the multi part upload. try { - writer = getOutputWriter(filename, sinkRecord); - // Record Grouper returns all records for that filename, all we want is the new batch of records to be added - // to the multi part upload. - writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList())); - + // This writer is being left open until a flush occurs. 
+ getOutputWriter(filename, sinkRecord).writeRecords(records); + recordGrouper.clearProcessedRecords(filename, records); } catch (IOException e) { - throw new ConnectException(e); + LOGGER.warn("Unable to write record, will retry on next put or flush operation.", e); } } @@ -239,7 +218,8 @@ private void writeToS3(final String filename, final List records, * all records in this record grouping, including those already written to S3 */ private void flushToS3(final String filename, final List records) { - final SinkRecord sinkRecord = records.get(0); + + final SinkRecord sinkRecord = records.isEmpty() ? null : records.get(0); try (var writer = getOutputWriter(filename, sinkRecord)) { // For Key based files Record Grouper returns only one record for that filename // to the multi part upload.