Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proposal-1] Support Partitioner Refactoring #845

Merged
merged 10 commits into from
Jan 9, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.io.jcloud.format.JsonFormat;
import org.apache.pulsar.io.jcloud.format.ParquetFormat;
import org.apache.pulsar.io.jcloud.partitioner.PartitionerType;
import org.apache.pulsar.io.jcloud.partitioner.legacy.LegacyPartitionerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -81,7 +82,9 @@ public class BlobStoreAbstractConfig implements Serializable {

private String formatType;

@Deprecated // Use partitioner instead
private String partitionerType;
private PartitionerType partitioner = PartitionerType.LEGACY;

private boolean partitionerUseIndexAsOffset;

Expand Down Expand Up @@ -132,18 +135,18 @@ public void validate() {
}

if (partitionerType == null
|| (EnumUtils.getEnumIgnoreCase(PartitionerType.class, partitionerType) == null
|| (EnumUtils.getEnumIgnoreCase(LegacyPartitionerType.class, partitionerType) == null
&& !partitionerType.equalsIgnoreCase("default"))) {
// `default` option is for backward compatibility
throw new IllegalArgumentException(
"partitionerType property not set properly, available options: "
+ Arrays.stream(PartitionerType.values())
+ Arrays.stream(LegacyPartitionerType.values())
.map(Enum::name)
.map(String::toLowerCase)
.collect(Collectors.joining(","))
);
}
if (PartitionerType.TIME.name().equalsIgnoreCase(partitionerType)) {
if (LegacyPartitionerType.TIME.name().equalsIgnoreCase(partitionerType)) {
if (StringUtils.isNoneBlank(timePartitionPattern)) {
LOGGER.info("test timePartitionPattern is ok {} {}",
timePartitionPattern,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jcloud.partitioner;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.jcloud.format.Format;

/**
 * Backward-compatible {@link Partitioner} that groups records by the topic they were read from.
 *
 * <p>Grouping only looks at the topic name. The storage path of an individual record is produced by
 * {@link #buildPartitionPath}, which delegates to a legacy partitioner (such as SimplePartitioner or
 * TimePartitioner) so that existing deployments keep their path layout.
 */
@Slf4j
public class LegacyPartitioner implements Partitioner {

    /**
     * Groups the given records by topic name.
     *
     * @param records the records to partition
     * @return a map whose keys are topic names and whose values are the records of that topic
     * @throws java.util.NoSuchElementException if a record carries no topic name
     */
    @Override
    public Map<String, List<Record<GenericRecord>>> partition(List<Record<GenericRecord>> records) {
        return records.stream().collect(Collectors.groupingBy(LegacyPartitioner::requireTopicName));
    }

    /**
     * Builds the storage path for a record using a legacy partitioner.
     *
     * <p>The result is {@code pathPrefix}, followed by the path generated by {@code p} from the record's topic
     * name and encoded partition, followed by the extension reported by {@code format}.
     *
     * @param message               the record the path is built for
     * @param pathPrefix            text prepended verbatim to the generated path
     * @param p                     the legacy partitioner that encodes the partition and generates the path
     * @param format                the output format; only its extension is used
     * @param partitioningTimestamp timestamp handed to {@code p.encodePartition}
     * @return the full path for the record
     * @throws java.util.NoSuchElementException if the record carries no topic name
     */
    public String buildPartitionPath(Record<GenericRecord> message, String pathPrefix,
                                     org.apache.pulsar.io.jcloud.partitioner.legacy.Partitioner<GenericRecord> p,
                                     Format<?> format,
                                     long partitioningTimestamp) {

        String encodePartition = p.encodePartition(message, partitioningTimestamp);
        String partitionedPath = p.generatePartitionedPath(requireTopicName(message), encodePartition);
        String path = pathPrefix + partitionedPath + format.getExtension();
        // The sequence is only logged here, so a record without one must not abort path generation.
        log.info("generate message[recordSequence={}] savePath: {}",
                message.getRecordSequence().orElse(null), path);
        return path;
    }

    /** Returns the record's topic name, failing with a descriptive message when it is absent. */
    private static String requireTopicName(Record<GenericRecord> record) {
        return record.getTopicName().orElseThrow(
                () -> new java.util.NoSuchElementException("Topic name is not present in record."));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,23 @@
*/
package org.apache.pulsar.io.jcloud.partitioner;

import java.io.File;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig;

/**
* Partition incoming records, and generates directories and file names in which to store the
* incoming records.
*
* @param <T> The type representing the field schemas.
* The Partitioner interface offers a mechanism to categorize a list of records into distinct parts.
*/
public interface Partitioner<T> {

String PATH_SEPARATOR = File.separator;


void configure(BlobStoreAbstractConfig config);

/**
* Returns string representing the output path for a sinkRecord to be encoded and stored.
*
* @param sinkRecord The record to be stored by the Sink Connector
* @return The path/filename the SinkRecord will be stored into after it is encoded
*/
String encodePartition(Record<T> sinkRecord);

/**
* Returns string representing the output path for a sinkRecord to be encoded and stored.
*
* @param sinkRecord The record to be stored by the Sink Connector
* @param nowInMillis The current time in ms. Some Partitioners will use this option, but by
* default it is unused.
* @return The path/filename the SinkRecord will be stored into after it is encoded
*/
default String encodePartition(Record<T> sinkRecord, long nowInMillis) {
return encodePartition(sinkRecord);
}

public interface Partitioner {
/**
* Generate saved path.
* The partition method takes a list of records and returns a map. Each key in the map represents a
* unique partition, and the corresponding value is a list of records that belong to that partition.
*
* @param topic topic name
* @param encodedPartition Path encoded by the implementation class
* @return saved path
* @param records A list of records to be partitioned. Each record is of the type GenericRecord.
* @return A map where keys represent unique partitions and values are lists of records
* associated with their respective partitions. The unique partition is consistently used as a file path in the
* cloud storage system.
*/
String generatePartitionedPath(String topic, String encodedPartition);
Map<String, List<Record<GenericRecord>>> partition(List<Record<GenericRecord>> records);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,20 @@
*/
package org.apache.pulsar.io.jcloud.partitioner;

/**
* partitioner types.
*/
import com.fasterxml.jackson.annotation.JsonCreator;

public enum PartitionerType {
PARTITION,
TIME;
LEGACY,
TIME,
TOPIC;

/**
 * Resolves a partitioner type from its textual name, ignoring case.
 *
 * @param value the name to look up; {@code null} matches nothing
 * @return the constant whose name equals {@code value} ignoring case
 * @throws IllegalArgumentException if no constant matches
 */
@JsonCreator
public static PartitionerType forValue(String value) {
    return java.util.Arrays.stream(values())
            .filter(candidate -> candidate.name().equalsIgnoreCase(value))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("Invalid partitionerType value: " + value));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,95 +18,25 @@
*/
package org.apache.pulsar.io.jcloud.partitioner;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Message;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* partition by day, hour.
*
* @param <T>
* The TimePartitioner is used to partition records based on the current sink timestamp.
*/
public class TimePartitioner<T> extends AbstractPartitioner<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(TimePartitioner.class);

private static final long DEFAULT_PARTITION_DURATION = 24 * 3600 * 1000L;
private static final String DEFAULT_PARTITION_PATTERN = "yyyy-MM-dd";
private long partitionDuration;
private String formatString;
private DateTimeFormatter dateTimeFormatter;

@Override
public void configure(BlobStoreAbstractConfig config) {
super.configure(config);
this.formatString = StringUtils.defaultIfBlank(config.getTimePartitionPattern(), DEFAULT_PARTITION_PATTERN);
this.partitionDuration = parseDurationString(config.getTimePartitionDuration());
this.dateTimeFormatter = new DateTimeFormatterBuilder()
.appendPattern(formatString)
.toFormatter();
}

private long parseDurationString(String timePartitionDuration) {
if (StringUtils.isBlank(timePartitionDuration)) {
return DEFAULT_PARTITION_DURATION;
}
if (Character.isAlphabetic(timePartitionDuration.charAt(timePartitionDuration.length() - 1))) {
String number = timePartitionDuration.substring(0, timePartitionDuration.length() - 1);
switch (timePartitionDuration.charAt(timePartitionDuration.length() - 1)) {
case 'd':
case 'D':
return Long.parseLong(number) * 24L * 3600L * 1000L;
case 'h':
case 'H':
return Long.parseLong(number) * 3600L * 1000L;
case 'm':
return Long.parseLong(number) * 60L * 1000L;
case 's':
return Long.parseLong(number) * 1000L;
default:
throw new RuntimeException("not supported time duration scale " + timePartitionDuration);
}
} else {
try {
return Long.parseLong(timePartitionDuration);
} catch (NumberFormatException ex) {
throw new RuntimeException("not supported time duration format " + timePartitionDuration, ex);
}
}
}

public class TimePartitioner implements Partitioner {
/**
* Partitions a list of records into a map, where the key is the current system time in milliseconds
* and the value is the list of records.
*
* @param records A list of records of type GenericRecord that need to be partitioned.
* @return A map where the key is the current system time in milliseconds and the value is the list of records.
*/
@Override
public String encodePartition(Record<T> sinkRecord) {
throw new RuntimeException(new IllegalAccessException());
}

@Override
public String encodePartition(Record<T> sinkRecord, long nowInMillis) {
long publishTime = getPublishTime(sinkRecord, nowInMillis);
long parsed = (publishTime / partitionDuration) * partitionDuration;
String timeString = dateTimeFormatter.format(Instant.ofEpochMilli(parsed).atOffset(ZoneOffset.UTC));
final String result = timeString
+ PATH_SEPARATOR
+ getMessageOffset(sinkRecord);
return result;
}

private long getPublishTime(Record<T> sinkRecord, Long defaultTime) {
final Supplier<Long> defaultTimeSupplier = () -> {
LOGGER.warn("record not exist Message {}", sinkRecord.getRecordSequence().get());
return defaultTime;
};
return sinkRecord.getMessage()
.map(Message::getPublishTime)
.orElseGet(defaultTimeSupplier);
public Map<String, List<Record<GenericRecord>>> partition(List<Record<GenericRecord>> records) {
    // The whole batch becomes a single partition keyed by the wall-clock time (epoch millis) of this call.
    final String partitionKey = String.valueOf(System.currentTimeMillis());
    return Collections.singletonMap(partitionKey, records);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jcloud.partitioner;

import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Record;

/**
 * The TopicPartitioner is used to partition records based on the topic name.
 *
 * <p>Records are grouped by topic, and each group is keyed by a path of the form
 * {@code tenant/namespace/localName/offset}, where the offset is the record sequence of the first record
 * in that group.
 */
public class TopicPartitioner implements Partitioner {

    /**
     * Groups the records by topic name and keys every group by its generated file path.
     *
     * @param records the records to partition
     * @return a map from generated file path to the records of the corresponding topic
     * @throws RuntimeException if a record has no topic name, or the first record of a group has no
     *                          record sequence
     */
    @Override
    public Map<String, List<Record<GenericRecord>>> partition(List<Record<GenericRecord>> records) {
        return records.stream()
                .collect(Collectors.groupingBy(record -> record.getTopicName()
                        .orElseThrow(() -> new RuntimeException("Topic name is not present in record."))))
                .entrySet().stream()
                // Derive the path from the group's own records; using the whole batch would stamp every
                // topic with the offset of the batch's first record, which may belong to another topic.
                // NOTE(review): toMap has no merge function, so two topics resolving to the same path
                // would raise IllegalStateException - confirm whether that can happen for partitions.
                .collect(Collectors.toMap(
                        entry -> generateFilePath(entry.getKey(), entry.getValue()),
                        Map.Entry::getValue));
    }

    /**
     * Builds the file path for a topic from its tenant, namespace, local name and the offset of the first
     * of the given records, joined with {@link File#separator}.
     *
     * @param topic   the topic name; it is resolved through {@code TopicName.getPartitionedTopicName}
     * @param records the records belonging to the topic; must not be empty
     * @return the generated file path
     */
    String generateFilePath(String topic, List<Record<GenericRecord>> records) {
        TopicName topicName = TopicName.getPartitionedTopicName(topic);

        return StringUtils.join(Arrays.asList(
                topicName.getTenant(),
                topicName.getNamespacePortion(),
                topicName.getLocalName(),
                Long.toString(getMessageOffset(records.get(0)))
        ), File.separator);
    }

    /**
     * Returns the record sequence of the given record, which is used as its offset.
     *
     * @param record the record to inspect
     * @return the record sequence
     * @throws RuntimeException if the record has no record sequence
     */
    protected long getMessageOffset(Record<GenericRecord> record) {
        return record.getRecordSequence()
                .orElseThrow(() -> new RuntimeException("The record sequence is not present in record."));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jcloud.partitioner;
package org.apache.pulsar.io.jcloud.partitioner.legacy;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jcloud.partitioner.legacy;

/**
 * Partitioner kinds understood by the legacy partitioners.
 *
 * <p>The declaration order is kept as-is because it determines {@code ordinal()} and {@code values()}.
 */
public enum LegacyPartitionerType {

    /** The {@code partition} option. */
    PARTITION,

    /** The {@code time} option. */
    TIME
}
Loading
Loading