From c33b727f820be4a4425058e2282a7e69082a3f5b Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 18 Dec 2023 17:30:32 +0800 Subject: [PATCH 1/9] Proof of concept for Partitioner Refactoring --- .../io/jcloud/BlobStoreAbstractConfig.java | 4 +- .../jcloud/partitioner/LegacyPartitioner.java | 33 ++++++ .../io/jcloud/partitioner/Partitioner.java | 48 +------- .../jcloud/partitioner/TimePartitioner.java | 95 ++------------- .../jcloud/partitioner/TopicPartitioner.java | 57 +++++++++ .../{ => legacy}/AbstractPartitioner.java | 2 +- .../{ => legacy}/PartitionerType.java | 2 +- .../{ => legacy}/SimplePartitioner.java | 2 +- .../partitioner/legacy/TimePartitioner.java | 112 ++++++++++++++++++ .../partitioner/legacy/package-info.java | 19 +++ .../io/jcloud/partitioner/package-info.java | 2 +- .../io/jcloud/sink/BlobStoreAbstractSink.java | 52 ++++++-- .../pulsar/io/jcloud/ConnectorConfigTest.java | 2 +- .../jcloud/partitioner/PartitionerTest.java | 5 +- .../partitioner/PartitionerTypeTest.java | 1 + .../SliceTopicPartitionPartitionerTest.java | 5 +- .../CloudStorageGenericRecordSinkTest.java | 2 +- 17 files changed, 293 insertions(+), 150 deletions(-) create mode 100644 src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java create mode 100644 src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java rename src/main/java/org/apache/pulsar/io/jcloud/partitioner/{ => legacy}/AbstractPartitioner.java (98%) rename src/main/java/org/apache/pulsar/io/jcloud/partitioner/{ => legacy}/PartitionerType.java (94%) rename src/main/java/org/apache/pulsar/io/jcloud/partitioner/{ => legacy}/SimplePartitioner.java (95%) create mode 100644 src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/TimePartitioner.java create mode 100644 src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/package-info.java diff --git a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java index c28add81..7c532d56 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java @@ -41,7 +41,7 @@ import org.apache.pulsar.io.jcloud.format.Format; import org.apache.pulsar.io.jcloud.format.JsonFormat; import org.apache.pulsar.io.jcloud.format.ParquetFormat; -import org.apache.pulsar.io.jcloud.partitioner.PartitionerType; +import org.apache.pulsar.io.jcloud.partitioner.legacy.PartitionerType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +81,9 @@ public class BlobStoreAbstractConfig implements Serializable { private String formatType; + @Deprecated // Use partitioner instead private String partitionerType; + private String partitioner; private boolean partitionerUseIndexAsOffset; diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java new file mode 100644 index 00000000..bac97379 --- /dev/null +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.jcloud.partitioner; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.functions.api.Record; + +public class LegacyPartitioner implements Partitioner { + public static final String PARTITIONER_NAME = "legacy"; + @Override + public Map>> partition(List> records) { + return records.stream().collect(Collectors.groupingBy(record -> record.getTopicName().get())); + } +} diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java index e72d666f..e8b121ed 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java @@ -18,49 +18,11 @@ */ package org.apache.pulsar.io.jcloud.partitioner; -import java.io.File; +import java.util.List; +import java.util.Map; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig; -/** - * Partition incoming records, and generates directories and file names in which to store the - * incoming records. - * - * @param The type representing the field schemas. - */ -public interface Partitioner { - - String PATH_SEPARATOR = File.separator; - - - void configure(BlobStoreAbstractConfig config); - - /** - * Returns string representing the output path for a sinkRecord to be encoded and stored. - * - * @param sinkRecord The record to be stored by the Sink Connector - * @return The path/filename the SinkRecord will be stored into after it is encoded - */ - String encodePartition(Record sinkRecord); - - /** - * Returns string representing the output path for a sinkRecord to be encoded and stored. - * - * @param sinkRecord The record to be stored by the Sink Connector - * @param nowInMillis The current time in ms. Some Partitioners will use this option, but by - * default it is unused. - * @return The path/filename the SinkRecord will be stored into after it is encoded - */ - default String encodePartition(Record sinkRecord, long nowInMillis) { - return encodePartition(sinkRecord); - } - - /** - * Generate saved path. - * - * @param topic topic name - * @param encodedPartition Path encoded by the implementation class - * @return saved path - */ - String generatePartitionedPath(String topic, String encodedPartition); +public interface Partitioner { + Map>> partition(List> records); } diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java index 5803545b..f4a62d63 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java @@ -18,95 +18,16 @@ */ package org.apache.pulsar.io.jcloud.partitioner; -import java.time.Instant; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; -import java.util.function.Supplier; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.Message; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * partition by day, hour. - * - * @param - */ -public class TimePartitioner extends AbstractPartitioner { - - private static final Logger LOGGER = LoggerFactory.getLogger(TimePartitioner.class); - - private static final long DEFAULT_PARTITION_DURATION = 24 * 3600 * 1000L; - private static final String DEFAULT_PARTITION_PATTERN = "yyyy-MM-dd"; - private long partitionDuration; - private String formatString; - private DateTimeFormatter dateTimeFormatter; - - @Override - public void configure(BlobStoreAbstractConfig config) { - super.configure(config); - this.formatString = StringUtils.defaultIfBlank(config.getTimePartitionPattern(), DEFAULT_PARTITION_PATTERN); - this.partitionDuration = parseDurationString(config.getTimePartitionDuration()); - this.dateTimeFormatter = new DateTimeFormatterBuilder() - .appendPattern(formatString) - .toFormatter(); - } - - private long parseDurationString(String timePartitionDuration) { - if (StringUtils.isBlank(timePartitionDuration)) { - return DEFAULT_PARTITION_DURATION; - } - if (Character.isAlphabetic(timePartitionDuration.charAt(timePartitionDuration.length() - 1))) { - String number = timePartitionDuration.substring(0, timePartitionDuration.length() - 1); - switch (timePartitionDuration.charAt(timePartitionDuration.length() - 1)) { - case 'd': - case 'D': - return Long.parseLong(number) * 24L * 3600L * 1000L; - case 'h': - case 'H': - return Long.parseLong(number) * 3600L * 1000L; - case 'm': - return Long.parseLong(number) * 60L * 1000L; - case 's': - return Long.parseLong(number) * 1000L; - default: - throw new RuntimeException("not supported time duration scale " + timePartitionDuration); - } - } else { - try { - return Long.parseLong(timePartitionDuration); - } catch (NumberFormatException ex) { - throw new RuntimeException("not supported time duration format " + timePartitionDuration, ex); - } - } - } - - @Override - public String encodePartition(Record sinkRecord) { - throw new RuntimeException(new IllegalAccessException()); - } +public class TimePartitioner implements Partitioner { + public static final String PARTITIONER_NAME = "time"; @Override - public String encodePartition(Record sinkRecord, long nowInMillis) { - long publishTime = getPublishTime(sinkRecord, nowInMillis); - long parsed = (publishTime / partitionDuration) * partitionDuration; - String timeString = dateTimeFormatter.format(Instant.ofEpochMilli(parsed).atOffset(ZoneOffset.UTC)); - final String result = timeString - + PATH_SEPARATOR - + getMessageOffset(sinkRecord); - return result; - } - - private long getPublishTime(Record sinkRecord, Long defaultTime) { - final Supplier defaultTimeSupplier = () -> { - LOGGER.warn("record not exist Message {}", sinkRecord.getRecordSequence().get()); - return defaultTime; - }; - return sinkRecord.getMessage() - .map(Message::getPublishTime) - .orElseGet(defaultTimeSupplier); + public Map>> partition(List> records) { + return Collections.singletonMap(Long.toString(System.currentTimeMillis()), records); } } diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java new file mode 100644 index 00000000..2afa36db --- /dev/null +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.jcloud.partitioner; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.functions.api.Record; + +public class TopicPartitioner implements Partitioner { + public static final String PARTITIONER_NAME = "topic"; + @Override + public Map>> partition(List> records) { + return records.stream() + .collect(Collectors.groupingBy(record -> record.getTopicName() + .orElseThrow(() -> new RuntimeException("Topic name is not present in record.")))) + .entrySet().stream() + .collect(Collectors.toMap(entry -> generateFilePath(entry.getKey(), records), Map.Entry::getValue)); + } + + String generateFilePath(String topic, List> records) { + TopicName topicName = TopicName.getPartitionedTopicName(topic); + + return StringUtils.join(Arrays.asList( + topicName.getTenant(), + topicName.getNamespacePortion(), + topicName.getLocalName(), + Long.toString(getMessageOffset(records.get(0))) + ), File.separator); + } + + protected long getMessageOffset(Record record) { + return record.getRecordSequence() + .orElseThrow(() -> new RuntimeException("The record sequence is not present in record.")); + } +} diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/AbstractPartitioner.java similarity index 98% rename from src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java rename to src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/AbstractPartitioner.java index 0b95f309..71e2d3af 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/AbstractPartitioner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.jcloud.partitioner; +package org.apache.pulsar.io.jcloud.partitioner.legacy; import java.util.ArrayList; import java.util.List; diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerType.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/PartitionerType.java similarity index 94% rename from src/main/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerType.java rename to src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/PartitionerType.java index addd548e..a4724d48 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerType.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/PartitionerType.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.jcloud.partitioner; +package org.apache.pulsar.io.jcloud.partitioner.legacy; /** * partitioner types. diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/SimplePartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/SimplePartitioner.java similarity index 95% rename from src/main/java/org/apache/pulsar/io/jcloud/partitioner/SimplePartitioner.java rename to src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/SimplePartitioner.java index 4fd9d61d..fceca019 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/SimplePartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/SimplePartitioner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.jcloud.partitioner; +package org.apache.pulsar.io.jcloud.partitioner.legacy; import org.apache.pulsar.functions.api.Record; diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/TimePartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/TimePartitioner.java new file mode 100644 index 00000000..30f593ed --- /dev/null +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/TimePartitioner.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.jcloud.partitioner.legacy; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * partition by day, hour. + * + * @param + */ +public class TimePartitioner extends AbstractPartitioner { + + private static final Logger LOGGER = LoggerFactory.getLogger(TimePartitioner.class); + + private static final long DEFAULT_PARTITION_DURATION = 24 * 3600 * 1000L; + private static final String DEFAULT_PARTITION_PATTERN = "yyyy-MM-dd"; + private long partitionDuration; + private String formatString; + private DateTimeFormatter dateTimeFormatter; + + @Override + public void configure(BlobStoreAbstractConfig config) { + super.configure(config); + this.formatString = StringUtils.defaultIfBlank(config.getTimePartitionPattern(), DEFAULT_PARTITION_PATTERN); + this.partitionDuration = parseDurationString(config.getTimePartitionDuration()); + this.dateTimeFormatter = new DateTimeFormatterBuilder() + .appendPattern(formatString) + .toFormatter(); + } + + private long parseDurationString(String timePartitionDuration) { + if (StringUtils.isBlank(timePartitionDuration)) { + return DEFAULT_PARTITION_DURATION; + } + if (Character.isAlphabetic(timePartitionDuration.charAt(timePartitionDuration.length() - 1))) { + String number = timePartitionDuration.substring(0, timePartitionDuration.length() - 1); + switch (timePartitionDuration.charAt(timePartitionDuration.length() - 1)) { + case 'd': + case 'D': + return Long.parseLong(number) * 24L * 3600L * 1000L; + case 'h': + case 'H': + return Long.parseLong(number) * 3600L * 1000L; + case 'm': + return Long.parseLong(number) * 60L * 1000L; + case 's': + return Long.parseLong(number) * 1000L; + default: + throw new RuntimeException("not supported time duration scale " + timePartitionDuration); + } + } else { + try { + return Long.parseLong(timePartitionDuration); + } catch (NumberFormatException ex) { + throw new RuntimeException("not supported time duration format " + timePartitionDuration, ex); + } + } + } + + @Override + public String encodePartition(Record sinkRecord) { + throw new RuntimeException(new IllegalAccessException()); + } + + @Override + public String encodePartition(Record sinkRecord, long nowInMillis) { + long publishTime = getPublishTime(sinkRecord, nowInMillis); + long parsed = (publishTime / partitionDuration) * partitionDuration; + String timeString = dateTimeFormatter.format(Instant.ofEpochMilli(parsed).atOffset(ZoneOffset.UTC)); + final String result = timeString + + PATH_SEPARATOR + + getMessageOffset(sinkRecord); + return result; + } + + private long getPublishTime(Record sinkRecord, Long defaultTime) { + final Supplier defaultTimeSupplier = () -> { + LOGGER.warn("record not exist Message {}", sinkRecord.getRecordSequence().get()); + return defaultTime; + }; + return sinkRecord.getMessage() + .map(Message::getPublishTime) + .orElseGet(defaultTimeSupplier); + } +} diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/package-info.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/package-info.java new file mode 100644 index 00000000..1b8a8842 --- /dev/null +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.jcloud.partitioner.legacy; diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/package-info.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/package-info.java index c9c9c798..3d0aea09 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/package-info.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/package-info.java @@ -16,4 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.jcloud.partitioner; \ No newline at end of file +package org.apache.pulsar.io.jcloud.partitioner; diff --git a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java index 3f05c460..0083bf9a 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java @@ -51,10 +51,12 @@ import org.apache.pulsar.io.jcloud.format.InitConfiguration; import org.apache.pulsar.io.jcloud.format.JsonFormat; import org.apache.pulsar.io.jcloud.format.ParquetFormat; -import org.apache.pulsar.io.jcloud.partitioner.Partitioner; -import org.apache.pulsar.io.jcloud.partitioner.PartitionerType; -import org.apache.pulsar.io.jcloud.partitioner.SimplePartitioner; -import org.apache.pulsar.io.jcloud.partitioner.TimePartitioner; +import org.apache.pulsar.io.jcloud.partitioner.LegacyPartitioner; +import org.apache.pulsar.io.jcloud.partitioner.TopicPartitioner; +import org.apache.pulsar.io.jcloud.partitioner.legacy.Partitioner; +import org.apache.pulsar.io.jcloud.partitioner.legacy.PartitionerType; +import org.apache.pulsar.io.jcloud.partitioner.legacy.SimplePartitioner; +import org.apache.pulsar.io.jcloud.partitioner.legacy.TimePartitioner; import org.apache.pulsar.io.jcloud.writer.BlobWriter; import org.jclouds.blobstore.ContainerNotFoundException; @@ -67,7 +69,8 @@ public abstract class BlobStoreAbstractSink i private V sinkConfig; - protected Partitioner partitioner; + protected Partitioner legacyPartitioner; + protected org.apache.pulsar.io.jcloud.partitioner.Partitioner partitioner; protected Format format; @@ -105,6 +108,9 @@ public void open(Map config, SinkContext sinkContext) throws Exc formatConfigInitializer.configure(sinkConfig); } partitioner = buildPartitioner(sinkConfig); + if (partitioner instanceof LegacyPartitioner) { + legacyPartitioner = buildLegacyPartitioner(sinkConfig); + } pathPrefix = StringUtils.trimToEmpty(sinkConfig.getPathPrefix()); long batchTimeMs = sinkConfig.getBatchTimeMs(); maxBatchSize = sinkConfig.getBatchSize(); @@ -124,7 +130,7 @@ private void flushIfNeeded(boolean force) { } } - private Partitioner buildPartitioner(V sinkConfig) { + private Partitioner buildLegacyPartitioner(V sinkConfig) { Partitioner partitioner; String partitionerTypeName = sinkConfig.getPartitionerType(); PartitionerType partitionerType = @@ -142,6 +148,18 @@ private Partitioner buildPartitioner(V sinkConfig) { return partitioner; } + private org.apache.pulsar.io.jcloud.partitioner.Partitioner buildPartitioner(V sinkConfig) { + String partitionerTypeName = sinkConfig.getPartitioner(); + switch (partitionerTypeName) { + case TopicPartitioner.PARTITIONER_NAME: + return new TopicPartitioner(); + case org.apache.pulsar.io.jcloud.partitioner.TimePartitioner.PARTITIONER_NAME: + return new org.apache.pulsar.io.jcloud.partitioner.TimePartitioner(); + default: + return new LegacyPartitioner(); + } + } + private Format buildFormat(V sinkConfig) { String formatType = StringUtils.defaultIfBlank(sinkConfig.getFormatType(), "json"); switch (formatType) { @@ -244,7 +262,7 @@ private void unsafeFlush() { // all output blobs of the same batch should have the same partitioning timestamp final long timeStampForPartitioning = System.currentTimeMillis(); final Map>> recordsToInsertByTopic = - recordsToInsert.stream().collect(Collectors.groupingBy(record -> record.getTopicName().get())); + partitioner.partition(recordsToInsert); for (Map.Entry>> entry : recordsToInsertByTopic.entrySet()) { List> singleTopicRecordsToInsert = entry.getValue(); @@ -264,15 +282,27 @@ private void unsafeFlush() { return; } - String filepath = ""; + String filepath; + try { + if(partitioner instanceof LegacyPartitioner) { + filepath = buildPartitionPath(firstRecord, legacyPartitioner, format, timeStampForPartitioning); + } else { + filepath = entry.getKey() + format.getExtension(); + } + } catch (Exception e) { + log.error("Failed to generate file path", e); + bulkHandleFailedRecords(singleTopicRecordsToInsert); + return; + } + try { format.initSchema(schema); final Iterator> iter = singleTopicRecordsToInsert.iterator(); - filepath = buildPartitionPath(firstRecord, partitioner, format, timeStampForPartitioning); + ByteBuffer payload = bindValue(iter, format); int uploadSize = singleTopicRecordsToInsert.size(); long uploadBytes = getBytesSum(singleTopicRecordsToInsert); - log.info("Uploading blob {} from topic {} uploadSize {} out of currentBatchSize {} " + log.info("Uploading blob {} from partition {} uploadSize {} out of currentBatchSize {} " + " uploadBytes {} out of currcurrentBatchBytes {}", filepath, entry.getKey(), uploadSize, currentBatchSize.get(), @@ -288,7 +318,7 @@ private void unsafeFlush() { sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, singleTopicRecordsToInsert.size()); sinkContext.recordMetric(METRICS_LATEST_UPLOAD_ELAPSED_TIME, elapsedMs); } - log.info("Successfully uploaded blob {} from topic {} uploadSize {} uploadBytes {}", + log.info("Successfully uploaded blob {} from partition {} uploadSize {} uploadBytes {}", filepath, entry.getKey(), uploadSize, uploadBytes); } catch (Exception e) { diff --git a/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java b/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java index d858c4bb..0dfc813e 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java @@ -25,7 +25,7 @@ import java.util.Map; import org.apache.pulsar.io.common.IOConfigUtils; import org.apache.pulsar.io.core.SinkContext; -import org.apache.pulsar.io.jcloud.partitioner.PartitionerType; +import org.apache.pulsar.io.jcloud.partitioner.legacy.PartitionerType; import org.apache.pulsar.io.jcloud.sink.CloudStorageSinkConfig; import org.junit.Assert; import org.junit.Test; diff --git a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTest.java b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTest.java index 21719ba9..c5569bfa 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTest.java @@ -29,6 +29,9 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig; +import org.apache.pulsar.io.jcloud.partitioner.legacy.Partitioner; +import org.apache.pulsar.io.jcloud.partitioner.legacy.SimplePartitioner; +import org.apache.pulsar.io.jcloud.partitioner.legacy.TimePartitioner; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -195,4 +198,4 @@ public void testGeneratePartitionedPath() { () -> MessageFormat.format("expected: {0}\nactual: {1}", expected, encodePartition); Assert.assertEquals(supplier.get(), expectedPartitionedPath, partitionedPath); } -} \ No newline at end of file +} diff --git a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTypeTest.java b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTypeTest.java index f8f3ef29..3834c9d6 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTypeTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTypeTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import org.apache.commons.lang3.EnumUtils; +import org.apache.pulsar.io.jcloud.partitioner.legacy.PartitionerType; import org.junit.Test; /** diff --git a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/SliceTopicPartitionPartitionerTest.java b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/SliceTopicPartitionPartitionerTest.java index cd33bf29..686595bc 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/SliceTopicPartitionPartitionerTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/SliceTopicPartitionPartitionerTest.java @@ -29,6 +29,9 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig; +import org.apache.pulsar.io.jcloud.partitioner.legacy.Partitioner; +import org.apache.pulsar.io.jcloud.partitioner.legacy.SimplePartitioner; +import org.apache.pulsar.io.jcloud.partitioner.legacy.TimePartitioner; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -172,4 +175,4 @@ public void testGeneratePartitionedPath() { () -> MessageFormat.format("expected: {0}\nactual: {1}", expected, encodePartition); Assert.assertEquals(supplier.get(), expectedPartitionedPath, partitionedPath); } -} \ No newline at end of file +} diff --git a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java index 59dbbe62..92ee5b2a 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java @@ -45,7 +45,7 @@ import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.jcloud.format.Format; -import org.apache.pulsar.io.jcloud.partitioner.Partitioner; +import org.apache.pulsar.io.jcloud.partitioner.legacy.Partitioner; import org.apache.pulsar.io.jcloud.writer.BlobWriter; import org.junit.After; import org.junit.Before; From d798bd4c4c4dc70b6def5a74b1176f26b43ca7c6 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 20 Dec 2023 09:03:38 +0800 Subject: [PATCH 2/9] Upload missing file --- .../partitioner/legacy/Partitioner.java | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/Partitioner.java diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/Partitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/Partitioner.java new file mode 100644 index 00000000..be2f0247 --- /dev/null +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/Partitioner.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.jcloud.partitioner.legacy; + +import java.io.File; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig; + +/** + * Partition incoming records, and generates directories and file names in which to store the + * incoming records. + * + * @param The type representing the field schemas. + */ +public interface Partitioner { + + String PATH_SEPARATOR = File.separator; + + + void configure(BlobStoreAbstractConfig config); + + /** + * Returns string representing the output path for a sinkRecord to be encoded and stored. + * + * @param sinkRecord The record to be stored by the Sink Connector + * @return The path/filename the SinkRecord will be stored into after it is encoded + */ + String encodePartition(Record sinkRecord); + + /** + * Returns string representing the output path for a sinkRecord to be encoded and stored. + * + * @param sinkRecord The record to be stored by the Sink Connector + * @param nowInMillis The current time in ms. Some Partitioners will use this option, but by + * default it is unused. + * @return The path/filename the SinkRecord will be stored into after it is encoded + */ + default String encodePartition(Record sinkRecord, long nowInMillis) { + return encodePartition(sinkRecord); + } + + /** + * Generate saved path. + * + * @param topic topic name + * @param encodedPartition Path encoded by the implementation class + * @return saved path + */ + String generatePartitionedPath(String topic, String encodedPartition); +} From c5b0b6b145af666ac2eb327711f8a1c8b437cce0 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 20 Dec 2023 10:51:15 +0800 Subject: [PATCH 3/9] Add doc for Partitioner interface --- .../pulsar/io/jcloud/partitioner/Partitioner.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java index e8b121ed..3a47a0a4 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java @@ -23,6 +23,18 @@ import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.functions.api.Record; +/** + * The Partitioner interface offers a mechanism to categorize a list of records into distinct parts. + */ public interface Partitioner { + /** + * The partition method takes a list of records and returns a map. Each key in the map represents a + * unique partition, and the corresponding value is a list of records that belong to that partition. + * + * @param records A list of records to be partitioned. Each record is of the type GenericRecord. + * @return A map where keys represent unique partitions and values are lists of records + * associated with their respective partitions. The unique partition is consistently used as a file path in the cloud + * storage system. + */ Map>> partition(List> records); } From d031ec4c5b358003d74b35d97ecadb22f88e901c Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 4 Jan 2024 17:19:23 +0800 Subject: [PATCH 4/9] Fix check style --- .../org/apache/pulsar/io/jcloud/partitioner/Partitioner.java | 4 ++-- .../apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java | 3 +-- .../java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java | 2 ++ 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java index 3a47a0a4..9700d074 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/Partitioner.java @@ -33,8 +33,8 @@ public interface Partitioner { * * @param records A list of records to be partitioned. Each record is of the type GenericRecord. * @return A map where keys represent unique partitions and values are lists of records - * associated with their respective partitions. The unique partition is consistently used as a file path in the cloud - * storage system. + * associated with their respective partitions. The unique partition is consistently used as a file path in the + * cloud storage system. */ Map>> partition(List> records); } diff --git a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java index 0083bf9a..999c47dd 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java @@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.StringUtils; @@ -284,7 +283,7 @@ private void unsafeFlush() { String filepath; try { - if(partitioner instanceof LegacyPartitioner) { + if (partitioner instanceof LegacyPartitioner) { filepath = buildPartitionPath(firstRecord, legacyPartitioner, format, timeStampForPartitioning); } else { filepath = entry.getKey() + format.getExtension(); diff --git a/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java b/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java index 0dfc813e..0f92b1e3 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java @@ -51,6 +51,7 @@ public void loadBasicConfigTest() throws IOException { config.put("timePartitionPattern", "yyyy-MM-dd"); config.put("timePartitionDuration", "2d"); config.put("batchSize", 10); + config.put("partitioner", "topic"); CloudStorageSinkConfig cloudStorageSinkConfig = CloudStorageSinkConfig.load(config); cloudStorageSinkConfig.validate(); @@ -64,6 +65,7 @@ public void loadBasicConfigTest() throws IOException { Assert.assertEquals(config.get("timePartitionPattern"), cloudStorageSinkConfig.getTimePartitionPattern()); Assert.assertEquals(config.get("timePartitionDuration"), cloudStorageSinkConfig.getTimePartitionDuration()); Assert.assertEquals(config.get("batchSize"), cloudStorageSinkConfig.getBatchSize()); + Assert.assertEquals(config.get("partitioner"), cloudStorageSinkConfig.getPartitioner()); Assert.assertEquals((int) config.get("batchSize"), cloudStorageSinkConfig.getPendingQueueSize()); Assert.assertEquals(10000000L, cloudStorageSinkConfig.getMaxBatchBytes()); } From 3d1faea3612eb428c335491aae513bff286a3877 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 4 Jan 2024 18:00:05 +0800 Subject: [PATCH 5/9] Add tests and improve --- .../jcloud/partitioner/LegacyPartitioner.java | 15 +++++++- .../jcloud/partitioner/TimePartitioner.java | 11 ++++++ .../jcloud/partitioner/TopicPartitioner.java | 3 ++ .../io/jcloud/sink/BlobStoreAbstractSink.java | 2 +- .../CloudStorageGenericRecordSinkTest.java | 36 +++++++++++++++++++ 5 files changed, 65 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java index bac97379..a8358674 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java @@ -24,8 +24,21 @@ import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.functions.api.Record; +/** + * The LegacyPartitioner class, implementing the Partitioner interface, is designed to partition records according to + * their respective topic names. + * It is used to partition records based on their topic name. It will then use the legacy partitioner such as + * SimplePartitioner or TimePartitioner as the underlying partitioner. This is for the backward compatibility usage. + */ public class LegacyPartitioner implements Partitioner { - public static final String PARTITIONER_NAME = "legacy"; + + /** + * This method partitions a list of records into a map where the keys are the topic names and the values are lists + * of records. + * + * @param records A list of records of type GenericRecord that need to be partitioned. + * @return A map where the keys are the topic names and the values are lists of records. + */ @Override public Map>> partition(List> records) { return records.stream().collect(Collectors.groupingBy(record -> record.getTopicName().get())); diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java index f4a62d63..2fab842a 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java @@ -24,8 +24,19 @@ import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.functions.api.Record; +/** + * The TimePartitioner is used to partition records based on the current sink timestamp. + */ public class TimePartitioner implements Partitioner { public static final String PARTITIONER_NAME = "time"; + + /** + * This method partitions a list of records into a map where the key is the current system time in milliseconds + * and the value is the list of records. + * + * @param records A list of records of type GenericRecord that need to be partitioned. + * @return A map where the key is the current system time in milliseconds and the value is the list of records. + */ @Override public Map>> partition(List> records) { return Collections.singletonMap(Long.toString(System.currentTimeMillis()), records); diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java index 2afa36db..3120cd1c 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java @@ -28,6 +28,9 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Record; +/** + * The TopicPartitioner is used to partition records based on the topic name. + */ public class TopicPartitioner implements Partitioner { public static final String PARTITIONER_NAME = "topic"; @Override diff --git a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java index 999c47dd..1fc1c709 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java @@ -286,7 +286,7 @@ private void unsafeFlush() { if (partitioner instanceof LegacyPartitioner) { filepath = buildPartitionPath(firstRecord, legacyPartitioner, format, timeStampForPartitioning); } else { - filepath = entry.getKey() + format.getExtension(); + filepath = pathPrefix + entry.getKey() + format.getExtension(); } } catch (Exception e) { log.error("Failed to generate file path", e); diff --git a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java index 92ee5b2a..55b63c70 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java @@ -20,6 +20,7 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doReturn; @@ -106,6 +107,7 @@ public void setup() throws Exception { when(mockRecord.getValue()).thenReturn(genericRecord); when(mockRecord.getSchema()).thenAnswer((Answer) invocationOnMock -> schema); when(mockRecord.getMessage()).thenReturn(Optional.of(mockMessage)); + when(mockRecord.getRecordSequence()).thenReturn(Optional.of(1L)); } @After @@ -160,6 +162,40 @@ public void repeatedlyFlushOnMaxBatchBytesTest() throws Exception { verifyRecordAck(100); } + private void verifyPartitionerSinkFlush(String prefix) throws Exception { + this.sink.open(this.config, this.mockSinkContext); + + sendMockRecord(5); + await().atMost(Duration.ofSeconds(10)).untilAsserted( + () -> verify(mockBlobWriter, atLeastOnce()).uploadBlob( + argThat((String s) -> s.matches(prefix + "(\\d+)\\.json")), any(ByteBuffer.class)) + ); + } + + @Test + public void testTimePartitioner() throws Exception { + this.config.put("batchTimeMs", 60000); // set high batchTimeMs to prevent scheduled flush + this.config.put("maxBatchBytes", 10000); // set high maxBatchBytes to prevent flush + this.config.put("batchSize", 5); // force flush after 5 messages + this.config.put("pathPrefix", "time/"); + this.config.put("partitioner", "time"); + this.config.put("formatType", "json"); + + verifyPartitionerSinkFlush("time/"); + } + + @Test + public void testTopicPartitioner() throws Exception { + this.config.put("batchTimeMs", 60000); // set high batchTimeMs to prevent scheduled flush + this.config.put("maxBatchBytes", 10000); // set high maxBatchBytes to prevent flush + this.config.put("batchSize", 5); // force flush after 5 messages + this.config.put("pathPrefix", "topic/"); + this.config.put("partitioner", "topic"); + this.config.put("formatType", "json"); + + verifyPartitionerSinkFlush("topic/public/default/test-topic/"); + } + private void verifyRecordAck(int numberOfRecords) throws Exception { this.sink.open(this.config, this.mockSinkContext); sendMockRecord(numberOfRecords); From adf802f0ff04dfd8f8df50e30131c2698fb8af3c Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 4 Jan 2024 18:03:35 +0800 Subject: [PATCH 6/9] Fix NPE --- .../io/jcloud/sink/BlobStoreAbstractSink.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java index 1fc1c709..7c8c9c8c 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java @@ -149,14 +149,15 @@ private Partitioner buildLegacyPartitioner(V sinkConfig) { private org.apache.pulsar.io.jcloud.partitioner.Partitioner buildPartitioner(V sinkConfig) { String partitionerTypeName = sinkConfig.getPartitioner(); - switch (partitionerTypeName) { - case TopicPartitioner.PARTITIONER_NAME: - return new TopicPartitioner(); - case org.apache.pulsar.io.jcloud.partitioner.TimePartitioner.PARTITIONER_NAME: - return new org.apache.pulsar.io.jcloud.partitioner.TimePartitioner(); - default: - return new LegacyPartitioner(); + if (partitionerTypeName == null) { + return new LegacyPartitioner(); } + return switch (partitionerTypeName) { + case TopicPartitioner.PARTITIONER_NAME -> new TopicPartitioner(); + case org.apache.pulsar.io.jcloud.partitioner.TimePartitioner.PARTITIONER_NAME -> + new org.apache.pulsar.io.jcloud.partitioner.TimePartitioner(); + default -> new LegacyPartitioner(); + }; } private Format buildFormat(V sinkConfig) { From 12f724b143ec3867c8fd1ae3341aac7a1c722df8 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 4 Jan 2024 18:04:23 +0800 Subject: [PATCH 7/9] Fix switch --- .../io/jcloud/sink/BlobStoreAbstractSink.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java index 7c8c9c8c..c8326286 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java @@ -152,12 +152,14 @@ private org.apache.pulsar.io.jcloud.partitioner.Partitioner buildPartitioner(V s if (partitionerTypeName == null) { return new LegacyPartitioner(); } - return switch (partitionerTypeName) { - case TopicPartitioner.PARTITIONER_NAME -> new TopicPartitioner(); - case org.apache.pulsar.io.jcloud.partitioner.TimePartitioner.PARTITIONER_NAME -> - new org.apache.pulsar.io.jcloud.partitioner.TimePartitioner(); - default -> new LegacyPartitioner(); - }; + switch (partitionerTypeName) { + case TopicPartitioner.PARTITIONER_NAME: + return new TopicPartitioner(); + case org.apache.pulsar.io.jcloud.partitioner.TimePartitioner.PARTITIONER_NAME: + return new org.apache.pulsar.io.jcloud.partitioner.TimePartitioner(); + default: + return new LegacyPartitioner(); + } } private Format buildFormat(V sinkConfig) { From d591a0374250a178538e4d8623e4b18c87fb002d Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 5 Jan 2024 16:13:26 +0800 Subject: [PATCH 8/9] Use enum type for partitioner --- .../io/jcloud/BlobStoreAbstractConfig.java | 11 ++++---- .../{legacy => }/PartitionerType.java | 10 +++---- .../jcloud/partitioner/TimePartitioner.java | 4 +-- .../jcloud/partitioner/TopicPartitioner.java | 1 - .../legacy/LegacyPartitionerType.java | 27 +++++++++++++++++++ .../io/jcloud/sink/BlobStoreAbstractSink.java | 19 +++++++------ .../pulsar/io/jcloud/ConnectorConfigTest.java | 6 ++--- ...st.java => LegacyPartitionerTypeTest.java} | 9 ++++--- 8 files changed, 55 insertions(+), 32 deletions(-) rename src/main/java/org/apache/pulsar/io/jcloud/partitioner/{legacy => }/PartitionerType.java (88%) create mode 100644 src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/LegacyPartitionerType.java rename src/test/java/org/apache/pulsar/io/jcloud/partitioner/{PartitionerTypeTest.java => LegacyPartitionerTypeTest.java} (73%) diff --git a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java index 7c532d56..5ce8835b 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java @@ -41,7 +41,8 @@ import org.apache.pulsar.io.jcloud.format.Format; import org.apache.pulsar.io.jcloud.format.JsonFormat; import org.apache.pulsar.io.jcloud.format.ParquetFormat; -import org.apache.pulsar.io.jcloud.partitioner.legacy.PartitionerType; +import org.apache.pulsar.io.jcloud.partitioner.PartitionerType; +import org.apache.pulsar.io.jcloud.partitioner.legacy.LegacyPartitionerType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +84,7 @@ public class BlobStoreAbstractConfig implements Serializable { @Deprecated // Use partitioner instead private String partitionerType; - private String partitioner; + private PartitionerType partitioner; private boolean partitionerUseIndexAsOffset; @@ -134,18 +135,18 @@ public void validate() { } if (partitionerType == null - || (EnumUtils.getEnumIgnoreCase(PartitionerType.class, partitionerType) == null + || (EnumUtils.getEnumIgnoreCase(LegacyPartitionerType.class, partitionerType) == null && !partitionerType.equalsIgnoreCase("default"))) { // `default` option is for backward compatibility throw new IllegalArgumentException( "partitionerType property not set properly, available options: " - + Arrays.stream(PartitionerType.values()) + + Arrays.stream(LegacyPartitionerType.values()) .map(Enum::name) .map(String::toLowerCase) .collect(Collectors.joining(",")) ); } - if (PartitionerType.TIME.name().equalsIgnoreCase(partitionerType)) { + if (LegacyPartitionerType.TIME.name().equalsIgnoreCase(partitionerType)) { if (StringUtils.isNoneBlank(timePartitionPattern)) { LOGGER.info("test timePartitionPattern is ok {} {}", timePartitionPattern, diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/PartitionerType.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerType.java similarity index 88% rename from src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/PartitionerType.java rename to src/main/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerType.java index a4724d48..81c9d5f3 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/PartitionerType.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerType.java @@ -16,12 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.jcloud.partitioner.legacy; +package org.apache.pulsar.io.jcloud.partitioner; -/** - * partitioner types. - */ public enum PartitionerType { - PARTITION, - TIME; + LEGACY, + TIME, + TOPIC } diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java index 2fab842a..c254684f 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java @@ -28,10 +28,8 @@ * The TimePartitioner is used to partition records based on the current sink timestamp. */ public class TimePartitioner implements Partitioner { - public static final String PARTITIONER_NAME = "time"; - /** - * This method partitions a list of records into a map where the key is the current system time in milliseconds + * Partitions a list of records into a map, where the key is the current system time in milliseconds * and the value is the list of records. * * @param records A list of records of type GenericRecord that need to be partitioned. diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java index 3120cd1c..e337554e 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TopicPartitioner.java @@ -32,7 +32,6 @@ * The TopicPartitioner is used to partition records based on the topic name. */ public class TopicPartitioner implements Partitioner { - public static final String PARTITIONER_NAME = "topic"; @Override public Map>> partition(List> records) { return records.stream() diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/LegacyPartitionerType.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/LegacyPartitionerType.java new file mode 100644 index 00000000..83960a7e --- /dev/null +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/legacy/LegacyPartitionerType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.jcloud.partitioner.legacy; + +/** + * partitioner types. + */ +public enum LegacyPartitionerType { + PARTITION, + TIME; +} diff --git a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java index 05b61d79..27e2a4a7 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java @@ -51,9 +51,10 @@ import org.apache.pulsar.io.jcloud.format.JsonFormat; import org.apache.pulsar.io.jcloud.format.ParquetFormat; import org.apache.pulsar.io.jcloud.partitioner.LegacyPartitioner; +import org.apache.pulsar.io.jcloud.partitioner.PartitionerType; import org.apache.pulsar.io.jcloud.partitioner.TopicPartitioner; +import org.apache.pulsar.io.jcloud.partitioner.legacy.LegacyPartitionerType; import org.apache.pulsar.io.jcloud.partitioner.legacy.Partitioner; -import org.apache.pulsar.io.jcloud.partitioner.legacy.PartitionerType; import org.apache.pulsar.io.jcloud.partitioner.legacy.SimplePartitioner; import org.apache.pulsar.io.jcloud.partitioner.legacy.TimePartitioner; import org.apache.pulsar.io.jcloud.writer.BlobWriter; @@ -132,8 +133,9 @@ private void flushIfNeeded(boolean force) { private Partitioner buildLegacyPartitioner(V sinkConfig) { Partitioner partitioner; String partitionerTypeName = sinkConfig.getPartitionerType(); - PartitionerType partitionerType = - EnumUtils.getEnumIgnoreCase(PartitionerType.class, partitionerTypeName, PartitionerType.PARTITION); + LegacyPartitionerType partitionerType = + EnumUtils.getEnumIgnoreCase(LegacyPartitionerType.class, partitionerTypeName, + LegacyPartitionerType.PARTITION); switch (partitionerType) { case TIME: partitioner = new TimePartitioner<>(); @@ -148,14 +150,11 @@ private Partitioner buildLegacyPartitioner(V sinkConfig) { } private org.apache.pulsar.io.jcloud.partitioner.Partitioner buildPartitioner(V sinkConfig) { - String partitionerTypeName = sinkConfig.getPartitioner(); - if (partitionerTypeName == null) { - return new LegacyPartitioner(); - } - switch (partitionerTypeName) { - case TopicPartitioner.PARTITIONER_NAME: + PartitionerType partitionerType = sinkConfig.getPartitioner(); + switch (partitionerType) { + case TOPIC: return new TopicPartitioner(); - case org.apache.pulsar.io.jcloud.partitioner.TimePartitioner.PARTITIONER_NAME: + case TIME: return new org.apache.pulsar.io.jcloud.partitioner.TimePartitioner(); default: return new LegacyPartitioner(); diff --git a/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java b/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java index 0f92b1e3..f06dea9d 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java @@ -25,7 +25,7 @@ import java.util.Map; import org.apache.pulsar.io.common.IOConfigUtils; import org.apache.pulsar.io.core.SinkContext; -import org.apache.pulsar.io.jcloud.partitioner.legacy.PartitionerType; +import org.apache.pulsar.io.jcloud.partitioner.legacy.LegacyPartitionerType; import org.apache.pulsar.io.jcloud.sink.CloudStorageSinkConfig; import org.junit.Assert; import org.junit.Test; @@ -65,7 +65,7 @@ public void loadBasicConfigTest() throws IOException { Assert.assertEquals(config.get("timePartitionPattern"), cloudStorageSinkConfig.getTimePartitionPattern()); Assert.assertEquals(config.get("timePartitionDuration"), cloudStorageSinkConfig.getTimePartitionDuration()); Assert.assertEquals(config.get("batchSize"), cloudStorageSinkConfig.getBatchSize()); - Assert.assertEquals(config.get("partitioner"), cloudStorageSinkConfig.getPartitioner()); + Assert.assertEquals(config.get("partitioner"), cloudStorageSinkConfig.getPartitioner().toString()); Assert.assertEquals((int) config.get("batchSize"), cloudStorageSinkConfig.getPendingQueueSize()); Assert.assertEquals(10000000L, cloudStorageSinkConfig.getMaxBatchBytes()); } @@ -291,7 +291,7 @@ public void testEmptyPartitionerType() throws IOException { config.put("partitionerType", "default"); cloudStorageSinkConfig = CloudStorageSinkConfig.load(config); cloudStorageSinkConfig.validate(); - for (PartitionerType value : PartitionerType.values()) { + for (LegacyPartitionerType value : LegacyPartitionerType.values()) { config.put("partitionerType", value); cloudStorageSinkConfig = CloudStorageSinkConfig.load(config); cloudStorageSinkConfig.validate(); diff --git a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTypeTest.java b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitionerTypeTest.java similarity index 73% rename from src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTypeTest.java rename to src/test/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitionerTypeTest.java index 3834c9d6..3651747d 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTypeTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitionerTypeTest.java @@ -20,16 +20,17 @@ import static org.junit.Assert.assertEquals; import org.apache.commons.lang3.EnumUtils; -import org.apache.pulsar.io.jcloud.partitioner.legacy.PartitionerType; +import org.apache.pulsar.io.jcloud.partitioner.legacy.LegacyPartitionerType; import org.junit.Test; /** * partitionerType unit test. */ -public class PartitionerTypeTest { +public class LegacyPartitionerTypeTest { @Test public void testValueOf() { - assertEquals(PartitionerType.PARTITION, EnumUtils.getEnumIgnoreCase(PartitionerType.class, "partition")); - assertEquals(PartitionerType.TIME, EnumUtils.getEnumIgnoreCase(PartitionerType.class, "time")); + assertEquals( + LegacyPartitionerType.PARTITION, EnumUtils.getEnumIgnoreCase(LegacyPartitionerType.class, "partition")); + assertEquals(LegacyPartitionerType.TIME, EnumUtils.getEnumIgnoreCase(LegacyPartitionerType.class, "time")); } } From 2bc8b987494dc41d3919e191a8959b4a6c516eb7 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 5 Jan 2024 17:14:21 +0800 Subject: [PATCH 9/9] Move buildPartitionPath to LegacyPartitioner and fix PartitionerType --- .../io/jcloud/BlobStoreAbstractConfig.java | 2 +- .../jcloud/partitioner/LegacyPartitioner.java | 15 +++++++++++++++ .../jcloud/partitioner/PartitionerType.java | 14 +++++++++++++- .../io/jcloud/sink/BlobStoreAbstractSink.java | 19 ++++--------------- .../pulsar/io/jcloud/ConnectorConfigTest.java | 3 ++- .../CloudStorageGenericRecordSinkTest.java | 4 ---- 6 files changed, 35 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java index 5ce8835b..2fbda4e5 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java @@ -84,7 +84,7 @@ public class BlobStoreAbstractConfig implements Serializable { @Deprecated // Use partitioner instead private String partitionerType; - private PartitionerType partitioner; + private PartitionerType partitioner = PartitionerType.LEGACY; private boolean partitionerUseIndexAsOffset; diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java index a8358674..38dab9b6 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/LegacyPartitioner.java @@ -21,8 +21,10 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.jcloud.format.Format; /** * The LegacyPartitioner class, implementing the Partitioner interface, is designed to partition records according to @@ -30,6 +32,7 @@ * It is used to partition records based on their topic name. It will then use the legacy partitioner such as * SimplePartitioner or TimePartitioner as the underlying partitioner. This is for the backward compatibility usage. */ +@Slf4j public class LegacyPartitioner implements Partitioner { /** @@ -43,4 +46,16 @@ public class LegacyPartitioner implements Partitioner { public Map>> partition(List> records) { return records.stream().collect(Collectors.groupingBy(record -> record.getTopicName().get())); } + + public String buildPartitionPath(Record message, String pathPrefix, + org.apache.pulsar.io.jcloud.partitioner.legacy.Partitioner p, + Format format, + long partitioningTimestamp) { + + String encodePartition = p.encodePartition(message, partitioningTimestamp); + String partitionedPath = p.generatePartitionedPath(message.getTopicName().get(), encodePartition); + String path = pathPrefix + partitionedPath + format.getExtension(); + log.info("generate message[recordSequence={}] savePath: {}", message.getRecordSequence().get(), path); + return path; + } } diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerType.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerType.java index 81c9d5f3..bc757dcd 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerType.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerType.java @@ -18,8 +18,20 @@ */ package org.apache.pulsar.io.jcloud.partitioner; +import com.fasterxml.jackson.annotation.JsonCreator; + public enum PartitionerType { LEGACY, TIME, - TOPIC + TOPIC; + + @JsonCreator + public static PartitionerType forValue(String value) { + for (PartitionerType partitionerType : PartitionerType.values()) { + if (partitionerType.name().equalsIgnoreCase(value)) { + return partitionerType; + } + } + throw new IllegalArgumentException("Invalid partitionerType value: " + value); + } } diff --git a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java index 27e2a4a7..4635aef3 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java @@ -262,8 +262,6 @@ private void unsafeFlush() { log.debug("buffered records {}", recordsToInsert); } - // all output blobs of the same batch should have the same partitioning timestamp - final long timeStampForPartitioning = System.currentTimeMillis(); final Map>> recordsToInsertByTopic = partitioner.partition(recordsToInsert); @@ -288,7 +286,10 @@ private void unsafeFlush() { String filepath; try { if (partitioner instanceof LegacyPartitioner) { - filepath = buildPartitionPath(firstRecord, legacyPartitioner, format, timeStampForPartitioning); + // all output blobs of the same batch should have the same partitioning timestamp + final long timeStampForPartitioning = System.currentTimeMillis(); + filepath = ((LegacyPartitioner) partitioner).buildPartitionPath(firstRecord, pathPrefix, + legacyPartitioner, format, timeStampForPartitioning); } else { filepath = pathPrefix + entry.getKey() + format.getExtension(); } @@ -353,18 +354,6 @@ public ByteBuffer bindValue(Iterator> message, return format.recordWriterBuf(message); } - public String buildPartitionPath(Record message, - Partitioner partitioner, - Format format, - long partitioningTimestamp) { - - String encodePartition = partitioner.encodePartition(message, partitioningTimestamp); - String partitionedPath = partitioner.generatePartitionedPath(message.getTopicName().get(), encodePartition); - String path = pathPrefix + partitionedPath + format.getExtension(); - log.info("generate message[recordSequence={}] savePath: {}", message.getRecordSequence().get(), path); - return path; - } - private long getBytesSum(List> records) { return records.stream() .map(Record::getMessage) diff --git a/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java b/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java index f06dea9d..ac55f914 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java @@ -65,7 +65,8 @@ public void loadBasicConfigTest() throws IOException { Assert.assertEquals(config.get("timePartitionPattern"), cloudStorageSinkConfig.getTimePartitionPattern()); Assert.assertEquals(config.get("timePartitionDuration"), cloudStorageSinkConfig.getTimePartitionDuration()); Assert.assertEquals(config.get("batchSize"), cloudStorageSinkConfig.getBatchSize()); - Assert.assertEquals(config.get("partitioner"), cloudStorageSinkConfig.getPartitioner().toString()); + Assert.assertEquals(config.get("partitioner"), + cloudStorageSinkConfig.getPartitioner().toString().toLowerCase()); Assert.assertEquals((int) config.get("batchSize"), cloudStorageSinkConfig.getPendingQueueSize()); Assert.assertEquals(10000000L, cloudStorageSinkConfig.getMaxBatchBytes()); } diff --git a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java index 263c6380..6bb142c9 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java @@ -46,7 +46,6 @@ import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.jcloud.format.Format; -import org.apache.pulsar.io.jcloud.partitioner.legacy.Partitioner; import org.apache.pulsar.io.jcloud.writer.BlobWriter; import org.junit.After; import org.junit.Assert; @@ -90,9 +89,6 @@ public void setup() throws Exception { this.mockBlobWriter = mock(BlobWriter.class); this.mockRecord = mock(Record.class); - - doReturn("a/test.json").when(sink) - .buildPartitionPath(any(Record.class), any(Partitioner.class), any(Format.class), any(Long.class)); doReturn(mockBlobWriter).when(sink).initBlobWriter(any(CloudStorageSinkConfig.class)); doReturn(ByteBuffer.wrap(new byte[]{0x0})).when(sink).bindValue(any(Iterator.class), any(Format.class));