Skip to content

Commit

Permalink
Merge branch 'main' into lr/double-vid
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaszrzasik committed Mar 6, 2025
2 parents bcf6574 + 1d535da commit 7cb26ff
Show file tree
Hide file tree
Showing 21 changed files with 789 additions and 490 deletions.
456 changes: 236 additions & 220 deletions Cargo.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,13 @@ where
// Similarly, we can initialize the payload table with a null payload, which can help us
// distinguish between blocks that haven't been produced yet and blocks we haven't received
// yet when answering queries.
self.upsert("payload", ["height"], ["height"], [(height as i64,)])
.await?;
// We don't overwrite the payload if it already exists.
// During epoch transition in PoS, the same height block is sent multiple times.
// The first block may have the payload, but subsequent blocks might be missing it.
// Overwriting would cause the payload to be lost since the block height is the same
let query = query("INSERT INTO payload (height) VALUES ($1) ON CONFLICT DO NOTHING")
.bind(height as i64);
query.execute(self.as_mut()).await?;

// Finally, we insert the leaf itself, which references the header row we created.
// Serialize the full leaf and QC to JSON for easy storage.
Expand Down
65 changes: 0 additions & 65 deletions hotshot-task-impls/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use either::Either;
use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::OuterConsensus,
Expand Down Expand Up @@ -143,70 +142,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskSt
tracing::debug!("Failed to handle Timeout event; error = {e}");
}
}
HotShotEvent::Qc2Formed(Either::Left(quorum_cert)) => {
let cert_view = quorum_cert.view_number();
if !self.upgrade_lock.epochs_enabled(cert_view).await {
tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
return Ok(());
}
if !self
.consensus
.read()
.await
.is_leaf_extended(quorum_cert.data.leaf_commit)
{
tracing::debug!("We formed QC but not eQC. Do nothing");
return Ok(());
}

let consensus_reader = self.consensus.read().await;
let Some(next_epoch_qc) = consensus_reader.next_epoch_high_qc() else {
tracing::debug!("We formed the current epoch eQC but we don't have the next epoch eQC at all.");
return Ok(());
};
if quorum_cert.view_number() != next_epoch_qc.view_number()
|| quorum_cert.data != *next_epoch_qc.data
{
tracing::debug!("We formed the current epoch eQC but we don't have the corresponding next epoch eQC.");
return Ok(());
}
drop(consensus_reader);

broadcast_event(
Arc::new(HotShotEvent::ExtendedQc2Formed(quorum_cert.clone())),
&sender,
)
.await;
}
HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
let cert_view = next_epoch_qc.view_number();
if !self.upgrade_lock.epochs_enabled(cert_view).await {
tracing::debug!("Next epoch QC2 formed but epochs not enabled. Do nothing");
return Ok(());
}
if !self
.consensus
.read()
.await
.is_leaf_extended(next_epoch_qc.data.leaf_commit)
{
tracing::debug!("We formed next epoch QC but not eQC. Do nothing");
return Ok(());
}

let consensus_reader = self.consensus.read().await;
let high_qc = consensus_reader.high_qc();
if high_qc.view_number() != next_epoch_qc.view_number()
|| high_qc.data != *next_epoch_qc.data
{
tracing::debug!("We formed the current epoch eQC but we don't have the corresponding next epoch eQC.");
return Ok(());
}
let high_qc = high_qc.clone();
drop(consensus_reader);

broadcast_event(Arc::new(HotShotEvent::ExtendedQc2Formed(high_qc)), &sender).await;
}
HotShotEvent::ExtendedQc2Formed(eqc) => {
let cert_view = eqc.view_number();
let cert_block_number = self
Expand Down
56 changes: 53 additions & 3 deletions hotshot-task-impls/src/quorum_proposal/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ use std::{
use crate::{
events::HotShotEvent,
helpers::{broadcast_event, parent_leaf_and_state, wait_for_next_epoch_qc},
quorum_proposal::{UpgradeLock, Versions},
quorum_proposal::{QuorumProposalTaskState, UpgradeLock, Versions},
};
use anyhow::{ensure, Context, Result};
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use committable::Committable;
use committable::{Commitment, Committable};
use hotshot_task::dependency_task::HandleDepOutput;
use hotshot_types::{
consensus::{CommitmentAndMetadata, OuterConsensus},
data::{Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2},
message::Proposal,
simple_certificate::{QuorumCertificate2, UpgradeCertificate},
traits::{
block_contents::BlockHeader, election::Membership, node_implementation::NodeType,
block_contents::BlockHeader,
election::Membership,
node_implementation::{NodeImplementation, NodeType},
signature_key::SignatureKey,
},
utils::{is_last_block_in_epoch, option_epoch_from_block_number},
Expand Down Expand Up @@ -505,3 +507,51 @@ impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<
}
}
}

