Skip to content

Commit

Permalink
make configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Feb 20, 2025
1 parent ac43a24 commit 88eba50
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 14 deletions.
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,8 @@ message RowMergeNode {

message SyncLogStoreNode {
catalog.Table log_store_table = 1;
uint32 pause_duration_ms = 2;
uint32 buffer_size = 3;
}

message StreamNode {
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,16 @@ pub struct SessionConfig {
/// Whether the streaming join should be unaligned or not.
#[parameter(default = false)]
streaming_enable_unaligned_join: bool,

/// The timeout for reading from the buffer of the sync log store on barrier.
/// Every epoch we will attempt to read the full buffer of the sync log store.
/// If we hit the timeout, we will stop reading and continue.
#[parameter(default = 256_usize)]
streaming_sync_log_store_pause_duration_ms: usize,

/// The max buffer size for sync logstore, before we start flushing.
#[parameter(default = 2048_usize)]
streaming_sync_log_store_buffer_size: usize,
}

fn check_iceberg_engine_connection(val: &str) -> Result<(), String> {
Expand Down
36 changes: 32 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_sync_log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use pretty_xmlish::XmlNode;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::SyncLogStoreNode;

Expand All @@ -32,6 +32,8 @@ use crate::PlanRef;
pub struct StreamSyncLogStore {
pub base: PlanBase<Stream>,
pub input: PlanRef,
pub buffer_size: usize,
pub pause_duration_ms: usize,
}

impl StreamSyncLogStore {
Expand All @@ -47,13 +49,35 @@ impl StreamSyncLogStore {
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
Self { base, input }
let pause_duration_ms = input
.ctx()
.session_ctx()
.config()
.streaming_sync_log_store_pause_duration_ms();
let buffer_size = input
.ctx()
.session_ctx()
.config()
.streaming_sync_log_store_buffer_size();
Self {
base,
input,
buffer_size,
pause_duration_ms,
}
}
}

impl Distill for StreamSyncLogStore {
fn distill<'a>(&self) -> XmlNode<'a> {
childless_record("StreamSyncLogStore", vec![])
let fields = vec![
("buffer_size", Pretty::display(&self.buffer_size)),
(
"pause_duration_ms",
Pretty::display(&self.pause_duration_ms),
),
];
childless_record("StreamSyncLogStore", fields)
}
}

Expand All @@ -76,7 +100,11 @@ impl StreamNode for StreamSyncLogStore {
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost()
.into();
NodeBody::SyncLogStore(Box::new(SyncLogStoreNode { log_store_table }))
NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
log_store_table,
pause_duration_ms: self.pause_duration_ms as _,
buffer_size: self.buffer_size as _,
}))
}
}

Expand Down
23 changes: 15 additions & 8 deletions src/stream/src/executor/sync_kv_log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,22 @@ pub struct SyncedKvLogStoreExecutor<S: StateStore> {

// Log store state
state_store: S,
buffer_max_size: usize,
buffer_size: usize,

pause_duration_ms: Duration,
}
// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
#[expect(clippy::too_many_arguments)]
pub(crate) fn new(
actor_context: ActorContextRef,
table_id: u32,
metrics: KvLogStoreMetrics,
serde: LogStoreRowSerde,
state_store: S,
buffer_max_size: usize,
buffer_size: usize,
upstream: Executor,
pause_duration_ms: Duration,
) -> Self {
Self {
actor_context,
Expand All @@ -131,7 +135,8 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
serde,
state_store,
upstream,
buffer_max_size,
buffer_size,
pause_duration_ms,
}
}
}
Expand Down Expand Up @@ -301,7 +306,7 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
let mut truncation_offset = None;
let mut buffer = SyncedLogStoreBuffer {
buffer: VecDeque::new(),
max_size: self.buffer_max_size,
max_size: self.buffer_size,
next_chunk_id: 0,
metrics: self.metrics.clone(),
};
Expand Down Expand Up @@ -356,7 +361,7 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
continue 'recreate_consume_stream;
} else {
write_future = WriteFuture::paused(
Duration::from_millis(256),
self.pause_duration_ms,
stream,
write_state,
);
Expand Down Expand Up @@ -384,10 +389,9 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
} else {
// If buffer 90% full, pause the stream for a while, let downstream do some processing
// to avoid flushing.
if buffer.buffer.len() >= self.buffer_max_size * 9 / 10
{
if buffer.buffer.len() >= self.buffer_size * 9 / 10 {
write_future = WriteFuture::paused(
Duration::from_millis(256),
self.pause_duration_ms,
stream,
write_state,
);
Expand Down Expand Up @@ -795,6 +799,7 @@ mod tests {
MemoryStateStore::new(),
10,
source,
Duration::from_millis(256),
)
.boxed();

Expand Down Expand Up @@ -886,6 +891,7 @@ mod tests {
MemoryStateStore::new(),
10,
source,
Duration::from_millis(256),
)
.boxed();

Expand Down Expand Up @@ -975,6 +981,7 @@ mod tests {
MemoryStateStore::new(),
0,
source,
Duration::from_millis(256),
)
.boxed();

Expand Down
7 changes: 5 additions & 2 deletions src/stream/src/from_proto/sync_log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use risingwave_pb::stream_plan::SyncLogStoreNode;
use risingwave_storage::StateStore;
use tokio::time::Duration;

use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde;
use crate::common::log_store_impl::kv_log_store::{KvLogStoreMetrics, KV_LOG_STORE_V2_INFO};
Expand Down Expand Up @@ -56,10 +57,11 @@ impl ExecutorBuilder for SyncLogStoreExecutorBuilder {
params.vnode_bitmap.map(|b| b.into()),
&KV_LOG_STORE_V2_INFO,
);
// FIXME(kwannoel): Make configurable
let buffer_max_size = 1000;
let [upstream] = params.input.try_into().unwrap();

let pause_duration_ms = node.pause_duration_ms as _;
let buffer_max_size = node.buffer_size as usize;

let executor = SyncedKvLogStoreExecutor::new(
actor_context,
table_id,
Expand All @@ -68,6 +70,7 @@ impl ExecutorBuilder for SyncLogStoreExecutorBuilder {
store,
buffer_max_size,
upstream,
Duration::from_millis(pause_duration_ms),
);
Ok((params.info, executor).into())
}
Expand Down

0 comments on commit 88eba50

Please sign in to comment.