diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 979fda6eb9e85..555547cbe3716 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -472,6 +472,7 @@ impl SourceExecutor { Vec::default() }; let is_pause_on_startup = first_barrier.is_pause_on_startup(); + let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id); yield Message::Barrier(first_barrier); @@ -492,9 +493,6 @@ impl SourceExecutor { }; core.split_state_store.init_epoch(first_epoch).await?; - // initial_dispatch_num is 0 means the source executor doesn't have downstream jobs - // and is newly created - let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0; for ele in &mut boot_state { if let Some(recover_state) = core .split_state_store