From 0600cf7d99984126740f37c4a2180b0751bb303b Mon Sep 17 00:00:00 2001
From: Aindriu Lavelle
Date: Thu, 24 Oct 2024 09:24:04 +0100
Subject: [PATCH] Add compatibility for compact records to update only on
 flush, while allowing changelog records to initiate multipart upload.

Signed-off-by: Aindriu Lavelle
---
 .../io/aiven/kafka/connect/s3/S3SinkTask.java | 108 +++++++++++++-----
 .../kafka/connect/s3/S3SinkTaskTest.java      |  83 ++++++++++++++
 2 files changed, 163 insertions(+), 28 deletions(-)

diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
index 7b09745a1..c1490692f 100644
--- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
+++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
@@ -42,6 +42,7 @@
 import io.aiven.kafka.connect.common.grouper.RecordGrouper;
 import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
 import io.aiven.kafka.connect.common.output.OutputWriter;
+import io.aiven.kafka.connect.common.templating.Template;
 import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
 import io.aiven.kafka.connect.s3.config.AwsCredentialProviderFactory;
 import io.aiven.kafka.connect.s3.config.S3SinkConfig;
@@ -68,6 +69,8 @@ public final class S3SinkTask extends SinkTask {
 
     private Map<String, OutputWriter> writers;
 
+    private boolean isKeyRecordGrouper;
+
     AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();
 
     @SuppressWarnings("PMD.UnnecessaryConstructor") // required by Connect
@@ -81,6 +84,7 @@ public void start(final Map<String, String> props) {
         config = new S3SinkConfig(props);
         s3Client = createAmazonS3Client(config);
         writers = new HashMap<>();
+        isKeyRecordGrouper = isOfTypeKeyRecordGrouper(config.getFilenameTemplate());
         try {
             recordGrouper = RecordGrouperFactory.newRecordGrouper(config);
         } catch (final Exception e) { // NOPMD AvoidCatchingGenericException
@@ -91,6 +95,20 @@ public void start(final Map<String, String> props) {
         }
     }
 
+    /**
+     * Determines whether the file is key based, meaning a single file can be updated multiple times per flush, or
+     * whether it is a roll-over file that is reset at each flush.
+     *
+     * @param fileNameTemplate
+     *            the filename template supplied in the configuration
+     * @return true if it is of type RecordGrouperFactory.KEY_RECORD or RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD
+     */
+    private boolean isOfTypeKeyRecordGrouper(final Template fileNameTemplate) {
+        return RecordGrouperFactory.KEY_RECORD.equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate))
+                || RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD
+                        .equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate));
+    }
+
     private AmazonS3 createAmazonS3Client(final S3SinkConfig config) {
         final var awsEndpointConfig = newEndpointConfiguration(this.config);
         final var clientConfig = PredefinedClientConfigurations.defaultConfig()
@@ -115,34 +133,46 @@ public void put(final Collection<SinkRecord> records) {
         Objects.requireNonNull(records, "records cannot be null");
         LOGGER.info("Processing {} records", records.size());
         records.forEach(recordGrouper::put);
-
-        recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));
-
+        if (!isKeyRecordGrouper) {
+            recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));
+        }
     }
 
     /**
-     * Flush is used to roll over file and complete the S3 Mutli part upload.
+     * Flush is used alongside the KeyRecordGroupers to initiate and complete file writes to S3. When not using a key
+     * record grouper, the S3 upload will be initiated by the put command and flush will be used to write the files and
+     * roll over the files.
      *
      * @param offsets
+     *            the latest offsets sent to put that are now ready to be flushed.
      */
     @Override
     public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        // On Flush Get Active writers
-        final Collection<OutputWriter> activeWriters = writers.values();
-        // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created.
-        recordGrouper.clear();
-        // Close
-        activeWriters.forEach(writer -> {
+        if (isKeyRecordGrouper) {
             try {
-                // Close active writers && remove from writers Map
-                // Calling close will write anything in the buffer before closing and complete the S3 multi part upload
-                writer.close();
-                // Remove once closed
-                writers.remove(writer);
-            } catch (IOException e) {
-                throw new ConnectException(e);
+                recordGrouper.records().forEach(this::flushToS3);
+            } finally {
+                recordGrouper.clear();
             }
-        });
+        } else {
+            // On Flush Get Active writers
+            final Collection<OutputWriter> activeWriters = writers.values();
+            // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created.
+            recordGrouper.clear();
+            // Close
+            activeWriters.forEach(writer -> {
+                try {
+                    // Close active writers && remove from writers Map
+                    // Calling close will write anything in the buffer before closing and complete the S3 multi part
+                    // upload
+                    writer.close();
+                    // Remove once closed
+                    writers.remove(writer);
+                } catch (IOException e) {
+                    throw new ConnectException(e);
+                }
+            });
+        }
     }
 
@@ -182,12 +212,9 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin
      *            the name of the file in S3 to be written to
      * @param records
      *            all records in this record grouping, including those already written to S3
-     * @param recordToBeWritten
-     *            new records from put() which are to be written to S3
      */
     private void writeToS3(final String filename, final List<SinkRecord> records,
             final Collection<SinkRecord> recordToBeWritten) {
-        final SinkRecord sinkRecord = records.get(0);
         // This writer is being left open until a flush occurs.
         final OutputWriter writer; // NOPMD CloseResource
@@ -196,6 +223,29 @@ private void writeToS3(final String filename, final List<SinkRecord> records,
         // Record Grouper returns all records for that filename, all we want is the new batch of records to be added
         // to the multi part upload.
         writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList()));