pub(super) async fn handle_eqc_formed<
TYPES: NodeType,
I: NodeImplementation<TYPES>,
V: Versions,
>(
cert_view: TYPES::View,
leaf_commit: Commitment<Leaf2<TYPES>>,
task_state: &QuorumProposalTaskState<TYPES, I, V>,
event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) {
if !task_state.upgrade_lock.epochs_enabled(cert_view).await {
tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
return;
}
if !task_state
.consensus
.read()
.await
.is_leaf_extended(leaf_commit)
{
tracing::debug!("We formed QC but not eQC. Do nothing");
return;
}

let consensus_reader = task_state.consensus.read().await;
let current_epoch_qc = consensus_reader.high_qc();
let Some(next_epoch_qc) = consensus_reader.next_epoch_high_qc() else {
tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
return;
};
if current_epoch_qc.view_number() != next_epoch_qc.view_number()
|| current_epoch_qc.data != *next_epoch_qc.data
{
tracing::debug!(
"We formed the eQC but the current and next epoch QCs do not correspond to each other."
);
return;
}
let current_epoch_qc_clone = current_epoch_qc.clone();
drop(consensus_reader);

broadcast_event(
Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
event_sender,
)
.await;
}
13 changes: 13 additions & 0 deletions hotshot-task-impls/src/quorum_proposal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use tracing::instrument;

use self::handlers::{ProposalDependency, ProposalDependencyHandle};
use crate::events::HotShotEvent;
use crate::quorum_proposal::handlers::handle_eqc_formed;

mod handlers;

Expand Down Expand Up @@ -437,6 +438,10 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
.await
.wrap()
.context(error!("Failed to update high QC in storage!"))?;

handle_eqc_formed(qc.view_number(), qc.data.leaf_commit, self, &event_sender)
.await;

let view_number = qc.view_number() + 1;
self.create_dependency_task_if_new(
view_number,
Expand Down Expand Up @@ -598,6 +603,14 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
.await
.wrap()
.context(error!("Failed to update next epoch high QC in storage!"))?;

handle_eqc_formed(
next_epoch_qc.view_number(),
next_epoch_qc.data.leaf_commit,
self,
&event_sender,
)
.await;
}
_ => {}
}
Expand Down
96 changes: 48 additions & 48 deletions hotshot-testing/tests/tests_6/test_epochs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,54 +491,54 @@ cross_tests!(
}
);

cross_tests!(
TestName: test_all_restart_epochs,
Impls: [CombinedImpl, PushCdnImpl],
Types: [TestTypes, TestTypesRandomizedLeader, TestTwoStakeTablesTypes],
Versions: [EpochsTestVersions],
Ignore: false,
Metadata: {
let timing_data = TimingData {
next_view_timeout: 2000,
..Default::default()
};
let mut metadata = TestDescription::default().set_num_nodes(20,20);
let mut catchup_nodes = vec![];

for i in 0..20 {
catchup_nodes.push(ChangeNode {
idx: i,
updown: NodeAction::RestartDown(0),
})
}

metadata.timing_data = timing_data;

metadata.spinning_properties = SpinningTaskDescription {
// Restart all the nodes in view 10
node_changes: vec![(10, catchup_nodes)],
};
metadata.view_sync_properties =
hotshot_testing::view_sync_task::ViewSyncTaskDescription::Threshold(0, 20);

metadata.completion_task_description =
CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
TimeBasedCompletionTaskDescription {
duration: Duration::from_secs(60),
},
);
metadata.overall_safety_properties = OverallSafetyPropertiesDescription {
// Make sure we keep committing rounds after the catchup, but not the full 50.
num_successful_views: 22,
expected_view_failures: vec![10],
possible_view_failures: vec![9, 11],
decide_timeout: Duration::from_secs(20),
..Default::default()
};

metadata
},
);
// cross_tests!(
// TestName: test_all_restart_epochs,
// Impls: [CombinedImpl, PushCdnImpl],
// Types: [TestTypes, TestTypesRandomizedLeader, TestTwoStakeTablesTypes],
// Versions: [EpochsTestVersions],
// Ignore: false,
// Metadata: {
// let timing_data = TimingData {
// next_view_timeout: 2000,
// ..Default::default()
// };
// let mut metadata = TestDescription::default().set_num_nodes(20,20);
// let mut catchup_nodes = vec![];
//
// for i in 0..20 {
// catchup_nodes.push(ChangeNode {
// idx: i,
// updown: NodeAction::RestartDown(0),
// })
// }
//
// metadata.timing_data = timing_data;
//
// metadata.spinning_properties = SpinningTaskDescription {
// // Restart all the nodes in view 10
// node_changes: vec![(10, catchup_nodes)],
// };
// metadata.view_sync_properties =
// hotshot_testing::view_sync_task::ViewSyncTaskDescription::Threshold(0, 20);
//
// metadata.completion_task_description =
// CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
// TimeBasedCompletionTaskDescription {
// duration: Duration::from_secs(60),
// },
// );
// metadata.overall_safety_properties = OverallSafetyPropertiesDescription {
// // Make sure we keep committing rounds after the catchup, but not the full 50.
// num_successful_views: 22,
// expected_view_failures: vec![10],
// possible_view_failures: vec![9, 11],
// decide_timeout: Duration::from_secs(20),
// ..Default::default()
// };
//
// metadata
// },
// );

cross_tests!(
TestName: test_all_restart_one_da_with_epochs,
Expand Down
1 change: 0 additions & 1 deletion request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ async-trait = { workspace = true }
bincode = { workspace = true }
blake3 = { workspace = true }
byteorder = { version = "1", default-features = false }
derive_builder = { workspace = true }
derive_more = { workspace = true }
hotshot-types = { workspace = true }
parking_lot = { workspace = true }
Expand Down
Loading

0 comments on commit 7cb26ff

Please sign in to comment.