Skip to content

Commit

Permalink
store epochs and drbs
Browse files Browse the repository at this point in the history
  • Loading branch information
pls148 committed Mar 7, 2025
1 parent 0e574eb commit e3c46e3
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 28 deletions.
25 changes: 25 additions & 0 deletions hotshot-example-types/src/storage_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
use anyhow::{bail, Result};
use async_lock::RwLock;
use async_trait::async_trait;
use hotshot_types::drb::DrbResult;
use hotshot_types::{
consensus::CommitmentMap,
data::{
Expand Down Expand Up @@ -56,6 +57,8 @@ pub struct TestStorageState<TYPES: NodeType> {
Option<hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>>,
action: TYPES::View,
epoch: Option<TYPES::Epoch>,
drb_results: BTreeMap<TYPES::Epoch, DrbResult>,
epoch_roots: BTreeMap<TYPES::Epoch, TYPES::BlockHeader>,
}

impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
Expand All @@ -73,6 +76,8 @@ impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
high_qc2: None,
action: TYPES::View::genesis(),
epoch: None,
drb_results: BTreeMap::new(),
epoch_roots: BTreeMap::new(),
}
}
}
Expand Down Expand Up @@ -373,4 +378,24 @@ impl<TYPES: NodeType> Storage<TYPES> for TestStorage<TYPES> {

Ok(())
}

async fn add_drb_result(&self, epoch: TYPES::Epoch, drb_result: DrbResult) -> Result<()> {
let mut inner = self.inner.write().await;

inner.drb_results.insert(epoch, drb_result);

Ok(())
}

async fn add_epoch_root(
&self,
epoch: TYPES::Epoch,
block_header: TYPES::BlockHeader,
) -> Result<()> {
let mut inner = self.inner.write().await;

inner.epoch_roots.insert(epoch, block_header);

Ok(())
}
}
39 changes: 34 additions & 5 deletions hotshot-task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use async_lock::RwLock;
use committable::{Commitment, Committable};
use either::Either;
use hotshot_task::dependency::{Dependency, EventDependency};
use hotshot_types::traits::storage::Storage;
use hotshot_types::{
consensus::OuterConsensus,
data::{Leaf2, QuorumProposalWrapper, ViewChangeEvidence2},
Expand Down Expand Up @@ -180,10 +181,11 @@ pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
}

