Skip to content

Commit

Permalink
feat: Support new batch model BLEND/PARTITIONED (#1092)
Browse files Browse the repository at this point in the history
* Revert "Add default value for the partitionType to `partition` (#863)"

This reverts commit ba24f85.

* Revert "[Proposal-1] Support Partitioner Refactoring (#845)"

This reverts commit a8d9f1d.

* feat: Add new batch model

(cherry picked from commit 4b94112)
  • Loading branch information
shibd committed Oct 22, 2024
1 parent e8a4191 commit 424d17a
Show file tree
Hide file tree
Showing 48 changed files with 1,599 additions and 1,449 deletions.
27 changes: 0 additions & 27 deletions docs/README.md.template

This file was deleted.

87 changes: 37 additions & 50 deletions docs/aws-s3-sink.md

Large diffs are not rendered by default.

84 changes: 36 additions & 48 deletions docs/azure-blob-storage-sink.md

Large diffs are not rendered by default.

549 changes: 0 additions & 549 deletions docs/cloud-storage-sink.md

This file was deleted.

Binary file removed docs/cloud-storage-sink.png
Binary file not shown.
79 changes: 34 additions & 45 deletions docs/google-cloud-storage-sink.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@
import lombok.experimental.Accessors;
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.io.jcloud.batch.BatchModel;
import org.apache.pulsar.io.jcloud.format.AvroFormat;
import org.apache.pulsar.io.jcloud.format.BytesFormat;
import org.apache.pulsar.io.jcloud.format.Format;
import org.apache.pulsar.io.jcloud.format.JsonFormat;
import org.apache.pulsar.io.jcloud.format.ParquetFormat;
import org.apache.pulsar.io.jcloud.partitioner.PartitionerType;
import org.apache.pulsar.io.jcloud.partitioner.legacy.LegacyPartitionerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,55 +70,48 @@ public class BlobStoreAbstractConfig implements Serializable {
public static final String PROVIDER_GCS = "google-cloud-storage";
public static final String PROVIDER_AZURE = "azure-blob-storage";

// #### bucket configuration ####
private String provider;

private String bucket;

private String region;

private String endpoint;

private String pathPrefix;

private String formatType;

@Deprecated // Use partitioner instead
private String partitionerType = "partition";
private PartitionerType partitioner = PartitionerType.LEGACY;
// #### common configuration ####
private boolean usePathStyleUrl = true;
private String awsCannedAcl = "";
private boolean skipFailedMessages = false;

// #### partitioner configuration ####
// Options: PARTITION, TIME
private String partitionerType;
private String pathPrefix;
private boolean withTopicPartitionNumber = true;
private boolean partitionerUseIndexAsOffset;

// The AVRO codec.
// Options: none, deflate, bzip2, xz, zstandard, snappy
private String avroCodec = "snappy";

// The Parquet codec.
// Options: none, snappy, gzip, lzo, brotli, lz4, zstd
private String parquetCodec = "gzip";

private String timePartitionPattern;

private String timePartitionDuration;

private boolean sliceTopicPartitionPath;

// #### format configuration ####
private String formatType;
// The AVRO codec: none, deflate, bzip2, xz, zstandard, snappy
private String avroCodec = "snappy";
// The Parquet codec: none, snappy, gzip, lzo, brotli, lz4, zstd
private String parquetCodec = "gzip";
private String bytesFormatTypeSeparator = "0x10";
private boolean jsonAllowNaN = false;

// #### batch configuration ####
private long maxBatchBytes = 10_000_000;
private int batchSize = 10;
private int pendingQueueSize = -1;

private long batchTimeMs = 1000;
private BatchModel batchModel = BatchModel.BLEND;
private int pendingQueueSize = -1;

private boolean usePathStyleUrl = true;
private String awsCannedAcl = "";

// #### metadata configuration ####
private boolean withMetadata;
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;
private boolean withTopicPartitionNumber = true;
private String bytesFormatTypeSeparator = "0x10";
private boolean skipFailedMessages = false;
private boolean jsonAllowNaN = false;

public void validate() {
checkNotNull(provider, "provider not set.");
Expand All @@ -136,18 +129,18 @@ public void validate() {
}

if (partitionerType == null
|| (EnumUtils.getEnumIgnoreCase(LegacyPartitionerType.class, partitionerType) == null
|| (EnumUtils.getEnumIgnoreCase(PartitionerType.class, partitionerType) == null
&& !partitionerType.equalsIgnoreCase("default"))) {
// `default` option is for backward compatibility
throw new IllegalArgumentException(
"partitionerType property not set properly, available options: "
+ Arrays.stream(LegacyPartitionerType.values())
+ Arrays.stream(PartitionerType.values())
.map(Enum::name)
.map(String::toLowerCase)
.collect(Collectors.joining(","))
);
}
if (LegacyPartitionerType.TIME.name().equalsIgnoreCase(partitionerType)) {
if (PartitionerType.TIME.name().equalsIgnoreCase(partitionerType)) {
if (StringUtils.isNoneBlank(timePartitionPattern)) {
LOGGER.info("test timePartitionPattern is ok {} {}",
timePartitionPattern,
Expand All @@ -168,6 +161,7 @@ public void validate() {
checkArgument(StringUtils.endsWith(pathPrefix, "/"),
"pathPrefix must end with '/',the style is 'xx/xxx/'.");
}
pathPrefix = StringUtils.trimToEmpty(pathPrefix);

if ("bytes".equalsIgnoreCase(formatType)) {
checkArgument(StringUtils.isNotEmpty(bytesFormatTypeSeparator),
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/org/apache/pulsar/io/jcloud/batch/BatchContainer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jcloud.batch;

import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;

/**
 * BatchContainer is used to store and manage batches of records.
 * It keeps track of the current batch size and bytes, and checks
 * if the batch needs to be flushed based on the max batch size, bytes, and time.
 *
 * <p>Records wait in a bounded {@link ArrayBlockingQueue}; the record and byte counters are
 * {@link AtomicLong}s that are updated separately from the queue operations, so at any instant
 * they may briefly lag behind the actual queue content.
 */
public class BatchContainer {

    private final long maxBatchSize;
    private final long maxBatchBytes;
    private final long maxBatchTimeMs;
    private final AtomicLong currentBatchSize = new AtomicLong(0L);
    private final AtomicLong currentBatchBytes = new AtomicLong(0L);
    private final ArrayBlockingQueue<Record<GenericRecord>> pendingFlushQueue;
    // Wall-clock time (ms) of construction or of the most recent pollNeedFlushRecords() call.
    private volatile long lastPollRecordsTime;

    /**
     * Creates an empty container.
     *
     * @param maxBatchSize        number of pending records at which a flush is needed
     * @param maxBatchBytes       number of pending message bytes at which a flush is needed
     * @param maxBatchTimeMs      milliseconds since the last poll after which a flush is needed
     * @param maxPendingQueueSize capacity of the pending queue; {@link ArrayBlockingQueue}
     *                            rejects values below 1 with an {@link IllegalArgumentException}
     */
    public BatchContainer(long maxBatchSize, long maxBatchBytes, long maxBatchTimeMs, int maxPendingQueueSize) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchBytes = maxBatchBytes;
        this.maxBatchTimeMs = maxBatchTimeMs;
        this.pendingFlushQueue = new ArrayBlockingQueue<>(maxPendingQueueSize);
        this.lastPollRecordsTime = System.currentTimeMillis();
    }

    /**
     * Enqueues a record, blocking while the pending queue is full, and then updates the counters.
     *
     * <p>A record that carries no message contributes zero bytes. This mirrors
     * {@link #pollNeedFlushRecords()}, which also counts such records as zero bytes, and avoids
     * failing with a {@code NoSuchElementException} after the record has already been queued.
     *
     * @param record the record to enqueue
     * @throws InterruptedException if interrupted while waiting for space in the queue
     */
    public void add(Record<GenericRecord> record) throws InterruptedException {
        pendingFlushQueue.put(record);
        updateCurrentBatchSize(1);
        if (record.getMessage().isPresent()) {
            updateCurrentBatchBytes(record.getMessage().get().size());
        }
    }

    /**
     * @return the value of the pending-record counter
     */
    public long getCurrentBatchSize() {
        return currentBatchSize.get();
    }

    /**
     * @return the value of the pending-bytes counter
     */
    public long getCurrentBatchBytes() {
        return currentBatchBytes.get();
    }

    /**
     * Adds {@code delta} (which may be negative) to the pending-record counter.
     *
     * @param delta the amount to add
     */
    public void updateCurrentBatchSize(long delta) {
        currentBatchSize.addAndGet(delta);
    }

    /**
     * Adds {@code delta} (which may be negative) to the pending-bytes counter.
     *
     * @param delta the amount to add
     */
    public void updateCurrentBatchBytes(long delta) {
        currentBatchBytes.addAndGet(delta);
    }

    /**
     * @return {@code true} if no records are waiting in the pending queue
     */
    public boolean isEmpty() {
        return pendingFlushQueue.isEmpty();
    }

    /**
     * Checks the three flush triggers: record count, byte count and elapsed time.
     *
     * <p>The time trigger does not look at the queue content, so this can return {@code true}
     * while the container is empty; combine with {@link #isEmpty()} if that matters.
     *
     * @return {@code true} if any trigger has been reached
     */
    public boolean needFlush() {
        long currentTime = System.currentTimeMillis();
        return currentBatchSize.get() >= maxBatchSize
                || currentBatchBytes.get() >= maxBatchBytes
                || (currentTime - lastPollRecordsTime) >= maxBatchTimeMs;
    }

    /**
     * Drains records from the pending queue until the queue is empty, {@code maxBatchSize}
     * records have been taken, or the taken records add up to at least {@code maxBatchBytes}.
     * The counters are decreased by what was drained and the flush timer is reset, even when
     * nothing was drained.
     *
     * @return the drained records in queue order; empty if nothing was pending
     */
    public List<Record<GenericRecord>> pollNeedFlushRecords() {
        final List<Record<GenericRecord>> needFlushRecords = Lists.newArrayList();
        long recordsToInsertBytes = 0;
        while (needFlushRecords.size() < maxBatchSize && recordsToInsertBytes < maxBatchBytes) {
            Record<GenericRecord> r = pendingFlushQueue.poll();
            if (r == null) {
                // Queue is drained.
                break;
            }
            if (r.getMessage().isPresent()) {
                recordsToInsertBytes += r.getMessage().get().size();
            }
            needFlushRecords.add(r);
        }
        updateCurrentBatchBytes(-recordsToInsertBytes);
        updateCurrentBatchSize(-needFlushRecords.size());
        lastPollRecordsTime = System.currentTimeMillis();
        return needFlushRecords;
    }
}
Loading

0 comments on commit 424d17a

Please sign in to comment.