+
+        } catch (IOException e) {
+            throw new ConnectException(e);
+        }
+
+    }
+
+    /**
+     * For the key record grouper the file is written just once to keep the number of calls to S3 to a minimum. Each
+     * file contains one record and is written once, with the latest record, when flush is called.
+     *
+     * @param filename
+     *            the name of the file in S3 to be written to
+     * @param records
+     *            all records in this record grouping, including those already written to S3
+     */
+    private void flushToS3(final String filename, final List<SinkRecord> records) {
+        final SinkRecord sinkRecord = records.get(0);
+        try (var writer = getOutputWriter(filename, sinkRecord)) {
+            // For key based files the record grouper returns only one record for that filename,
+            // so the file is written once with the latest value.
+            writer.writeRecords(records);
+            writers.remove(filename, writer);
         } catch (IOException e) {
             throw new ConnectException(e);
         }
     }
@@ -204,13 +254,15 @@ private void writeToS3(final String filename, final List<SinkRecord> records,
 
     @Override
     public void stop() {
-        writers.forEach((k, v) -> {
-            try {
-                v.close();
-            } catch (IOException e) {
-                throw new ConnectException(e);
-            }
-        });
+        if (!isKeyRecordGrouper) {
+            writers.forEach((k, v) -> {
+                try {
+                    v.close();
+                } catch (IOException e) {
+                    throw new ConnectException(e);
+                }
+            });
+        }
 
         s3Client.shutdown();
         LOGGER.info("Stop S3 Sink Task");
diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java
index 83858666e..795c7c8b4 100644
--- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java
+++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java
@@ -706,6 +706,7 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException {
         properties.put(S3SinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false");
         properties.put(S3SinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "json");
         properties.put(S3SinkConfig.AWS_S3_PREFIX_CONFIG, "prefix-");
+        properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");
 
         final S3SinkTask task = new S3SinkTask();
         task.start(properties);
@@ -773,6 +774,88 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException {
 
     }
 
+    @Test
+    void mutliPartUploadUsingKeyPartitioning() throws IOException {
+        final String compression = "none";
+        properties.put(S3SinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
+        properties.put(S3SinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
+        properties.put(S3SinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false");
+        properties.put(S3SinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "json");
+        properties.put(S3SinkConfig.AWS_S3_PREFIX_CONFIG, "prefix-");
+        // Compact/key 'mode' value only updated
+        properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}-{{topic}}");
+
+        final S3SinkTask task = new S3SinkTask();
+        task.start(properties);
+        int timestamp = 1000;
+        int offset1 = 10;
+        int offset2 = 20;
+        int offset3 = 30;
+        final List<List<SinkRecord>> allRecords = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            allRecords.add(List.of(
+                    createRecordWithStructValueSchema("topic0", 0, "key0", "name0" + i, offset1++, timestamp++),
+                    createRecordWithStructValueSchema("topic0", 1, "key1", "name1" + i, offset2++, timestamp++),
+                    createRecordWithStructValueSchema("topic1", 0, "key2", "name2" + i, offset3++, timestamp++)));
+        }
+        final TopicPartition tp00 = new TopicPartition("topic0", 0);
+        final TopicPartition tp01 = new TopicPartition("topic0", 1);
+        final TopicPartition tp10 = new TopicPartition("topic1", 0);
+        final Collection<TopicPartition> tps = List.of(tp00, tp01, tp10);
+        task.open(tps);
+
+        allRecords.forEach(task::put);
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp00, new OffsetAndMetadata(offset1));
+        offsets.put(tp01, new OffsetAndMetadata(offset2));
+        offsets.put(tp10, new OffsetAndMetadata(offset3));
+        task.flush(offsets);
+
+        final CompressionType compressionType = CompressionType.forName(compression);
+
+        List<String> expectedBlobs = Lists.newArrayList(
+                "prefix-topic0-0-00000000000000000012" + compressionType.extension(),
+                "prefix-topic0-1-00000000000000000022" + compressionType.extension(),
+                "prefix-topic1-0-00000000000000000032" + compressionType.extension());
+
+        assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));
+
+        assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000012", compression))
+                .containsExactly("[", "{\"name\":\"name02\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000022", compression))
+                .containsExactly("[", "{\"name\":\"name12\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000032", compression))
+                .containsExactly("[", "{\"name\":\"name22\"}", "]");
+        // Reset and send another batch of records to S3
+        allRecords.clear();
+        for (int i = 0; i < 3; i++) {
+            allRecords.add(List.of(
+                    createRecordWithStructValueSchema("topic0", 0, "key0", "name01" + i, offset1++, timestamp++),
+                    createRecordWithStructValueSchema("topic0", 1, "key1", "name11" + i, offset2++, timestamp++),
+                    createRecordWithStructValueSchema("topic1", 0, "key2", "name21" + i, offset3++, timestamp++)));
+        }
+        allRecords.forEach(task::put);
+        offsets.clear();
+        offsets.put(tp00, new OffsetAndMetadata(offset1));
+        offsets.put(tp01, new OffsetAndMetadata(offset2));
+        offsets.put(tp10, new OffsetAndMetadata(offset3));
+        task.flush(offsets);
+        expectedBlobs.clear();
+
+        expectedBlobs = Lists.newArrayList("prefix-topic0-0-00000000000000000015" + compressionType.extension(),
+                "prefix-topic0-1-00000000000000000025" + compressionType.extension(),
+                "prefix-topic1-0-00000000000000000035" + compressionType.extension());
+        assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));
+        assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000015", compression))
+                .containsExactly("[", "{\"name\":\"name012\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000025", compression))
+                .containsExactly("[", "{\"name\":\"name112\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000035", compression))
+                .containsExactly("[", "{\"name\":\"name212\"}", "]");
+
+    }
+
     private SinkRecord createRecordWithStringValueSchema(final String topic, final int partition, final String key,
             final String value, final int offset, final long timestamp) {
         return new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value, offset,