Skip to content

Commit

Permalink
pause writer stream briefly
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Feb 19, 2025
1 parent d65b3bb commit ac43a24
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions src/stream/src/executor/sync_kv_log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,12 @@ impl<S: LocalStateStore> WriteFuture<S> {

fn paused(
duration: Duration,
future: StreamFuture<BoxedMessageStream>,
stream: BoxedMessageStream,
write_state: LogStoreWriteState<S>,
) -> Self {
Self::Paused {
duration,
future,
future: stream.into_future(),
write_state,
}
}
Expand Down Expand Up @@ -355,7 +355,8 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
initial_write_state = write_state;
continue 'recreate_consume_stream;
} else {
write_future = WriteFuture::receive_from_upstream(
write_future = WriteFuture::paused(
Duration::from_millis(256),
stream,
write_state,
);
Expand All @@ -381,10 +382,21 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
end_seq_id,
);
} else {
write_future = WriteFuture::receive_from_upstream(
stream,
write_state,
);
// If buffer 90% full, pause the stream for a while, let downstream do some processing
// to avoid flushing.
if buffer.buffer.len() >= self.buffer_max_size * 9 / 10
{
write_future = WriteFuture::paused(
Duration::from_millis(256),
stream,
write_state,
);
} else {
write_future = WriteFuture::receive_from_upstream(
stream,
write_state,
);
}
}
}
// FIXME(kwannoel): This should truncate the logstore,
Expand Down

0 comments on commit ac43a24

Please sign in to comment.