Skip to content

Commit

Permalink
Revert "[Proposal-1] Support Partitioner Refactoring (streamnative#845)"
Browse files Browse the repository at this point in the history
This reverts commit a8d9f1d.
  • Loading branch information
shibd committed Oct 17, 2024
1 parent 80a475f commit bff4e52
Show file tree
Hide file tree
Showing 19 changed files with 171 additions and 495 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.pulsar.io.jcloud.format.JsonFormat;
import org.apache.pulsar.io.jcloud.format.ParquetFormat;
import org.apache.pulsar.io.jcloud.partitioner.PartitionerType;
import org.apache.pulsar.io.jcloud.partitioner.legacy.LegacyPartitionerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -82,9 +81,7 @@ public class BlobStoreAbstractConfig implements Serializable {

private String formatType;

@Deprecated // Use partitioner instead
private String partitionerType;
private PartitionerType partitioner = PartitionerType.LEGACY;

private boolean partitionerUseIndexAsOffset;

Expand Down Expand Up @@ -136,18 +133,18 @@ public void validate() {
}

if (partitionerType == null
|| (EnumUtils.getEnumIgnoreCase(LegacyPartitionerType.class, partitionerType) == null
|| (EnumUtils.getEnumIgnoreCase(PartitionerType.class, partitionerType) == null
&& !partitionerType.equalsIgnoreCase("default"))) {
// `default` option is for backward compatibility
throw new IllegalArgumentException(
"partitionerType property not set properly, available options: "
+ Arrays.stream(LegacyPartitionerType.values())
+ Arrays.stream(PartitionerType.values())
.map(Enum::name)
.map(String::toLowerCase)
.collect(Collectors.joining(","))
);
}
if (LegacyPartitionerType.TIME.name().equalsIgnoreCase(partitionerType)) {
if (PartitionerType.TIME.name().equalsIgnoreCase(partitionerType)) {
if (StringUtils.isNoneBlank(timePartitionPattern)) {
LOGGER.info("test timePartitionPattern is ok {} {}",
timePartitionPattern,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jcloud.partitioner.legacy;
package org.apache.pulsar.io.jcloud.partitioner;

import java.util.ArrayList;
import java.util.List;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,49 @@
*/
package org.apache.pulsar.io.jcloud.partitioner;

import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.schema.GenericRecord;
import java.io.File;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig;

/**
* The Partitioner interface offers a mechanism to categorize a list of records into distinct parts.
* Partition incoming records, and generates directories and file names in which to store the
* incoming records.
*
* @param <T> The type representing the field schemas.
*/
public interface Partitioner {
public interface Partitioner<T> {

String PATH_SEPARATOR = File.separator;


void configure(BlobStoreAbstractConfig config);

/**
* Returns string representing the output path for a sinkRecord to be encoded and stored.
*
* @param sinkRecord The record to be stored by the Sink Connector
* @return The path/filename the SinkRecord will be stored into after it is encoded
*/
String encodePartition(Record<T> sinkRecord);

/**
* Returns string representing the output path for a sinkRecord to be encoded and stored.
*
* @param sinkRecord The record to be stored by the Sink Connector
* @param nowInMillis The current time in ms. Some Partitioners will use this option, but by
* default it is unused.
* @return The path/filename the SinkRecord will be stored into after it is encoded
*/
default String encodePartition(Record<T> sinkRecord, long nowInMillis) {
return encodePartition(sinkRecord);
}

/**
* The partition method takes a list of records and returns a map. Each key in the map represents a
* unique partition, and the corresponding value is a list of records that belong to that partition.
* Generate saved path.
*
* @param records A list of records to be partitioned. Each record is of the type GenericRecord.
* @return A map where keys represent unique partitions and values are lists of records
* associated with their respective partitions. The unique partition is consistently used as a file path in the
* cloud storage system.
* @param topic topic name
* @param encodedPartition Path encoded by the implementation class
* @return saved path
*/
Map<String, List<Record<GenericRecord>>> partition(List<Record<GenericRecord>> records);
String generatePartitionedPath(String topic, String encodedPartition);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,10 @@
*/
package org.apache.pulsar.io.jcloud.partitioner;

import com.fasterxml.jackson.annotation.JsonCreator;

/**
* partitioner types.
*/
public enum PartitionerType {
LEGACY,
TIME,
TOPIC;

@JsonCreator
public static PartitionerType forValue(String value) {
for (PartitionerType partitionerType : PartitionerType.values()) {
if (partitionerType.name().equalsIgnoreCase(value)) {
return partitionerType;
}
}
throw new IllegalArgumentException("Invalid partitionerType value: " + value);
}
PARTITION,
TIME;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jcloud.partitioner.legacy;
package org.apache.pulsar.io.jcloud.partitioner;

import org.apache.pulsar.functions.api.Record;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,95 @@
*/
package org.apache.pulsar.io.jcloud.partitioner;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.schema.GenericRecord;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The TimePartitioner is used to partition records based on the current sink timestamp.
* partition by day, hour.
*
* @param <T>
*/
public class TimePartitioner implements Partitioner {
/**
* Partitions a list of records into a map, where the key is the current system time in milliseconds
* and the value is the list of records.
*
* @param records A list of records of type GenericRecord that need to be partitioned.
* @return A map where the key is the current system time in milliseconds and the value is the list of records.
*/
public class TimePartitioner<T> extends AbstractPartitioner<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(TimePartitioner.class);

private static final long DEFAULT_PARTITION_DURATION = 24 * 3600 * 1000L;
private static final String DEFAULT_PARTITION_PATTERN = "yyyy-MM-dd";
private long partitionDuration;
private String formatString;
private DateTimeFormatter dateTimeFormatter;

@Override
public void configure(BlobStoreAbstractConfig config) {
super.configure(config);
this.formatString = StringUtils.defaultIfBlank(config.getTimePartitionPattern(), DEFAULT_PARTITION_PATTERN);
this.partitionDuration = parseDurationString(config.getTimePartitionDuration());
this.dateTimeFormatter = new DateTimeFormatterBuilder()
.appendPattern(formatString)
.toFormatter();
}

private long parseDurationString(String timePartitionDuration) {
if (StringUtils.isBlank(timePartitionDuration)) {
return DEFAULT_PARTITION_DURATION;
}
if (Character.isAlphabetic(timePartitionDuration.charAt(timePartitionDuration.length() - 1))) {
String number = timePartitionDuration.substring(0, timePartitionDuration.length() - 1);
switch (timePartitionDuration.charAt(timePartitionDuration.length() - 1)) {
case 'd':
case 'D':
return Long.parseLong(number) * 24L * 3600L * 1000L;
case 'h':
case 'H':
return Long.parseLong(number) * 3600L * 1000L;
case 'm':
return Long.parseLong(number) * 60L * 1000L;
case 's':
return Long.parseLong(number) * 1000L;
default:
throw new RuntimeException("not supported time duration scale " + timePartitionDuration);
}
} else {
try {
return Long.parseLong(timePartitionDuration);
} catch (NumberFormatException ex) {
throw new RuntimeException("not supported time duration format " + timePartitionDuration, ex);
}
}
}

@Override
public Map<String, List<Record<GenericRecord>>> partition(List<Record<GenericRecord>> records) {
return Collections.singletonMap(Long.toString(System.currentTimeMillis()), records);
public String encodePartition(Record<T> sinkRecord) {
throw new RuntimeException(new IllegalAccessException());
}

@Override
public String encodePartition(Record<T> sinkRecord, long nowInMillis) {
long publishTime = getPublishTime(sinkRecord, nowInMillis);
long parsed = (publishTime / partitionDuration) * partitionDuration;
String timeString = dateTimeFormatter.format(Instant.ofEpochMilli(parsed).atOffset(ZoneOffset.UTC));
final String result = timeString
+ PATH_SEPARATOR
+ getMessageOffset(sinkRecord);
return result;
}

private long getPublishTime(Record<T> sinkRecord, Long defaultTime) {
final Supplier<Long> defaultTimeSupplier = () -> {
LOGGER.warn("record not exist Message {}", sinkRecord.getRecordSequence().get());
return defaultTime;
};
return sinkRecord.getMessage()
.map(Message::getPublishTime)
.orElseGet(defaultTimeSupplier);
}
}

This file was deleted.

Loading

0 comments on commit bff4e52

Please sign in to comment.