Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tabversion committed Feb 19, 2025
1 parent 2d99bbe commit a10ab7b
Showing 1 changed file with 1 addition and 3 deletions.
4 changes: 1 addition & 3 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ impl<S: StateStore> SourceExecutor<S> {
Vec::default()
};
let is_pause_on_startup = first_barrier.is_pause_on_startup();
let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

yield Message::Barrier(first_barrier);

Expand All @@ -492,9 +493,6 @@ impl<S: StateStore> SourceExecutor<S> {
};

core.split_state_store.init_epoch(first_epoch).await?;
// initial_dispatch_num is 0 means the source executor doesn't have downstream jobs
// and is newly created
let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0;
for ele in &mut boot_state {
if let Some(recover_state) = core
.split_state_store
Expand Down

0 comments on commit a10ab7b

Please sign in to comment.