Skip to content

Commit

Permalink
chore(starknet_consensus_orchestrator): add function to await the sec…
Browse files Browse the repository at this point in the history
…ond proposal part message (#4043)
  • Loading branch information
matan-starkware authored Feb 11, 2025
1 parent 8fa30e8 commit 957059f
Showing 1 changed file with 55 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use starknet_state_sync_types::communication::SharedStateSyncClient;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use starknet_types_core::felt::Felt;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, error_span, info, instrument, trace, warn, Instrument};
Expand Down Expand Up @@ -766,53 +767,18 @@ async fn validate_proposal(
) {
let mut content = Vec::new();
let deadline = tokio::time::Instant::now() + timeout;
// Validating first proposal part, it should be fin in case this is an empty proposal,
// or block info.
tokio::select! {
_ = cancel_token.cancelled() => {
warn!("Proposal interrupted");
return;
}
_ = tokio::time::sleep_until(deadline) => {
warn!("Validation timed out.");
return;
}
proposal_part = content_receiver.next() => {
match proposal_part {
None => {
warn!("Failed to receive proposal content");
return;
}
Some(ProposalPart::Fin(ProposalFin { proposal_commitment: id })) => {
warn!("Received an empty proposal.");
if fin_sender
.send((EMPTY_BLOCK_COMMITMENT, ProposalFin { proposal_commitment: id }))
.is_err()
{
// Consensus may exit early (e.g. sync).
warn!("Failed to send proposal content ids");
}
return;
}
Some(ProposalPart::BlockInfo(block_info)) => {
if !valid_block_info(block_info_validation.clone(), block_info.clone()).await {
warn!("Invalid BlockInfo.");
return;
}
initiate_validation(
batcher,
block_info,
proposal_id,
timeout,
)
.await;
}
_ => {
panic!("Unexpected proposal part: {proposal_part:?}");
}
}
}
let Some((block_info, fin_sender)) =
await_second_proposal_part(&cancel_token, deadline, &mut content_receiver, fin_sender)
.await
else {
return;
};
if !valid_block_info(block_info_validation.clone(), block_info.clone()).await {
warn!("Invalid BlockInfo.");
return;
}
initiate_validation(batcher, block_info, proposal_id, timeout).await;

// Validating the rest of the proposal parts.
let (built_block, received_fin) = loop {
tokio::select! {
Expand Down Expand Up @@ -880,6 +846,49 @@ async fn valid_block_info(
&& block_info.l1_da_mode == block_info_validation.l1_da_mode
}

// The second proposal part when validating a proposal must be:
// 1. Fin - empty proposal.
// 2. BlockInfo - required to begin executing TX batches.
async fn await_second_proposal_part(
cancel_token: &CancellationToken,
deadline: Instant,
content_receiver: &mut mpsc::Receiver<ProposalPart>,
fin_sender: oneshot::Sender<(ProposalCommitment, ProposalFin)>,
) -> Option<(ConsensusBlockInfo, oneshot::Sender<(ProposalCommitment, ProposalFin)>)> {
tokio::select! {
_ = cancel_token.cancelled() => {
warn!("Proposal interrupted");
None
}
_ = tokio::time::sleep_until(deadline) => {
warn!("Validation timed out.");
None
}
proposal_part = content_receiver.next() => {
match proposal_part {
Some(ProposalPart::BlockInfo(block_info)) => {
Some((block_info, fin_sender))
}
Some(ProposalPart::Fin(ProposalFin { proposal_commitment })) => {
warn!("Received an empty proposal.");
if fin_sender
.send((EMPTY_BLOCK_COMMITMENT, ProposalFin { proposal_commitment }))
.is_err()
{
// Consensus may exit early (e.g. sync).
warn!("Failed to send proposal content ids");
}
None
}
x => {
warn!("Invalid second proposal part: {x:?}");
None
}
}
}
}
}

async fn initiate_validation(
batcher: &dyn BatcherClient,
block_info: ConsensusBlockInfo,
Expand Down

0 comments on commit 957059f

Please sign in to comment.