From c5ffdc221d1e9ae3331daa252840b22f8c6822f7 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Wed, 23 Oct 2024 18:31:15 +0100 Subject: [PATCH 1/3] This update initiates the multipart upload as soon as a record begins, and closes the file on flush. Signed-off-by: Aindriu Lavelle --- .../io/aiven/kafka/connect/s3/S3SinkTask.java | 116 ++++++++++++++---- .../kafka/connect/s3/S3SinkTaskTest.java | 89 ++++++++++++-- 2 files changed, 176 insertions(+), 29 deletions(-) diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java index 189655569..1bce78a22 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java @@ -25,9 +25,11 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -55,10 +57,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings("PMD.ExcessiveImports") +@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.TooManyMethods" }) public final class S3SinkTask extends SinkTask { - private static final Logger LOGGER = LoggerFactory.getLogger(AivenKafkaConnectS3SinkConnector.class); + private static final Logger LOGGER = LoggerFactory.getLogger(S3SinkTask.class); private RecordGrouper recordGrouper; @@ -66,6 +68,8 @@ public final class S3SinkTask extends SinkTask { private AmazonS3 s3Client; + private Map writers; + AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory(); @SuppressWarnings("PMD.UnnecessaryConstructor") // required by Connect @@ -78,6 +82,7 @@ public void start(final Map props) { Objects.requireNonNull(props, "props hasn't been set"); config = new S3SinkConfig(props); s3Client = createAmazonS3Client(config); + writers = new HashMap<>(); try { recordGrouper = RecordGrouperFactory.newRecordGrouper(config); } catch (final Exception e) { // NOPMD AvoidCatchingGenericException @@ -112,39 +117,104 @@ public void put(final Collection records) { Objects.requireNonNull(records, "records cannot be null"); LOGGER.info("Processing {} records", records.size()); records.forEach(recordGrouper::put); + + recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records)); + } + /** + * Flush is used to roll over file and complete the S3 Mutli part upload. + * + * @param offsets + */ @Override public void flush(final Map offsets) { - try { - recordGrouper.records().forEach(this::flushFile); - } finally { - recordGrouper.clear(); - } + // On Flush Get Active writers + final Collection activeWriters = writers.values(); + // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created. 
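+            // Records already handed to the active writers are not lost by clearing: each close() below
+            // flushes the writer's remaining buffer and completes that file's S3 multipart upload.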
+ recordGrouper.clear(); + // Close + activeWriters.forEach(writer -> { + try { + // Close active writers && remove from writers Map + // Calling close will write anything in the buffer before closing and complete the S3 multi part upload + writer.close(); + // Remove once closed + writers.remove(writer); + } catch (IOException e) { + throw new ConnectException(e); + } + }); + } - private void flushFile(final String filename, final List records) { - Objects.requireNonNull(records, "records cannot be null"); - if (records.isEmpty()) { - return; + /** + * getOutputWriter is used to check if an existing compatible OutputWriter exists and if not create one and return + * it to the caller. + * + * @param filename + * used to write to S3 + * @param sinkRecord + * a sinkRecord used to create a new S3OutputStream + * @return correct OutputWriter for writing a particular record to S3 + */ + private OutputWriter getOutputWriter(final String filename, final SinkRecord sinkRecord) { + final String fileNameTemplate = getFileNameTemplate(filename, sinkRecord); + + if (writers.get(fileNameTemplate) == null) { + final var out = newStreamFor(filename, sinkRecord); + try { + writers.put(fileNameTemplate, + OutputWriter.builder() + .withCompressionType(config.getCompressionType()) + .withExternalProperties(config.originalsStrings()) + .withOutputFields(config.getOutputFields()) + .withEnvelopeEnabled(config.envelopeEnabled()) + .build(out, config.getFormatType())); + } catch (IOException e) { + throw new ConnectException(e); + } } + return writers.get(fileNameTemplate); + } + + /** + * + * @param filename + * the name of the file in S3 to be written to + * @param records + * all records in this record grouping, including those already written to S3 + * @param recordToBeWritten + * new records from put() which are to be written to S3 + */ + private void writeToS3(final String filename, final List records, + final Collection recordToBeWritten) { + final SinkRecord sinkRecord = records.get(0); - try (var out = newStreamFor(filename, sinkRecord); - var outputWriter = OutputWriter.builder() - .withCompressionType(config.getCompressionType()) - .withExternalProperties(config.originalsStrings()) - .withOutputFields(config.getOutputFields()) - .withEnvelopeEnabled(config.envelopeEnabled()) - .build(out, config.getFormatType())) { - outputWriter.writeRecords(records); - } catch (final IOException e) { + // This writer is being left open until a flush occurs. + final OutputWriter writer; // NOPMD CloseResource + try { + writer = getOutputWriter(filename, sinkRecord); + // Record Grouper returns all records for that filename, all we want is the new batch of records to be added + // to the multi part upload. + writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList())); + } catch (IOException e) { throw new ConnectException(e); } + } @Override public void stop() { + writers.forEach((k, v) -> { + try { + v.close(); + } catch (IOException e) { + throw new ConnectException(e); + } + }); s3Client.shutdown(); + LOGGER.info("Stop S3 Sink Task"); } @@ -154,11 +224,15 @@ public String version() { } private OutputStream newStreamFor(final String filename, final SinkRecord record) { - final var fullKey = config.usesFileNameTemplate() ? 
filename : oldFullKey(record); + final var fullKey = getFileNameTemplate(filename, record); return new S3OutputStream(config.getAwsS3BucketName(), fullKey, config.getAwsS3PartSize(), s3Client, config.getServerSideEncryptionAlgorithmName()); } + private String getFileNameTemplate(final String filename, final SinkRecord record) { + return config.usesFileNameTemplate() ? filename : oldFullKey(record); + } + private EndpointConfiguration newEndpointConfiguration(final S3SinkConfig config) { if (Objects.isNull(config.getAwsS3EndPoint())) { return null; diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java index 7e90b3f87..a49dc76be 100644 --- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java +++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java @@ -31,8 +31,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import io.aiven.kafka.connect.common.config.OutputFormatFragmentFixture.OutputFormatArgs; import java.io.BufferedReader; import java.io.IOException; @@ -444,9 +446,7 @@ void failedForStringValuesByDefault() { ); - task.put(records); - - assertThatThrownBy(() -> task.flush(null)).isInstanceOf(ConnectException.class) + assertThatThrownBy(() -> task.put(records)).isInstanceOf(ConnectException.class) .hasMessage("Record value schema type must be BYTES, STRING given"); } @@ -514,9 +514,7 @@ void failedForStructValuesByDefault() { createRecordWithStructValueSchema("topic0", 1, "key1", "name1", 20, 1001), createRecordWithStructValueSchema("topic1", 0, "key2", "name2", 30, 1002)); - task.put(records); - - assertThatThrownBy(() -> task.flush(null)).isInstanceOf(ConnectException.class) + assertThatThrownBy(() -> task.put(records)).isInstanceOf(ConnectException.class) .hasMessage("Record value schema type must be BYTES, STRUCT given"); } @@ -702,8 +700,8 @@ void supportUnwrappedJsonEnvelopeForStructAndClassicJson() throws IOException { void requestCredentialProviderFromFactoryOnStart() { final S3SinkTask task = new S3SinkTask(); - final AwsCredentialProviderFactory mockedFactory = Mockito.mock(AwsCredentialProviderFactory.class); - final AWSCredentialsProvider provider = Mockito.mock(AWSCredentialsProvider.class); + final AwsCredentialProviderFactory mockedFactory = mock(AwsCredentialProviderFactory.class); + final AWSCredentialsProvider provider = mock(AWSCredentialsProvider.class); task.credentialFactory = mockedFactory; Mockito.when(mockedFactory.getProvider(any(S3ConfigFragment.class))).thenReturn(provider); @@ -713,6 +711,81 @@ void requestCredentialProviderFromFactoryOnStart() { verify(mockedFactory, Mockito.times(1)).getProvider(any(S3ConfigFragment.class)); } + @Test + void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException { + final String compression = "none"; + properties.put(S3SinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); + properties.put(OutputFormatArgs.FORMAT_OUTPUT_FIELDS_CONFIG.key(), "value"); + properties.put(OutputFormatArgs.FORMAT_OUTPUT_ENVELOPE_CONFIG.key(), "false"); + properties.put(OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG.key(), "json"); + properties.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "prefix-"); + + final 
S3SinkTask task = new S3SinkTask(); + task.start(properties); + int timestamp = 1000; + int offset1 = 10; + int offset2 = 20; + int offset3 = 30; + final List> allRecords = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + allRecords.add( + List.of(createRecordWithStructValueSchema("topic0", 0, "key0", "name0", offset1++, timestamp++), + createRecordWithStructValueSchema("topic0", 1, "key1", "name1", offset2++, timestamp++), + createRecordWithStructValueSchema("topic1", 0, "key2", "name2", offset3++, timestamp++))); + } + final TopicPartition tp00 = new TopicPartition("topic0", 0); + final TopicPartition tp01 = new TopicPartition("topic0", 1); + final TopicPartition tp10 = new TopicPartition("topic1", 0); + final Collection tps = List.of(tp00, tp01, tp10); + task.open(tps); + + allRecords.forEach(task::put); + + final Map offsets = new HashMap<>(); + offsets.put(tp00, new OffsetAndMetadata(offset1)); + offsets.put(tp01, new OffsetAndMetadata(offset2)); + offsets.put(tp10, new OffsetAndMetadata(offset3)); + task.flush(offsets); + + final CompressionType compressionType = CompressionType.forName(compression); + + List expectedBlobs = Lists.newArrayList( + "prefix-topic0-0-00000000000000000010" + compressionType.extension(), + "prefix-topic0-1-00000000000000000020" + compressionType.extension(), + "prefix-topic1-0-00000000000000000030" + compressionType.extension()); + assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName)); + + assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000010", compression)) + .containsExactly("[", "{\"name\":\"name0\"},", "{\"name\":\"name0\"},", "{\"name\":\"name0\"}", "]"); + assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000020", compression)) + .containsExactly("[", "{\"name\":\"name1\"},", "{\"name\":\"name1\"},", "{\"name\":\"name1\"}", "]"); + assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000030", compression)) + .containsExactly("[", "{\"name\":\"name2\"},", "{\"name\":\"name2\"},", "{\"name\":\"name2\"}", "]"); + // Reset and send another batch of records to S3 + allRecords.clear(); + for (int i = 0; i < 3; i++) { + allRecords.add( + List.of(createRecordWithStructValueSchema("topic0", 0, "key0", "name0", offset1++, timestamp++), + createRecordWithStructValueSchema("topic0", 1, "key1", "name1", offset2++, timestamp++), + createRecordWithStructValueSchema("topic1", 0, "key2", "name2", offset3++, timestamp++))); + } + allRecords.forEach(task::put); + offsets.clear(); + offsets.put(tp00, new OffsetAndMetadata(offset1)); + offsets.put(tp01, new OffsetAndMetadata(offset2)); + offsets.put(tp10, new OffsetAndMetadata(offset3)); + task.flush(offsets); + expectedBlobs.clear(); + expectedBlobs = Lists.newArrayList("prefix-topic0-0-00000000000000000010" + compressionType.extension(), + "prefix-topic0-1-00000000000000000020" + compressionType.extension(), + "prefix-topic1-0-00000000000000000030" + compressionType.extension(), + "prefix-topic0-0-00000000000000000013" + compressionType.extension(), + "prefix-topic0-1-00000000000000000023" + compressionType.extension(), + "prefix-topic1-0-00000000000000000033" + compressionType.extension()); + assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName)); + + } + private SinkRecord createRecordWithStringValueSchema(final String topic, final int partition, final String key, final String value, final int offset, final long timestamp) { return new SinkRecord(topic, partition, 
Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value, offset, From 6ae469d29c5bbb614849534dc5ecad609344d935 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Thu, 24 Oct 2024 09:24:04 +0100 Subject: [PATCH 2/3] Add compatibility for compact records to update only on flush. While allowing changelog records to initiate multipart upload. Signed-off-by: Aindriu Lavelle --- .../kafka/connect/AvroIntegrationTest.java | 1 + .../connect/AvroParquetIntegrationTest.java | 1 + .../aiven/kafka/connect/IntegrationTest.java | 1 + .../kafka/connect/ParquetIntegrationTest.java | 1 + .../io/aiven/kafka/connect/s3/S3SinkTask.java | 115 +++++++++++++----- .../kafka/connect/s3/S3SinkTaskTest.java | 83 +++++++++++++ 6 files changed, 170 insertions(+), 32 deletions(-) diff --git a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroIntegrationTest.java b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroIntegrationTest.java index 2c14c47c6..086acb673 100644 --- a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroIntegrationTest.java +++ b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroIntegrationTest.java @@ -281,6 +281,7 @@ private KafkaProducer newProducer() { producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl()); + producerProps.put("linger.ms", 1000); return new KafkaProducer<>(producerProps); } diff --git a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroParquetIntegrationTest.java b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroParquetIntegrationTest.java index 380af3e52..dede24c70 100644 --- a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroParquetIntegrationTest.java +++ b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroParquetIntegrationTest.java @@ -357,6 +357,7 @@ private KafkaProducer newProducer() { producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl()); + producerProps.put("linger.ms", 1000); return new KafkaProducer<>(producerProps); } diff --git a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/IntegrationTest.java b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/IntegrationTest.java index 55ace34ef..580c008c6 100644 --- a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/IntegrationTest.java +++ b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/IntegrationTest.java @@ -530,6 +530,7 @@ private KafkaProducer newProducer() { "org.apache.kafka.common.serialization.ByteArraySerializer"); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.put("linger.ms", 1000); return new KafkaProducer<>(producerProps); } diff --git a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/ParquetIntegrationTest.java b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/ParquetIntegrationTest.java index 4c6a34b00..277239cd1 100644 --- a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/ParquetIntegrationTest.java +++ b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/ParquetIntegrationTest.java @@ -125,6 +125,7 @@ void 
tearDown() { private KafkaProducer newProducer() { final Map producerProps = new HashMap<>(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); + producerProps.put("linger.ms", 1000); return new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); } diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java index 1bce78a22..f794166f1 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java @@ -43,6 +43,7 @@ import io.aiven.kafka.connect.common.grouper.RecordGrouper; import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory; import io.aiven.kafka.connect.common.output.OutputWriter; +import io.aiven.kafka.connect.common.templating.Template; import io.aiven.kafka.connect.common.templating.VariableTemplatePart; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; @@ -70,6 +71,8 @@ public final class S3SinkTask extends SinkTask { private Map writers; + private boolean isKeyRecordGrouper; + AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory(); @SuppressWarnings("PMD.UnnecessaryConstructor") // required by Connect @@ -83,6 +86,7 @@ public void start(final Map props) { config = new S3SinkConfig(props); s3Client = createAmazonS3Client(config); writers = new HashMap<>(); + isKeyRecordGrouper = isOfTypeKeyRecordGrouper(config.getFilenameTemplate()); try { recordGrouper = RecordGrouperFactory.newRecordGrouper(config); } catch (final Exception e) { // NOPMD AvoidCatchingGenericException @@ -93,6 +97,20 @@ public void start(final Map props) { } } + /** + * This determines if the file is key based, and possible to change a single file multiple times per flush or if + * it's a roll over file which at each flush is reset. + * + * @param fileNameTemplate + * the format type to output files in supplied in the configuration + * @return true if is of type RecordGrouperFactory.KEY_RECORD or RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD + */ + private boolean isOfTypeKeyRecordGrouper(final Template fileNameTemplate) { + return RecordGrouperFactory.KEY_RECORD.equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate)) + || RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD + .equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate)); + } + private AmazonS3 createAmazonS3Client(final S3SinkConfig config) { final var awsEndpointConfig = newEndpointConfiguration(this.config); final var clientConfig = PredefinedClientConfigurations.defaultConfig() @@ -117,34 +135,46 @@ public void put(final Collection records) { Objects.requireNonNull(records, "records cannot be null"); LOGGER.info("Processing {} records", records.size()); records.forEach(recordGrouper::put); - - recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records)); - + if (!isKeyRecordGrouper) { + recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records)); + } } /** - * Flush is used to roll over file and complete the S3 Mutli part upload. + * Flush is used alongside the KeyRecordGroupers to initate and complete file writes to S3. 
When not using a key + * record grouper, the S3 upload will be initiated by the put command and flush will be used to write the files and + * roll over the files/ * * @param offsets + * the latest offset sent to put and that is now ready to be flushed. */ @Override public void flush(final Map offsets) { - // On Flush Get Active writers - final Collection activeWriters = writers.values(); - // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created. - recordGrouper.clear(); - // Close - activeWriters.forEach(writer -> { + if (isKeyRecordGrouper) { try { - // Close active writers && remove from writers Map - // Calling close will write anything in the buffer before closing and complete the S3 multi part upload - writer.close(); - // Remove once closed - writers.remove(writer); - } catch (IOException e) { - throw new ConnectException(e); + recordGrouper.records().forEach(this::flushToS3); + } finally { + recordGrouper.clear(); } - }); + } else { + // On Flush Get Active writers + final Collection activeWriters = writers.values(); + // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created. + recordGrouper.clear(); + // Close + activeWriters.forEach(writer -> { + try { + // Close active writers && remove from writers Map + // Calling close will write anything in the buffer before closing and complete the S3 multi part + // upload + writer.close(); + // Remove once closed + writers.remove(writer); + } catch (IOException e) { + throw new ConnectException(e); + } + }); + } } @@ -159,12 +189,11 @@ public void flush(final Map offsets) { * @return correct OutputWriter for writing a particular record to S3 */ private OutputWriter getOutputWriter(final String filename, final SinkRecord sinkRecord) { - final String fileNameTemplate = getFileNameTemplate(filename, sinkRecord); - if (writers.get(fileNameTemplate) == null) { + if (writers.get(filename) == null) { final var out = newStreamFor(filename, sinkRecord); try { - writers.put(fileNameTemplate, + writers.put(filename, OutputWriter.builder() .withCompressionType(config.getCompressionType()) .withExternalProperties(config.originalsStrings()) @@ -175,7 +204,7 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin throw new ConnectException(e); } } - return writers.get(fileNameTemplate); + return writers.get(filename); } /** @@ -184,12 +213,9 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin * the name of the file in S3 to be written to * @param records * all records in this record grouping, including those already written to S3 - * @param recordToBeWritten - * new records from put() which are to be written to S3 */ private void writeToS3(final String filename, final List records, final Collection recordToBeWritten) { - final SinkRecord sinkRecord = records.get(0); // This writer is being left open until a flush occurs. final OutputWriter writer; // NOPMD CloseResource @@ -198,6 +224,29 @@ private void writeToS3(final String filename, final List records, // Record Grouper returns all records for that filename, all we want is the new batch of records to be added // to the multi part upload. writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList())); + + } catch (IOException e) { + throw new ConnectException(e); + } + + } + + /** + * For Key record grouper the file is written just once to reduce the number of calls to S3 to a minimum. 
Each file + * contains one record and is written once with the latest record when flush is called + * + * @param filename + * the name of the file in S3 to be written to + * @param records + * all records in this record grouping, including those already written to S3 + */ + private void flushToS3(final String filename, final List records) { + final SinkRecord sinkRecord = records.get(0); + try (var writer = getOutputWriter(filename, sinkRecord)) { + // For Key based files Record Grouper returns only one record for that filename + // to the multi part upload. + writer.writeRecords(records); + writers.remove(filename, writer); } catch (IOException e) { throw new ConnectException(e); } @@ -206,13 +255,15 @@ private void writeToS3(final String filename, final List records, @Override public void stop() { - writers.forEach((k, v) -> { - try { - v.close(); - } catch (IOException e) { - throw new ConnectException(e); - } - }); + if (!isKeyRecordGrouper) { + writers.forEach((k, v) -> { + try { + v.close(); + } catch (IOException e) { + throw new ConnectException(e); + } + }); + } s3Client.shutdown(); LOGGER.info("Stop S3 Sink Task"); diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java index a49dc76be..3183ebd83 100644 --- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java +++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java @@ -719,6 +719,7 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException { properties.put(OutputFormatArgs.FORMAT_OUTPUT_ENVELOPE_CONFIG.key(), "false"); properties.put(OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG.key(), "json"); properties.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "prefix-"); + properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}"); final S3SinkTask task = new S3SinkTask(); task.start(properties); @@ -786,6 +787,88 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException { } + @Test + void mutliPartUploadUsingKeyPartitioning() throws IOException { + final String compression = "none"; + properties.put(S3SinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); + properties.put(OutputFormatArgs.FORMAT_OUTPUT_FIELDS_CONFIG.key(), "value"); + properties.put(OutputFormatArgs.FORMAT_OUTPUT_ENVELOPE_CONFIG.key(), "false"); + properties.put(OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG.key(), "json"); + properties.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "prefix-"); + // Compact/key 'mode' value only updated + properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}-{{topic}}"); + + final S3SinkTask task = new S3SinkTask(); + task.start(properties); + int timestamp = 1000; + int offset1 = 10; + int offset2 = 20; + int offset3 = 30; + final List> allRecords = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + allRecords.add(List.of( + createRecordWithStructValueSchema("topic0", 0, "key0", "name0" + i, offset1++, timestamp++), + createRecordWithStructValueSchema("topic0", 1, "key1", "name1" + i, offset2++, timestamp++), + createRecordWithStructValueSchema("topic1", 0, "key2", "name2" + i, offset3++, timestamp++))); + } + final TopicPartition tp00 = new TopicPartition("topic0", 0); + final TopicPartition tp01 = new TopicPartition("topic0", 1); + final TopicPartition tp10 = new TopicPartition("topic1", 0); + final Collection tps = List.of(tp00, tp01, tp10); + task.open(tps); + + 
allRecords.forEach(task::put); + + final Map offsets = new HashMap<>(); + offsets.put(tp00, new OffsetAndMetadata(offset1)); + offsets.put(tp01, new OffsetAndMetadata(offset2)); + offsets.put(tp10, new OffsetAndMetadata(offset3)); + task.flush(offsets); + + final CompressionType compressionType = CompressionType.forName(compression); + + List expectedBlobs = Lists.newArrayList( + "prefix-topic0-0-00000000000000000012" + compressionType.extension(), + "prefix-topic0-1-00000000000000000022" + compressionType.extension(), + "prefix-topic1-0-00000000000000000032" + compressionType.extension()); + + assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName)); + + assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000012", compression)) + .containsExactly("[", "{\"name\":\"name02\"}", "]"); + assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000022", compression)) + .containsExactly("[", "{\"name\":\"name12\"}", "]"); + assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000032", compression)) + .containsExactly("[", "{\"name\":\"name22\"}", "]"); + // Reset and send another batch of records to S3 + allRecords.clear(); + for (int i = 0; i < 3; i++) { + allRecords.add(List.of( + createRecordWithStructValueSchema("topic0", 0, "key0", "name01" + i, offset1++, timestamp++), + createRecordWithStructValueSchema("topic0", 1, "key1", "name11" + i, offset2++, timestamp++), + createRecordWithStructValueSchema("topic1", 0, "key2", "name21" + i, offset3++, timestamp++))); + } + allRecords.forEach(task::put); + offsets.clear(); + offsets.put(tp00, new OffsetAndMetadata(offset1)); + offsets.put(tp01, new OffsetAndMetadata(offset2)); + offsets.put(tp10, new OffsetAndMetadata(offset3)); + task.flush(offsets); + expectedBlobs.clear(); + + expectedBlobs = Lists.newArrayList("prefix-topic0-0-00000000000000000015" + compressionType.extension(), + "prefix-topic0-1-00000000000000000025" + compressionType.extension(), + "prefix-topic1-0-00000000000000000035" + compressionType.extension()); + assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName)); + assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000015", compression)) + .containsExactly("[", "{\"name\":\"name012\"}", "]"); + assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000025", compression)) + .containsExactly("[", "{\"name\":\"name112\"}", "]"); + assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000035", compression)) + .containsExactly("[", "{\"name\":\"name212\"}", "]"); + + } + private SinkRecord createRecordWithStringValueSchema(final String topic, final int partition, final String key, final String value, final int offset, final long timestamp) { return new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value, offset, From 2fc73112372056153b7af8caf60ec693e1941862 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Tue, 29 Oct 2024 11:40:01 +0000 Subject: [PATCH 3/3] Reduce S3 memory usage by clearing records already sent to S3 Signed-off-by: Aindriu Lavelle --- .../KeyAndTopicPartitionRecordGrouper.java | 6 ++ .../common/grouper/KeyRecordGrouper.java | 6 ++ .../common/grouper/PartitionOffset.java | 44 +++++++++++ .../connect/common/grouper/RecordGrouper.java | 8 ++ .../common/grouper/SinkRecordsBatch.java | 77 +++++++++++++++++++ .../TopicPartitionKeyRecordGrouper.java | 42 ++++++---- .../grouper/TopicPartitionRecordGrouper.java | 
46 +++++++---- .../io/aiven/kafka/connect/s3/S3SinkTask.java | 58 +++++--------- .../kafka/connect/s3/S3SinkTaskTest.java | 5 +- 9 files changed, 219 insertions(+), 73 deletions(-) create mode 100644 commons/src/main/java/io/aiven/kafka/connect/common/grouper/PartitionOffset.java create mode 100644 commons/src/main/java/io/aiven/kafka/connect/common/grouper/SinkRecordsBatch.java diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java index 89148c6bb..8c45b30d3 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java @@ -101,6 +101,12 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + // One entry per file, so the entire file can be removed to reduce memory overhead. + fileBuffers.remove(identifier); + } + @Override public Map> records() { return Collections.unmodifiableMap(fileBuffers); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java index b9af899e0..5ba409b81 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java @@ -90,6 +90,12 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + // One record per file, so remove the entry to reduce memory + fileBuffers.remove(identifier); + } + @Override public Map> records() { return Collections.unmodifiableMap(fileBuffers); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/PartitionOffset.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/PartitionOffset.java new file mode 100644 index 000000000..568407c84 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/PartitionOffset.java @@ -0,0 +1,44 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.common.grouper; + +public class PartitionOffset { + + private Long offset; + private int partition; + + public PartitionOffset(final int partition, final Long offset) { + this.offset = offset; + this.partition = partition; + } + + public int getPartition() { + return partition; + } + + public void setPartition(final int partition) { + this.partition = partition; + } + + public Long getOffset() { + return offset; + } + + public void setOffset(final Long offset) { + this.offset = offset; + } +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java index bd3c09c38..2126e0a66 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java @@ -38,6 +38,14 @@ public interface RecordGrouper { */ void clear(); + /** + * Clear processed records from memory + * + * @param records + * all records already processed to Sink + */ + void clearProcessedRecords(String identifier, List records); + /** * Get all records associated with files, grouped by the file name. * diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SinkRecordsBatch.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SinkRecordsBatch.java new file mode 100644 index 000000000..3afae8017 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SinkRecordsBatch.java @@ -0,0 +1,77 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.connect.sink.SinkRecord; + +public class SinkRecordsBatch { + + private int numberOfRecords; + final private List sinkRecords; + final private String filename; + final private long recordCreationDate = System.currentTimeMillis(); + + public SinkRecordsBatch(final String filename) { + this.filename = filename; + sinkRecords = new ArrayList<>(); + numberOfRecords = 0; + } + + public SinkRecordsBatch(final String filename, final List sinkRecords) { + this.filename = filename; + this.sinkRecords = new ArrayList<>(sinkRecords); + numberOfRecords = sinkRecords.size(); + } + public SinkRecordsBatch(final String filename, final SinkRecord sinkRecord) { + this.filename = filename; + this.sinkRecords = new ArrayList<>(); + this.sinkRecords.add(sinkRecord); + numberOfRecords = 1; + } + + public void addSinkRecord(final SinkRecord sinkRecord) { + this.sinkRecords.add(sinkRecord); + this.numberOfRecords++; + } + + public List getSinkRecords() { + // Ensure access to the Sink Records can only be changed through the apis and not accidentally by another + // process. 
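+        // (The returned list is an unmodifiable view: trimming records that have already been written goes
+        // through removeSinkRecords, which is what the topic-partition groupers' clearProcessedRecords call.)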
+ return Collections.unmodifiableList(sinkRecords); + } + + public void removeSinkRecords(final List sinkRecords) { + this.sinkRecords.removeAll(sinkRecords); + } + + public int getNumberOfRecords() { + return numberOfRecords; + } + + public String getFilename() { + return filename; + } + + public long getRecordCreationDate() { + return recordCreationDate; + } + +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java index 1b3737557..47127e973 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java @@ -16,13 +16,13 @@ package io.aiven.kafka.connect.common.grouper; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; @@ -38,13 +38,13 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper { private final Template filenameTemplate; - private final Map currentHeadRecords = new HashMap<>(); + private final Map currentHeadRecords = new HashMap<>(); - private final Map> fileBuffers = new HashMap<>(); + private final Map fileBuffers = new HashMap<>(); private final StableTimeFormatter timeFormatter; - private final Rotator> rotator; + private final Rotator rotator; TopicPartitionKeyRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile, final TimestampSource tsSource) { @@ -59,7 +59,7 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper { if (unlimited) { return false; } else { - return buffer == null || buffer.size() >= maxRecordsPerFile; + return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile; } }; } @@ -68,7 +68,7 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper { public void put(final SinkRecord record) { Objects.requireNonNull(record, "record cannot be null"); final String recordKey = resolveRecordKeyFor(record); - fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record); + fileBuffers.computeIfAbsent(recordKey, ignored -> new SinkRecordsBatch(recordKey)).addSinkRecord(record); } protected String resolveRecordKeyFor(final SinkRecord record) { @@ -76,7 +76,8 @@ protected String resolveRecordKeyFor(final SinkRecord record) { final TopicPartitionKey tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); - final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record); + final PartitionOffset currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, + ignored -> new PartitionOffset(record.kafkaPartition(), record.kafkaOffset())); String objectKey = generateObjectKey(tpk, currentHeadRecord, record); if (rotator.rotate(fileBuffers.get(objectKey))) { // Create new file using this record as the head record. 
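For reviewers, a small usage sketch of the new batch type added above (illustrative only, not part of the patch; the records, the filename and the surrounding imports for SinkRecord, Schema and List are assumptions):

    final SinkRecordsBatch batch = new SinkRecordsBatch("topic0-0-00000000000000000010");
    final SinkRecord first = new SinkRecord("topic0", 0, Schema.STRING_SCHEMA, "key0", Schema.STRING_SCHEMA, "value0", 10);
    final SinkRecord second = new SinkRecord("topic0", 0, Schema.STRING_SCHEMA, "key0", Schema.STRING_SCHEMA, "value1", 11);
    batch.addSinkRecord(first);
    batch.addSinkRecord(second);
    batch.getNumberOfRecords();               // 2 - this is what the rotator now checks instead of List#size()
    batch.getSinkRecords();                   // unmodifiable view of both records, as handed to the OutputWriter
    batch.removeSinkRecords(List.of(first));  // what clearProcessedRecords() does once records have been uploaded
    batch.getSinkRecords();                   // only `second` is still buffered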
@@ -97,14 +98,14 @@ private String recordKey(final SinkRecord record) { return key; } - public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord headRecord, + public String generateObjectKey(final TopicPartitionKey tpk, final PartitionOffset headRecord, final SinkRecord currentRecord) { final Function setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%020d", headRecord.kafkaOffset()) - : Long.toString(headRecord.kafkaOffset()); + ? String.format("%020d", headRecord.getOffset()) + : Long.toString(headRecord.getOffset()); final Function setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%010d", headRecord.kafkaPartition()) - : Long.toString(headRecord.kafkaPartition()); + ? String.format("%010d", headRecord.getPartition()) + : Long.toString(headRecord.getPartition()); return filenameTemplate.instance() .bindVariable(FilenameTemplateVariable.TOPIC.name, tpk.topicPartition::topic) @@ -118,8 +119,8 @@ public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord he protected String generateNewRecordKey(final SinkRecord record) { final var key = recordKey(record); final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); - currentHeadRecords.put(tpk, record); - return generateObjectKey(tpk, record, record); + currentHeadRecords.put(tpk, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset())); + return generateObjectKey(tpk, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()), record); } @Override @@ -128,9 +129,20 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + final SinkRecordsBatch grouperRecord = fileBuffers.getOrDefault(identifier, null); + if (Objects.isNull(grouperRecord)) { + return; + } + grouperRecord.removeSinkRecords(records); + } + @Override public Map> records() { - return Collections.unmodifiableMap(fileBuffers); + return Collections.unmodifiableMap(fileBuffers.values() + .stream() + .collect(Collectors.toMap(SinkRecordsBatch::getFilename, SinkRecordsBatch::getSinkRecords))); } public static class TopicPartitionKey { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java index 994baa62f..ab2866518 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java @@ -16,13 +16,13 @@ package io.aiven.kafka.connect.common.grouper; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; @@ -46,14 +46,14 @@ class TopicPartitionRecordGrouper implements RecordGrouper { private final Template filenameTemplate; + // Offsets are a Long and Partitions are an Integer + private final Map currentHeadRecords = new HashMap<>(); - private final Map currentHeadRecords = new HashMap<>(); - - private final Map> fileBuffers = new HashMap<>(); + private final Map fileBuffers = new HashMap<>(); private final StableTimeFormatter timeFormatter; - private final Rotator> 
rotator; + private final Rotator rotator; /** * A constructor. @@ -78,7 +78,7 @@ class TopicPartitionRecordGrouper implements RecordGrouper { if (unlimited) { return false; } else { - return buffer == null || buffer.size() >= maxRecordsPerFile; + return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile; } }; } @@ -87,28 +87,30 @@ class TopicPartitionRecordGrouper implements RecordGrouper { public void put(final SinkRecord record) { Objects.requireNonNull(record, "record cannot be null"); final String recordKey = resolveRecordKeyFor(record); - fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record); + fileBuffers.computeIfAbsent(recordKey, ignored -> new SinkRecordsBatch(recordKey)).addSinkRecord(record); } protected String resolveRecordKeyFor(final SinkRecord record) { final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); - final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, ignored -> record); + final PartitionOffset currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, + ignored -> new PartitionOffset(record.kafkaPartition(), record.kafkaOffset())); String recordKey = generateRecordKey(topicPartition, currentHeadRecord, record); if (rotator.rotate(fileBuffers.get(recordKey))) { // Create new file using this record as the head record. recordKey = generateNewRecordKey(record); } + return recordKey; } - private String generateRecordKey(final TopicPartition topicPartition, final SinkRecord headRecord, + private String generateRecordKey(final TopicPartition topicPartition, final PartitionOffset headRecord, final SinkRecord currentRecord) { final Function setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%020d", headRecord.kafkaOffset()) - : Long.toString(headRecord.kafkaOffset()); + ? String.format("%020d", headRecord.getOffset()) + : Long.toString(headRecord.getOffset()); final Function setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%010d", headRecord.kafkaPartition()) - : Long.toString(headRecord.kafkaPartition()); + ? 
String.format("%010d", headRecord.getPartition()) + : Long.toString(headRecord.getPartition()); return filenameTemplate.instance() .bindVariable(FilenameTemplateVariable.TOPIC.name, topicPartition::topic) @@ -120,8 +122,9 @@ private String generateRecordKey(final TopicPartition topicPartition, final Sink protected String generateNewRecordKey(final SinkRecord record) { final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); - currentHeadRecords.put(topicPartition, record); - return generateRecordKey(topicPartition, record, record); + currentHeadRecords.put(topicPartition, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset())); + return generateRecordKey(topicPartition, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()), + record); } @Override @@ -130,9 +133,20 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + final SinkRecordsBatch grouperRecord = fileBuffers.getOrDefault(identifier, null); + if (Objects.isNull(grouperRecord)) { + return; + } + grouperRecord.removeSinkRecords(records); + } + @Override public Map> records() { - return Collections.unmodifiableMap(fileBuffers); + return Collections.unmodifiableMap(fileBuffers.values() + .stream() + .collect(Collectors.toMap(SinkRecordsBatch::getFilename, SinkRecordsBatch::getSinkRecords))); } } diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java index f794166f1..dd2c5b5d2 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -136,46 +135,25 @@ public void put(final Collection records) { LOGGER.info("Processing {} records", records.size()); records.forEach(recordGrouper::put); if (!isKeyRecordGrouper) { - recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records)); + recordGrouper.records().forEach(this::writeToS3); } } /** - * Flush is used alongside the KeyRecordGroupers to initate and complete file writes to S3. When not using a key + * Flush is used alongside the KeyRecordGroupers to initiate and complete file writes to S3. When not using a key * record grouper, the S3 upload will be initiated by the put command and flush will be used to write the files and - * roll over the files/ + * roll over the files if any records remain in the record grouper for completion. * * @param offsets * the latest offset sent to put and that is now ready to be flushed. */ @Override public void flush(final Map offsets) { - if (isKeyRecordGrouper) { - try { - recordGrouper.records().forEach(this::flushToS3); - } finally { - recordGrouper.clear(); - } - } else { - // On Flush Get Active writers - final Collection activeWriters = writers.values(); - // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created. 
+ try { + recordGrouper.records().forEach(this::flushToS3); + } finally { recordGrouper.clear(); - // Close - activeWriters.forEach(writer -> { - try { - // Close active writers && remove from writers Map - // Calling close will write anything in the buffer before closing and complete the S3 multi part - // upload - writer.close(); - // Remove once closed - writers.remove(writer); - } catch (IOException e) { - throw new ConnectException(e); - } - }); } - } /** @@ -214,19 +192,20 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin * @param records * all records in this record grouping, including those already written to S3 */ - private void writeToS3(final String filename, final List records, - final Collection recordToBeWritten) { + private void writeToS3(final String filename, final List records) { + // If no new records are supplied in this put operation return immediately + if (records.isEmpty()) { + return; + } final SinkRecord sinkRecord = records.get(0); - // This writer is being left open until a flush occurs. - final OutputWriter writer; // NOPMD CloseResource + // Record Grouper returns all records for that filename, all we want is the new batch of records to be added + // to the multi part upload. try { - writer = getOutputWriter(filename, sinkRecord); - // Record Grouper returns all records for that filename, all we want is the new batch of records to be added - // to the multi part upload. - writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList())); - + // This writer is being left open until a flush occurs. + getOutputWriter(filename, sinkRecord).writeRecords(records); + recordGrouper.clearProcessedRecords(filename, records); } catch (IOException e) { - throw new ConnectException(e); + LOGGER.warn("Unable to write record, will retry on next put or flush operation.", e); } } @@ -241,7 +220,8 @@ private void writeToS3(final String filename, final List records, * all records in this record grouping, including those already written to S3 */ private void flushToS3(final String filename, final List records) { - final SinkRecord sinkRecord = records.get(0); + + final SinkRecord sinkRecord = records.isEmpty() ? null : records.get(0); try (var writer = getOutputWriter(filename, sinkRecord)) { // For Key based files Record Grouper returns only one record for that filename // to the multi part upload. 
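As orientation for the test changes that follow: which write path the task takes is decided by the filename template alone. A hedged illustration using the two templates exercised by the tests below (`props` stands in for the connector's configuration map):

    // Offset-based template -> topic/partition grouper: put() streams each new batch into an open
    // S3 multipart upload and flush() writes what remains and closes the writers to complete the files.
    props.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");

    // Key-based (compact) template -> KEY_RECORD / KEY_TOPIC_PARTITION_RECORD grouper: records are held
    // until flush(), which writes each file once with the latest record per key.
    props.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}-{{topic}}");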
diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java index 3183ebd83..d9a1ad8e9 100644 --- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java +++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java @@ -34,7 +34,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import io.aiven.kafka.connect.common.config.OutputFormatFragmentFixture.OutputFormatArgs; import java.io.BufferedReader; import java.io.IOException; @@ -718,7 +717,7 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException { properties.put(OutputFormatArgs.FORMAT_OUTPUT_FIELDS_CONFIG.key(), "value"); properties.put(OutputFormatArgs.FORMAT_OUTPUT_ENVELOPE_CONFIG.key(), "false"); properties.put(OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG.key(), "json"); - properties.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "prefix-"); + properties.put(AWS_S3_PREFIX_CONFIG, "prefix-"); properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}"); final S3SinkTask task = new S3SinkTask(); @@ -794,7 +793,7 @@ void mutliPartUploadUsingKeyPartitioning() throws IOException { properties.put(OutputFormatArgs.FORMAT_OUTPUT_FIELDS_CONFIG.key(), "value"); properties.put(OutputFormatArgs.FORMAT_OUTPUT_ENVELOPE_CONFIG.key(), "false"); properties.put(OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG.key(), "json"); - properties.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "prefix-"); + properties.put(AWS_S3_PREFIX_CONFIG, "prefix-"); // Compact/key 'mode' value only updated properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}-{{topic}}");