Refactor thread mode and batch memory control (#1142)
* Refactor thread mode and batch memory control

* Update docs

* Update Pulsar version

* Optimize list
shibd authored Jan 6, 2025
1 parent 1a6a0cd commit 7ddce6a
Showing 14 changed files with 290 additions and 374 deletions.
1 change: 0 additions & 1 deletion docs/aws-s3-sink.md
@@ -146,7 +146,6 @@ Before using the AWS S3 sink connector, you need to configure it. This table out
| `batchTimeMs` | long | False | false | 1000 | The interval for batch submission. |
| `maxBatchBytes` | long | False | false | 10000000 | The maximum number of bytes in a batch. |
| `batchModel` | Enum | False | false | BLEND | Determines how records are batched. Options: `BLEND`, `PARTITIONED`. `BLEND` combines records from all topics into a single batch, optimizing for throughput; `PARTITIONED` batches records separately for each topic, maintaining topic-level separation. Note: when set to `PARTITIONED`, the connector caches up to the number of subscribed topics multiplied by `maxBatchBytes`, so you need to anticipate the connector's memory requirements in advance. |
- | `pendingQueueSize` | int | False | false | 10 | The number of records buffered in the queue. By default, it equals `batchSize`; you can set it manually. |
| `skipFailedMessages` | Boolean | False | false | false | Whether to skip a message that fails to be processed. If set to `true`, the connector skips the failed message by acknowledging it; otherwise, the connector fails the message. |
| `withMetadata` | Boolean | False | false | false | Save message attributes to metadata. |
| `useHumanReadableMessageId` | Boolean | False | false | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like `ledgerId:entryId:partitionIndex:batchIndex`. Otherwise, the messageId is a Hex-encoded string. |
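As a side note on the `skipFailedMessages` row above (the same option appears in the Azure Blob Storage and Google Cloud Storage tables below), the behavior reduces to an ack-or-fail decision per record. A minimal sketch; the `handleFailure` helper is hypothetical, not connector code:

```java
import org.apache.pulsar.functions.api.Record;

// Hypothetical helper mirroring the documented skipFailedMessages semantics.
public class FailureHandling {
    static void handleFailure(Record<?> record, boolean skipFailedMessages) {
        if (skipFailedMessages) {
            record.ack();  // skip: acknowledge the failed message so it is not redelivered
        } else {
            record.fail(); // fail: negatively acknowledge so the message can be redelivered
        }
    }
}
```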
1 change: 0 additions & 1 deletion docs/azure-blob-storage-sink.md
@@ -127,7 +127,6 @@ Before using the Azure Blob Storage sink connector, you need to configure it. Th
| `batchTimeMs` | long | False | false | 1000 | The interval for batch submission. |
| `maxBatchBytes` | long | False | false | 10000000 | The maximum number of bytes in a batch. |
| `batchModel` | Enum | False | false | BLEND | Determines how records are batched. Options: `BLEND`, `PARTITIONED`. `BLEND` combines records from all topics into a single batch, optimizing for throughput; `PARTITIONED` batches records separately for each topic, maintaining topic-level separation. Note: when set to `PARTITIONED`, the connector caches up to the number of subscribed topics multiplied by `maxBatchBytes`, so you need to anticipate the connector's memory requirements in advance. |
- | `pendingQueueSize` | int | False | false | 10 | The number of records buffered in the queue. By default, it equals `batchSize`; you can set it manually. |
| `skipFailedMessages` | Boolean | False | false | false | Whether to skip a message that fails to be processed. If set to `true`, the connector skips the failed message by acknowledging it; otherwise, the connector fails the message. |
| `withMetadata` | Boolean | False | false | false | Save message attributes to metadata. |
| `useHumanReadableMessageId` | Boolean | False | false | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like `ledgerId:entryId:partitionIndex:batchIndex`. Otherwise, the messageId is a Hex-encoded string. |
1 change: 0 additions & 1 deletion docs/google-cloud-storage-sink.md
@@ -132,7 +132,6 @@ Before using the Google Cloud Storage sink connector, you need to configure it.
| `batchTimeMs` | long | False | false | 1000 | The interval for batch submission. |
| `maxBatchBytes` | long | False | false | 10000000 | The maximum number of bytes in a batch. |
| `batchModel` | Enum | False | false | BLEND | Determines how records are batched. Options: `BLEND`, `PARTITIONED`. `BLEND` combines records from all topics into a single batch, optimizing for throughput; `PARTITIONED` batches records separately for each topic, maintaining topic-level separation. Note: when set to `PARTITIONED`, the connector caches up to the number of subscribed topics multiplied by `maxBatchBytes`, so you need to anticipate the connector's memory requirements in advance. |
- | `pendingQueueSize` | int | False | false | 10 | The number of records buffered in the queue. By default, it equals `batchSize`; you can set it manually. |
| `skipFailedMessages` | Boolean | False | false | false | Whether to skip a message that fails to be processed. If set to `true`, the connector skips the failed message by acknowledging it; otherwise, the connector fails the message. |
| `withMetadata` | Boolean | False | false | false | Save message attributes to metadata. |
| `useHumanReadableMessageId` | Boolean | False | false | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like `ledgerId:entryId:partitionIndex:batchIndex`. Otherwise, the messageId is a Hex-encoded string. |
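The `batchModel` note (identical in the three tables above) implies a simple worst-case sizing rule for `PARTITIONED` mode: the number of subscribed topics multiplied by `maxBatchBytes`. A quick sketch of the arithmetic, with a hypothetical topic count:

```java
// Worst-case buffer estimate for batchModel=PARTITIONED: each subscribed
// topic may hold up to maxBatchBytes of pending records before flushing.
public class PartitionedSizing {
    public static void main(String[] args) {
        int subscribedTopics = 20;        // hypothetical example value
        long maxBatchBytes = 10_000_000L; // the connector default
        long worstCaseBytes = subscribedTopics * maxBatchBytes;
        System.out.printf("Up to %,d bytes (~%d MB) may be buffered%n",
                worstCaseBytes, worstCaseBytes / 1_000_000);
    }
}
```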
2 changes: 1 addition & 1 deletion pom.xml
@@ -48,7 +48,7 @@
<jackson.version>2.13.2</jackson.version>
<jackson-databind.version>2.13.4.2</jackson-databind.version>
<lombok.version>1.18.32</lombok.version>
-<pulsar.version>3.2.2.1</pulsar.version>
+<pulsar.version>4.0.0.8</pulsar.version>
<avro.version>1.11.4</avro.version>
<hadoop.version>3.3.6</hadoop.version>
<parquet.version>1.13.1</parquet.version>
src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java
@@ -105,7 +105,10 @@ public class BlobStoreAbstractConfig implements Serializable {
    private int batchSize = 10;
    private long batchTimeMs = 1000;
    private BatchModel batchModel = BatchModel.BLEND;
+    @Deprecated // no longer used
    private int pendingQueueSize = -1;
+    @Deprecated // no longer used
    private String partitioner;

    // #### metadata configuration ####
    private boolean withMetadata;
@@ -176,13 +179,6 @@ public void validate() {
                    + "when formatType is 'json'.");
        }

-        if (pendingQueueSize <= 0) {
-            pendingQueueSize = batchSize;
-        }
-        checkArgument(pendingQueueSize > 0, "pendingQueueSize must be a positive integer.");
-        checkArgument(pendingQueueSize >= batchSize, "pendingQueueSize must be larger than or "
-                + "equal to batchSize");
-
        if (avroCodec != null && (avroCodec.isEmpty() || avroCodec.equals("none"))) {
            avroCodec = null;
        }
91 changes: 49 additions & 42 deletions src/main/java/org/apache/pulsar/io/jcloud/batch/BatchContainer.java
@@ -18,10 +18,13 @@
 */
package org.apache.pulsar.io.jcloud.batch;

-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
@@ -37,21 +40,35 @@ public class BatchContainer {
    private final long maxBatchTimeMs;
    private final AtomicLong currentBatchSize = new AtomicLong(0L);
    private final AtomicLong currentBatchBytes = new AtomicLong(0L);
-    private final ArrayBlockingQueue<Record<GenericRecord>> pendingFlushQueue;
-    private volatile long lastPoolRecordsTime;
+    private volatile long lastPollRecordsTime;
+    private final List<Record<GenericRecord>> pendingFlushList;
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition notFull = lock.newCondition();

-    public BatchContainer(long maxBatchSize, long maxBatchBytes, long maxBatchTimeMs, int maxPendingQueueSize) {
+    public BatchContainer(long maxBatchSize, long maxBatchBytes, long maxBatchTimeMs) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchBytes = maxBatchBytes;
        this.maxBatchTimeMs = maxBatchTimeMs;
-        this.pendingFlushQueue = new ArrayBlockingQueue<>(maxPendingQueueSize);
-        this.lastPoolRecordsTime = System.currentTimeMillis();
+        this.lastPollRecordsTime = System.currentTimeMillis();
+        this.pendingFlushList = new LinkedList<>();
    }

    public void add(Record<GenericRecord> record) throws InterruptedException {
-        pendingFlushQueue.put(record);
-        updateCurrentBatchSize(1);
-        updateCurrentBatchBytes(record.getMessage().get().size());
+        lock.lock();
+        try {
+            // Add the record first; the batch may exceed its limits by at most one record.
+            long recordSize = record.getMessage().get().size();
+            pendingFlushList.add(record);
+            currentBatchSize.incrementAndGet();
+            currentBatchBytes.addAndGet(recordSize);
+
+            // Block the caller while the pending batch still needs to be flushed.
+            while (needFlush()) {
+                notFull.await();
+            }
+        } finally {
+            lock.unlock();
+        }
    }

    public long getCurrentBatchSize() {
@@ -62,42 +79,32 @@ public long getCurrentBatchBytes() {
        return currentBatchBytes.get();
    }

-    public void updateCurrentBatchSize(long delta) {
-        currentBatchSize.addAndGet(delta);
-    }
-
-    public void updateCurrentBatchBytes(long delta) {
-        currentBatchBytes.addAndGet(delta);
-    }
-
-    public boolean isEmpty() {
-        return pendingFlushQueue.isEmpty();
-    }
+    public List<Record<GenericRecord>> pollNeedFlushRecords() {
+        if (currentBatchSize.get() == 0) {
+            return Collections.emptyList();
+        }
+        lock.lock();
+        try {
+            if (!needFlush()) {
+                return Collections.emptyList();
+            }
+            List<Record<GenericRecord>> needFlushRecords = new ArrayList<>(pendingFlushList);
+            // Clear the pending list and reset the batch counters.
+            pendingFlushList.clear();
+            currentBatchSize.set(0);
+            currentBatchBytes.set(0);
+            lastPollRecordsTime = System.currentTimeMillis();
+            return needFlushRecords;
+        } finally {
+            notFull.signalAll();
+            lock.unlock();
+        }
+    }

-    public boolean needFlush() {
+    private boolean needFlush() {
        long currentTime = System.currentTimeMillis();
        return currentBatchSize.get() >= maxBatchSize
                || currentBatchBytes.get() >= maxBatchBytes
-                || (currentTime - lastPoolRecordsTime) >= maxBatchTimeMs;
-    }
-
-    public List<Record<GenericRecord>> pollNeedFlushRecords() {
-        final List<Record<GenericRecord>> needFlushRecords = Lists.newArrayList();
-        long recordsToInsertBytes = 0;
-        while (!pendingFlushQueue.isEmpty() && needFlushRecords.size() < maxBatchSize
-                && recordsToInsertBytes < maxBatchBytes) {
-            Record<GenericRecord> r = pendingFlushQueue.poll();
-            if (r != null) {
-                if (r.getMessage().isPresent()) {
-                    long recordBytes = r.getMessage().get().size();
-                    recordsToInsertBytes += recordBytes;
-                }
-                needFlushRecords.add(r);
-            }
-        }
-        updateCurrentBatchBytes(-1 * recordsToInsertBytes);
-        updateCurrentBatchSize(-1 * needFlushRecords.size());
-        lastPoolRecordsTime = System.currentTimeMillis();
-        return needFlushRecords;
+                || (currentTime - lastPollRecordsTime) >= maxBatchTimeMs;
    }
}
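To see the new control flow in isolation: `add()` now blocks the producer on a `Condition` whenever the container has reached a flush threshold, and `pollNeedFlushRecords()` drains the pending list and signals waiting producers. Below is a minimal, self-contained sketch of that backpressure pattern; the names (`BoundedBatch`, `drain`) are illustrative, not the connector's actual API, and only the size threshold is modeled (the real container also flushes on byte count and elapsed time).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative reduction of the refactored BatchContainer: producers block
// once the batch is full until a flusher drains it and signals notFull.
public class BoundedBatch<T> {
    private final int maxBatchSize;
    private final List<T> pending = new ArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();

    public BoundedBatch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    public void add(T item) throws InterruptedException {
        lock.lock();
        try {
            pending.add(item); // may exceed the limit by one, as in the commit
            while (pending.size() >= maxBatchSize) {
                notFull.await(); // block the producer until a flush drains us
            }
        } finally {
            lock.unlock();
        }
    }

    public List<T> drain() {
        lock.lock();
        try {
            List<T> out = new ArrayList<>(pending);
            pending.clear();
            return out;
        } finally {
            notFull.signalAll(); // wake any producers blocked in add()
            lock.unlock();
        }
    }
}
```

This is also why `pendingQueueSize` could be deprecated: with the intermediate queue gone, memory is bounded by the flush thresholds themselves rather than by a separate queue capacity.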