/// Handles calling add_epoch_root and sync_l1 on Membership if necessary.
async fn decide_epoch_root<TYPES: NodeType>(
async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
decided_leaf: &Leaf2<TYPES>,
epoch_height: u64,
membership: &Arc<RwLock<TYPES::Membership>>,
storage: &Arc<RwLock<I::Storage>>,
) {
let decided_block_number = decided_leaf.block_header().block_number();

Expand All @@ -192,6 +194,19 @@ async fn decide_epoch_root<TYPES: NodeType>(
let next_epoch_number =
TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);

if let Err(e) = storage
.write()
.await
.add_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
.await
{
tracing::error!(
"Failed to store epoch root for epoch {:?}: {}",
next_epoch_number,
e
);
}

let write_callback = {
tracing::debug!("Calling add_epoch_root for epoch {:?}", next_epoch_number);
let membership_reader = membership.read().await;
Expand Down Expand Up @@ -251,13 +266,14 @@ impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
pub async fn decide_from_proposal_2<TYPES: NodeType>(
pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
proposal: &QuorumProposalWrapper<TYPES>,
consensus: OuterConsensus<TYPES>,
existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
public_key: &TYPES::SignatureKey,
with_epochs: bool,
membership: &Arc<RwLock<TYPES::Membership>>,
storage: &Arc<RwLock<I::Storage>>,
) -> LeafChainTraversalOutcome<TYPES> {
let mut res = LeafChainTraversalOutcome::default();
let consensus_reader = consensus.read().await;
Expand Down Expand Up @@ -332,7 +348,13 @@ pub async fn decide_from_proposal_2<TYPES: NodeType>(
drop(consensus_reader);

for decided_leaf_info in &res.leaf_views {
decide_epoch_root(&decided_leaf_info.leaf, epoch_height, membership).await;
decide_epoch_root::<TYPES, I>(
&decided_leaf_info.leaf,
epoch_height,
membership,
storage,
)
.await;
}
}

Expand Down Expand Up @@ -370,13 +392,14 @@ pub async fn decide_from_proposal_2<TYPES: NodeType>(
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
pub async fn decide_from_proposal<TYPES: NodeType, V: Versions>(
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
proposal: &QuorumProposalWrapper<TYPES>,
consensus: OuterConsensus<TYPES>,
existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
public_key: &TYPES::SignatureKey,
with_epochs: bool,
membership: &Arc<RwLock<TYPES::Membership>>,
storage: &Arc<RwLock<I::Storage>>,
) -> LeafChainTraversalOutcome<TYPES> {
let consensus_reader = consensus.read().await;
let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
Expand Down Expand Up @@ -488,7 +511,13 @@ pub async fn decide_from_proposal<TYPES: NodeType, V: Versions>(

if with_epochs && res.new_decided_view_number.is_some() {
for decided_leaf_info in &res.leaf_views {
decide_epoch_root(&decided_leaf_info.leaf, epoch_height, membership).await;
decide_epoch_root::<TYPES, I>(
&decided_leaf_info.leaf,
epoch_height,
membership,
storage,
)
.await;
}
}

Expand Down
40 changes: 32 additions & 8 deletions hotshot-task-impls/src/quorum_vote/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,27 @@ use crate::{
quorum_vote::Versions,
};

async fn notify_membership_of_drb_result<TYPES: NodeType>(
membership: &EpochMembership<TYPES>,
async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
epoch_membership: &EpochMembership<TYPES>,
storage: &Arc<RwLock<I::Storage>>,
drb_result: DrbResult,
) {
tracing::debug!("Calling add_drb_result for epoch {:?}", membership.epoch());
membership.add_drb_result(drb_result).await;

if let Err(e) = storage
.write()
.await
.add_drb_result(membership.epoch(), drb_result)
.await
{
tracing::error!(
"Failed to store drb result for epoch {:?}: {}",
membership.epoch(),
e
);
}

membership.add_drb_result(drb_result);
}

/// Store the DRB result from the computation task to the shared `results` table.
Expand Down Expand Up @@ -101,11 +116,12 @@ async fn store_and_get_computed_drb_result<
.insert(epoch_number, result);
drop(consensus_writer);

notify_membership_of_drb_result::<TYPES>(
handle_drb_result::<TYPES, I>(
&task_state
.membership
.membership_for_epoch(Some(epoch_number))
.await?,
&task_state.storage,
result,
)
.await;
Expand Down Expand Up @@ -221,7 +237,12 @@ async fn start_drb_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versio
.drb_seeds_and_results
.results
.insert(*task_epoch, result);
notify_membership_of_drb_result::<TYPES>(&epoch_membership, result).await;
handle_drb_result::<TYPES, I>(
&epoch_membership,
&task_state.storage,
result,
)
.await;
task_state.drb_computation = None;
}
Err(e) => {
Expand Down Expand Up @@ -335,11 +356,12 @@ async fn store_drb_seed_and_result<TYPES: NodeType, I: NodeImplementation<TYPES>
.drb_seeds_and_results
.results
.insert(current_epoch_number + 1, result);
notify_membership_of_drb_result::<TYPES>(
handle_drb_result::<TYPES, I>(
&task_state
.membership
.membership_for_epoch(Some(current_epoch_number + 1))
.await?,
&task_state.storage,
result,
)
.await;
Expand Down Expand Up @@ -379,23 +401,25 @@ pub(crate) async fn handle_quorum_proposal_validated<
included_txns,
decided_upgrade_cert,
} = if version >= V::Epochs::VERSION {
decide_from_proposal_2(
decide_from_proposal_2::<TYPES, I>(
proposal,
OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
&task_state.public_key,
version >= V::Epochs::VERSION,
task_state.membership.membership(),
&task_state.storage,
)
.await
} else {
decide_from_proposal::<TYPES, V>(
decide_from_proposal::<TYPES, I, V>(
proposal,
OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
&task_state.public_key,
version >= V::Epochs::VERSION,
task_state.membership.membership(),
&task_state.storage,
)
.await
};
Expand Down
14 changes: 7 additions & 7 deletions hotshot-testing/tests/tests_6/test_epochs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use hotshot_example_types::{
node_types::{
CombinedImpl, EpochUpgradeTestVersions, EpochsTestVersions, Libp2pImpl, MemoryImpl,
PushCdnImpl, RandomOverlapQuorumFilterConfig, StableQuorumFilterConfig,
TestConsecutiveLeaderTypes, TestTwoStakeTablesTypes, TestTypes,
TestTypesRandomizedCommitteeMembers, TestTypesRandomizedLeader, TestTypesEpochCatchupTypes
TestConsecutiveLeaderTypes, TestTwoStakeTablesTypes, TestTypes, TestTypesEpochCatchupTypes,
TestTypesRandomizedCommitteeMembers, TestTypesRandomizedLeader,
},
testable_delay::{DelayConfig, DelayOptions, DelaySettings, SupportedTraitTypesForAsyncDelay},
};
Expand Down Expand Up @@ -505,23 +505,23 @@ cross_tests!(
// };
// let mut metadata = TestDescription::default().set_num_nodes(20,20);
// let mut catchup_nodes = vec![];
//
//
// for i in 0..20 {
// catchup_nodes.push(ChangeNode {
// idx: i,
// updown: NodeAction::RestartDown(0),
// })
// }
//
//
// metadata.timing_data = timing_data;
//
//
// metadata.spinning_properties = SpinningTaskDescription {
// // Restart all the nodes in view 10
// node_changes: vec![(10, catchup_nodes)],
// };
// metadata.view_sync_properties =
// hotshot_testing::view_sync_task::ViewSyncTaskDescription::Threshold(0, 20);
//
//
// metadata.completion_task_description =
// CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
// TimeBasedCompletionTaskDescription {
Expand All @@ -536,7 +536,7 @@ cross_tests!(
// decide_timeout: Duration::from_secs(20),
// ..Default::default()
// };
//
//
// metadata
// },
// );
Expand Down
9 changes: 9 additions & 0 deletions hotshot-types/src/traits/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
DaProposal, DaProposal2, Leaf, Leaf2, QuorumProposal, QuorumProposal2,
QuorumProposalWrapper, VidDisperseShare,
},
drb::DrbResult,
event::HotShotAction,
message::{convert_proposal, Proposal},
simple_certificate::{
Expand Down Expand Up @@ -158,4 +159,12 @@ pub trait Storage<TYPES: NodeType>: Send + Sync + Clone {
async fn migrate_consensus(&self) -> Result<()> {
Ok(())
}
/// Add a drb result
async fn add_drb_result(&self, epoch: TYPES::Epoch, drb_result: DrbResult) -> Result<()>;
/// Add an epoch block header
async fn add_epoch_root(
&self,
epoch: TYPES::Epoch,
block_header: TYPES::BlockHeader,
) -> Result<()>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE epoch_drb_and_root (
epoch BIGINT PRIMARY KEY,
drb_result BYTEA,
block_header BYTEA,
);
5 changes: 5 additions & 0 deletions sequencer/api/migrations/sqlite/V302__epoch_drb_and_root.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE epoch_drb_and_root (
epoch BIGINT PRIMARY KEY,
drb_result BLOB,
block_header BLOB,
);
Loading

0 comments on commit e3c46e3

Please sign in to comment.