refactor(stream): send barriers in batch when possible #19932

Merged: 15 commits, Feb 20, 2025
11 changes: 11 additions & 0 deletions proto/stream_plan.proto
@@ -175,6 +175,17 @@ message StreamMessage {
}
}

message StreamMessageBatch {
message BarrierBatch {
repeated Barrier barriers = 1;
}
oneof stream_message_batch {
data.StreamChunk stream_chunk = 1;
BarrierBatch barrier_batch = 2;
Watermark watermark = 3;
}
}

// Hash mapping for compute node. Stores mapping from virtual node to actor id.
message ActorMapping {
repeated uint32 original_indices = 1;
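For intuition, here is a minimal sketch of how messages could be regrouped into the new batch shape: runs of consecutive barriers collapse into one BarrierBatch, while chunks and watermarks keep their own variants. The enum and type names below are illustrative placeholders, not the exact types introduced by this PR.

// Illustrative sketch only; `StreamChunk`, `Barrier`, and `Watermark` stand in for the real types.
struct StreamChunk;
struct Barrier;
struct Watermark;

enum DispatcherMessage {
    Chunk(StreamChunk),
    Barrier(Barrier),
    Watermark(Watermark),
}

enum DispatcherMessageBatch {
    Chunk(StreamChunk),
    BarrierBatch(Vec<Barrier>),
    Watermark(Watermark),
}

/// Collapse runs of consecutive barriers into a single `BarrierBatch`;
/// chunks and watermarks pass through unchanged.
fn batch_messages(input: Vec<DispatcherMessage>) -> Vec<DispatcherMessageBatch> {
    let mut out = Vec::new();
    let mut barriers: Vec<Barrier> = Vec::new();
    for msg in input {
        match msg {
            DispatcherMessage::Barrier(b) => barriers.push(b),
            other => {
                if !barriers.is_empty() {
                    out.push(DispatcherMessageBatch::BarrierBatch(std::mem::take(&mut barriers)));
                }
                out.push(match other {
                    DispatcherMessage::Chunk(c) => DispatcherMessageBatch::Chunk(c),
                    DispatcherMessage::Watermark(w) => DispatcherMessageBatch::Watermark(w),
                    DispatcherMessage::Barrier(_) => unreachable!(),
                });
            }
        }
    }
    if !barriers.is_empty() {
        out.push(DispatcherMessageBatch::BarrierBatch(barriers));
    }
    out
}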
2 changes: 1 addition & 1 deletion proto/task_service.proto
@@ -138,7 +138,7 @@
}

message GetStreamResponse {
stream_plan.StreamMessage message = 1;
stream_plan.StreamMessageBatch message = 1;

Check failure on line 141 in proto/task_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "1" with name "message" on message "GetStreamResponse" changed type from "stream_plan.StreamMessage" to "stream_plan.StreamMessageBatch".
// The number of permits acquired for this message, which should be sent back to the upstream with `add_permits`.
// In theory, this can also be derived from the message itself by the receiver. Here we send it explicitly to
// avoid any sense of inconsistency for the derivation logic, so the receiver can simply send it back verbatim.
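As the comment above notes, each response carries the number of permits it consumed, and the receiver returns that number to the upstream verbatim via `add_permits` once the message is handled, which is classic credit-based back-pressure. A minimal sketch of the idea, using a hypothetical `Permits` counter rather than RisingWave's actual permit implementation:

use std::sync::atomic::{AtomicU32, Ordering};

/// Hypothetical permit counter illustrating credit-based back-pressure.
struct Permits {
    available: AtomicU32,
}

impl Permits {
    fn new(initial: u32) -> Self {
        Self { available: AtomicU32::new(initial) }
    }

    /// Sender side: acquire `n` permits before sending a message; fails when the budget is exhausted.
    fn try_acquire(&self, n: u32) -> bool {
        self.available
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| cur.checked_sub(n))
            .is_ok()
    }

    /// Receiver side: echo the `permits` value carried by the message back to the sender.
    fn add_permits(&self, n: u32) {
        self.available.fetch_add(n, Ordering::AcqRel);
    }
}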
8 changes: 8 additions & 0 deletions src/common/src/config.rs
@@ -1164,6 +1164,10 @@ pub struct StreamingDeveloperConfig {
/// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks.
pub switch_jdbc_pg_to_native: bool,

/// The maximum number of consecutive barriers allowed in a message when sent between actors.
#[serde(default = "default::developer::stream_max_barrier_batch_size")]
pub max_barrier_batch_size: u32,

/// Configure the system-wide cache row cardinality of hash join.
/// For example, if this is set to 1000, it means we can have at most 1000 rows in cache.
#[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")]
@@ -2031,6 +2035,10 @@ pub mod default {
32768
}

pub fn stream_max_barrier_batch_size() -> u32 {
1024
}

pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
64 << 20 // 64MB
}
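The new `stream_max_barrier_batch_size` knob (default 1024) caps how many consecutive barriers may be folded into one message. A sketch of how such a cap could be applied when draining a pending queue; the `drain_barrier_batch` helper and the `Barrier` placeholder are assumptions for illustration, not the PR's actual code:

use std::collections::VecDeque;

/// Placeholder for the real barrier type.
struct Barrier;

/// Drain at most `max_barrier_batch_size` barriers from the pending queue into one batch.
fn drain_barrier_batch(pending: &mut VecDeque<Barrier>, max_barrier_batch_size: u32) -> Vec<Barrier> {
    let n = pending.len().min(max_barrier_batch_size as usize);
    pending.drain(..n).collect()
}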
4 changes: 2 additions & 2 deletions src/compute/src/rpc/service/exchange_service.rs
@@ -25,7 +25,7 @@ use risingwave_pb::task_service::{
permits, GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits,
};
use risingwave_stream::executor::exchange::permit::{MessageWithPermits, Receiver};
use risingwave_stream::executor::DispatcherMessage;
use risingwave_stream::executor::DispatcherMessageBatch;
use risingwave_stream::task::LocalStreamManager;
use thiserror_ext::AsReport;
use tokio_stream::wrappers::ReceiverStream;
@@ -178,7 +178,7 @@ impl ExchangeServiceImpl {
message: Some(proto),
permits: Some(PbPermits { value: permits }),
};
let bytes = DispatcherMessage::get_encoded_len(&response);
let bytes = DispatcherMessageBatch::get_encoded_len(&response);

yield response;

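The only functional change in this file is the type on which the encoded length of each response is measured before it is yielded to the gRPC stream, presumably so the exchanged bytes can be accounted for. As a general sketch, a prost message's serialized size can be measured like this (the helper name is illustrative, not RisingWave's actual code):

use prost::Message;

/// Return the serialized size in bytes of any prost-generated message,
/// e.g. a `GetStreamResponse`, before yielding it on the stream.
fn encoded_len_of<M: Message>(msg: &M) -> usize {
    msg.encoded_len()
}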
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -160,6 +160,7 @@ stream_exchange_connection_pool_size = 1
stream_enable_auto_schema_change = true
stream_enable_shared_source = true
stream_switch_jdbc_pg_to_native = false
stream_max_barrier_batch_size = 1024
stream_hash_join_entry_state_max_rows = 30000

[storage]