Refactor thread mode and batch memory control #1142

Merged
merged 4 commits into from
Jan 6, 2025

Conversation

shibd
Member

@shibd shibd commented Dec 16, 2024

Motivation

It's difficult to control the actual size of each batch with the current batching because using only a blocking queue doesn't allow for backpressure based on byte size.

Modifications

  1. Refactor the thread model: the connector framework thread writes records, and one fixed thread loops to poll messages and flush data.
  2. Refactor BatchContainer: if the pending messages exceed BatchSize or BatchBytesSize, the write thread is blocked.
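
For illustration, the blocking behaviour described in item 2 could look roughly like the sketch below. It is not the code from this PR: the class name `BoundedBatchSketch`, its constructor parameters, and the `drain()` and `size()` methods are hypothetical, and records are modelled as plain `byte[]` payloads. Only the `needFlush()` / `notFull.await()` loop mirrors the snippet discussed in the review.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; not the connector's actual BatchContainer.
class BoundedBatchSketch {
    private final int maxBatchSize;      // assumed to play the role of BatchSize
    private final long maxBatchBytes;    // assumed to play the role of BatchBytesSize
    private final Queue<byte[]> pending = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private long pendingBytes;

    BoundedBatchSketch(int maxBatchSize, long maxBatchBytes) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchBytes = maxBatchBytes;
    }

    // Called by the write thread. Blocks while the batch is waiting to be flushed.
    void add(byte[] payload) throws InterruptedException {
        lock.lock();
        try {
            while (needFlush()) {
                notFull.await();
            }
            pending.add(payload);
            pendingBytes += payload.length;
        } finally {
            lock.unlock();
        }
    }

    // True once either the record count or the byte total reaches its limit.
    boolean needFlush() {
        lock.lock();
        try {
            return pending.size() >= maxBatchSize || pendingBytes >= maxBatchBytes;
        } finally {
            lock.unlock();
        }
    }

    // Called by the flush thread. Takes the whole batch and wakes blocked writers.
    List<byte[]> drain() {
        lock.lock();
        try {
            List<byte[]> batch = new ArrayList<>(pending);
            pending.clear();
            pendingBytes = 0;
            notFull.signalAll();
            return batch;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return pending.size();
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch the container never flushes by itself. A separate thread is expected to poll `needFlush()`, call `drain()`, and write the result to storage, which is what releases the blocked writer.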

Verifying this change

  • A simple check is that CloudStorageSinkBatchBlendTest and CloudStorageSinkBatchPartitionedTest still pass as-is, which indicates there are no breaking changes.
  • Refactored the batch mode unit tests.
  • I will verify this change with an end-to-end test.

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@shibd shibd self-assigned this Dec 16, 2024
@shibd shibd requested a review from a team as a code owner December 16, 2024 15:56
@github-actions github-actions bot added the doc This pr contains a document label Dec 16, 2024
Comment on lines +65 to +68
    // Wait if the batch needs to be flushed
    while (needFlush()) {
        notFull.await();
    }
Member

Can we schedule the flush immediately here, so that we don't need the notFull?

Member Author

This class does not handle flush.

@shibd shibd requested a review from RobertIndie January 2, 2025 03:36
@shibd shibd merged commit 7ddce6a into streamnative:master Jan 6, 2025
1 check passed
shibd added a commit to shibd/pulsar-io-cloud-storage that referenced this pull request Jan 7, 2025
* Refactor thread mode and batch memory control

* Update docs

* update pulsar version

* optimize list

(cherry picked from commit 7ddce6a)