Refactor thread mode and batch memory control #1142

Merged · 4 commits · Jan 6, 2025
1 change: 0 additions & 1 deletion docs/aws-s3-sink.md
@@ -146,7 +146,6 @@ Before using the AWS S3 sink connector, you need to configure it.
| `batchTimeMs` | long | False | false | 1000 | The interval for batch submission. |
| `maxBatchBytes` | long | False | false | 10000000 | The maximum number of bytes in a batch. |
| `batchModel` | Enum | False | false | BLEND | Determines how records are batched. Options: `BLEND`, `PARTITIONED`. `BLEND` combines all topic records into a single batch, optimizing for throughput; `PARTITIONED` batches records separately for each topic, maintaining topic-level separation. Note: when set to `PARTITIONED`, the connector caches data up to the number of subscribed topics multiplied by `maxBatchBytes` (a worst-case sizing sketch follows this table), so you need to anticipate the connector's memory requirements in advance. |
| `pendingQueueSize` | int | False | false | 10 | The number of records buffered in queue. By default, it is equal to `batchSize`. You can set it manually. |
| `skipFailedMessages` | Boolean | False | false | false | Configure whether to skip a message that fails to be processed. If set to `true`, the connector skips the failed message by `ack`ing it. Otherwise, the connector `fail`s the message. |
| `withMetadata` | Boolean | False | false | false | Save message attributes to metadata. |
| `useHumanReadableMessageId` | Boolean | False | false | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like `ledgerId:entryId:partitionIndex:batchIndex`. Otherwise, the messageId is a Hex-encoded string. |
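The `PARTITIONED` note above means the connector's worst-case cache grows with the number of subscribed topics. Below is a minimal sizing sketch, assuming eight subscribed topics and the default `maxBatchBytes`; the topic count and class name are illustrative, not connector configuration.

```java
// Back-of-the-envelope sizing for batchModel = PARTITIONED (illustrative only).
public class PartitionedBatchMemoryEstimate {
    public static void main(String[] args) {
        long maxBatchBytes = 10_000_000L; // connector default from the table above
        int subscribedTopics = 8;         // assumption: e.g. a regex subscription matching 8 topics

        // Worst case, every subscribed topic holds a full batch in memory before it is flushed.
        long worstCaseBytes = subscribedTopics * maxBatchBytes;
        System.out.printf("Worst-case cached data: %d bytes (~%d MB)%n",
                worstCaseBytes, worstCaseBytes / 1_000_000);
    }
}
```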
1 change: 0 additions & 1 deletion docs/azure-blob-storage-sink.md
@@ -127,7 +127,6 @@ Before using the Azure Blob Storage sink connector, you need to configure it.
| `batchTimeMs` | long | False | false | 1000 | The interval for batch submission. |
| `maxBatchBytes` | long | False | false | 10000000 | The maximum number of bytes in a batch. |
| `batchModel` | Enum | False | false | BLEND | Determines how records are batched. Options: `BLEND`, `PARTITIONED`. `BLEND` combines all topic records into a single batch, optimizing for throughput; `PARTITIONED` batches records separately for each topic, maintaining topic-level separation. Note: when set to `PARTITIONED`, the connector caches data up to the number of subscribed topics multiplied by `maxBatchBytes`, so you need to anticipate the connector's memory requirements in advance. |
| `pendingQueueSize` | int | False | false | 10 | The number of records buffered in queue. By default, it is equal to `batchSize`. You can set it manually. |
| `skipFailedMessages` | Boolean | False | false | false | Configure whether to skip a message that fails to be processed. If set to `true`, the connector skips the failed message by `ack`ing it. Otherwise, the connector `fail`s the message. |
| `withMetadata` | Boolean | False | false | false | Save message attributes to metadata. |
| `useHumanReadableMessageId` | Boolean | False | false | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like `ledgerId:entryId:partitionIndex:batchIndex`. Otherwise, the messageId is a Hex-encoded string. |
1 change: 0 additions & 1 deletion docs/google-cloud-storage-sink.md
@@ -132,7 +132,6 @@ Before using the Google Cloud Storage sink connector, you need to configure it.
| `batchTimeMs` | long | False | false | 1000 | The interval for batch submission. |
| `maxBatchBytes` | long | False | false | 10000000 | The maximum number of bytes in a batch. |
| `batchModel` | Enum | False | false | BLEND | Determines how records are batched. Options: `BLEND`, `PARTITIONED`. `BLEND` combines all topic records into a single batch, optimizing for throughput; `PARTITIONED` batches records separately for each topic, maintaining topic-level separation. Note: when set to `PARTITIONED`, the connector caches data up to the number of subscribed topics multiplied by `maxBatchBytes`, so you need to anticipate the connector's memory requirements in advance. |
| `pendingQueueSize` | int | False | false | 10 | The number of records buffered in queue. By default, it is equal to `batchSize`. You can set it manually. |
| `skipFailedMessages` | Boolean | False | false | false | Configure whether to skip a message that fails to be processed. If set to `true`, the connector skips the failed message by `ack`ing it. Otherwise, the connector `fail`s the message. |
| `withMetadata` | Boolean | False | false | false | Save message attributes to metadata. |
| `useHumanReadableMessageId` | Boolean | False | false | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like `ledgerId:entryId:partitionIndex:batchIndex`. Otherwise, the messageId is a Hex-encoded string. |
2 changes: 1 addition & 1 deletion pom.xml
@@ -48,7 +48,7 @@
<jackson.version>2.13.2</jackson.version>
<jackson-databind.version>2.13.4.2</jackson-databind.version>
<lombok.version>1.18.32</lombok.version>
<pulsar.version>3.2.2.1</pulsar.version>
<pulsar.version>4.0.0.8</pulsar.version>
<avro.version>1.11.4</avro.version>
<hadoop.version>3.3.6</hadoop.version>
<parquet.version>1.13.1</parquet.version>
@@ -105,7 +105,10 @@ public class BlobStoreAbstractConfig implements Serializable {
private int batchSize = 10;
private long batchTimeMs = 1000;
private BatchModel batchModel = BatchModel.BLEND;
@Deprecated // never to use
private int pendingQueueSize = -1;
@Deprecated // never to use
private String partitioner;

// #### metadata configuration ####
private boolean withMetadata;
@@ -176,13 +179,6 @@ public void validate() {
+ "when formatType is 'json'.");
}

if (pendingQueueSize <= 0) {
pendingQueueSize = batchSize;
}
checkArgument(pendingQueueSize > 0, "pendingQueueSize must be a positive integer.");
checkArgument(pendingQueueSize >= batchSize, "pendingQueueSize must be larger than or "
+ "equal to batchSize");

if (avroCodec != null && (avroCodec.isEmpty() || avroCodec.equals("none"))) {
avroCodec = null;
}
@@ -18,10 +18,13 @@
*/
package org.apache.pulsar.io.jcloud.batch;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;

@@ -37,21 +40,35 @@ public class BatchContainer {
private final long maxBatchTimeMs;
private final AtomicLong currentBatchSize = new AtomicLong(0L);
private final AtomicLong currentBatchBytes = new AtomicLong(0L);
private final ArrayBlockingQueue<Record<GenericRecord>> pendingFlushQueue;
private volatile long lastPoolRecordsTime;
private volatile long lastPollRecordsTime;
private final List<Record<GenericRecord>> pendingFlushList;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();

public BatchContainer(long maxBatchSize, long maxBatchBytes, long maxBatchTimeMs, int maxPendingQueueSize) {
public BatchContainer(long maxBatchSize, long maxBatchBytes, long maxBatchTimeMs) {
this.maxBatchSize = maxBatchSize;
this.maxBatchBytes = maxBatchBytes;
this.maxBatchTimeMs = maxBatchTimeMs;
this.pendingFlushQueue = new ArrayBlockingQueue<>(maxPendingQueueSize);
this.lastPoolRecordsTime = System.currentTimeMillis();
this.lastPollRecordsTime = System.currentTimeMillis();
this.pendingFlushList = new LinkedList<>();
}

public void add(Record<GenericRecord> record) throws InterruptedException {
pendingFlushQueue.put(record);
updateCurrentBatchSize(1);
updateCurrentBatchBytes(record.getMessage().get().size());
lock.lock();
try {
// Allow exceeding the maximum value once
long recordSize = record.getMessage().get().size();
pendingFlushList.add(record);
currentBatchSize.incrementAndGet();
currentBatchBytes.addAndGet(recordSize);

// Wait if the batch needs to be flushed
while (needFlush()) {
notFull.await();
}
Comment on lines +65 to +68

Member: Can we schedule the flush immediately here so that we don't need the notFull?

Member (Author): This class does not handle the flush.
} finally {
lock.unlock();
}
}

public long getCurrentBatchSize() {
@@ -62,42 +79,32 @@ public long getCurrentBatchBytes() {
return currentBatchBytes.get();
}

public void updateCurrentBatchSize(long delta) {
currentBatchSize.addAndGet(delta);
}

public void updateCurrentBatchBytes(long delta) {
currentBatchBytes.addAndGet(delta);
}

public boolean isEmpty() {
return pendingFlushQueue.isEmpty();
public List<Record<GenericRecord>> pollNeedFlushRecords() {
if (currentBatchSize.get() == 0) {
return Collections.emptyList();
}
lock.lock();
try {
if (!needFlush()) {
return Collections.emptyList();
}
List<Record<GenericRecord>> needFlushRecords = new ArrayList<>(pendingFlushList);
pendingFlushList.clear();
// Clear the pending list
currentBatchSize.set(0);
currentBatchBytes.set(0);
lastPollRecordsTime = System.currentTimeMillis();
return needFlushRecords;
} finally {
notFull.signalAll();
lock.unlock();
}
}

public boolean needFlush() {
private boolean needFlush() {
long currentTime = System.currentTimeMillis();
return currentBatchSize.get() >= maxBatchSize
|| currentBatchBytes.get() >= maxBatchBytes
|| (currentTime - lastPoolRecordsTime) >= maxBatchTimeMs;
}

public List<Record<GenericRecord>> pollNeedFlushRecords() {
final List<Record<GenericRecord>> needFlushRecords = Lists.newArrayList();
long recordsToInsertBytes = 0;
while (!pendingFlushQueue.isEmpty() && needFlushRecords.size() < maxBatchSize
&& recordsToInsertBytes < maxBatchBytes) {
Record<GenericRecord> r = pendingFlushQueue.poll();
if (r != null) {
if (r.getMessage().isPresent()) {
long recordBytes = r.getMessage().get().size();
recordsToInsertBytes += recordBytes;
}
needFlushRecords.add(r);
}
}
updateCurrentBatchBytes(-1 * recordsToInsertBytes);
updateCurrentBatchSize(-1 * needFlushRecords.size());
lastPoolRecordsTime = System.currentTimeMillis();
return needFlushRecords;
|| (currentTime - lastPollRecordsTime) >= maxBatchTimeMs;
}
}
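The review exchange above reflects the new split of responsibilities: `BatchContainer` only buffers records, blocking `add()` once a batch is due, while the flush itself happens elsewhere in the sink, which drains the container via `pollNeedFlushRecords()` and thereby signals `notFull`. The following is a minimal sketch of that interaction, assuming a dedicated flush thread; `uploadToObjectStore`, the polling loop, and the sleep interval are placeholders, not the connector's actual sink code.

```java
import java.util.List;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.jcloud.batch.BatchContainer;

public class FlushLoopSketch {

    // Placeholder for the real sink's upload/flush logic.
    static void uploadToObjectStore(List<Record<GenericRecord>> records) {
        // write records to S3 / Azure Blob / GCS ...
    }

    public static void main(String[] args) {
        // maxBatchSize = 10, maxBatchBytes = 10 MB, maxBatchTimeMs = 1 s (defaults from the docs tables)
        BatchContainer container = new BatchContainer(10, 10_000_000L, 1_000L);

        Thread flusher = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                // Empty unless the size, byte, or time threshold has been reached.
                List<Record<GenericRecord>> batch = container.pollNeedFlushRecords();
                if (!batch.isEmpty()) {
                    uploadToObjectStore(batch);
                    batch.forEach(Record::ack); // ack only after a successful flush
                }
                try {
                    Thread.sleep(100); // simple poll interval; the real sink schedules this differently
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        flusher.start();

        // The sink's write(record) path would call container.add(record); that call now
        // blocks while the batch is full, bounding memory to roughly one batch per container.
    }
}
```

In `PARTITIONED` mode the same bound presumably applies once per subscribed topic, which matches the memory note in the docs tables.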