Skip to content

Commit 02a3cf0

Browse files
committed
store epochs and drbs
1 parent 0e574eb commit 02a3cf0

File tree

12 files changed

+434
-27
lines changed

12 files changed

+434
-27
lines changed

hotshot-example-types/src/storage_types.rs

+25
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::{
1212
use anyhow::{bail, Result};
1313
use async_lock::RwLock;
1414
use async_trait::async_trait;
15+
use hotshot_types::drb::DrbResult;
1516
use hotshot_types::{
1617
consensus::CommitmentMap,
1718
data::{
@@ -56,6 +57,8 @@ pub struct TestStorageState<TYPES: NodeType> {
5657
Option<hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>>,
5758
action: TYPES::View,
5859
epoch: Option<TYPES::Epoch>,
60+
drb_results: BTreeMap<TYPES::Epoch, DrbResult>,
61+
epoch_roots: BTreeMap<TYPES::Epoch, TYPES::BlockHeader>,
5962
}
6063

6164
impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
@@ -73,6 +76,8 @@ impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
7376
high_qc2: None,
7477
action: TYPES::View::genesis(),
7578
epoch: None,
79+
drb_results: BTreeMap::new(),
80+
epoch_roots: BTreeMap::new(),
7681
}
7782
}
7883
}
@@ -373,4 +378,24 @@ impl<TYPES: NodeType> Storage<TYPES> for TestStorage<TYPES> {
373378

374379
Ok(())
375380
}
381+
382+
async fn add_drb_result(&self, epoch: TYPES::Epoch, drb_result: DrbResult) -> Result<()> {
383+
let mut inner = self.inner.write().await;
384+
385+
inner.drb_results.insert(epoch, drb_result);
386+
387+
Ok(())
388+
}
389+
390+
async fn add_epoch_root(
391+
&self,
392+
epoch: TYPES::Epoch,
393+
block_header: TYPES::BlockHeader,
394+
) -> Result<()> {
395+
let mut inner = self.inner.write().await;
396+
397+
inner.epoch_roots.insert(epoch, block_header);
398+
399+
Ok(())
400+
}
376401
}

hotshot-task-impls/src/helpers.rs

+34-5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use async_lock::RwLock;
99
use committable::{Commitment, Committable};
1010
use either::Either;
1111
use hotshot_task::dependency::{Dependency, EventDependency};
12+
use hotshot_types::traits::storage::Storage;
1213
use hotshot_types::{
1314
consensus::OuterConsensus,
1415
data::{Leaf2, QuorumProposalWrapper, ViewChangeEvidence2},
@@ -180,10 +181,11 @@ pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
180181
}
181182

182183
/// Handles calling add_epoch_root and sync_l1 on Membership if necessary.
183-
async fn decide_epoch_root<TYPES: NodeType>(
184+
async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
184185
decided_leaf: &Leaf2<TYPES>,
185186
epoch_height: u64,
186187
membership: &Arc<RwLock<TYPES::Membership>>,
188+
storage: &Arc<RwLock<I::Storage>>,
187189
) {
188190
let decided_block_number = decided_leaf.block_header().block_number();
189191

@@ -192,6 +194,19 @@ async fn decide_epoch_root<TYPES: NodeType>(
192194
let next_epoch_number =
193195
TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
194196

197+
if let Err(e) = storage
198+
.write()
199+
.await
200+
.add_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
201+
.await
202+
{
203+
tracing::error!(
204+
"Failed to store epoch root for epoch {:?}: {}",
205+
next_epoch_number,
206+
e
207+
);
208+
}
209+
195210
let write_callback = {
196211
tracing::debug!("Calling add_epoch_root for epoch {:?}", next_epoch_number);
197212
let membership_reader = membership.read().await;
@@ -251,13 +266,14 @@ impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
251266
/// # Panics
252267
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
253268
/// impossible.
254-
pub async fn decide_from_proposal_2<TYPES: NodeType>(
269+
pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
255270
proposal: &QuorumProposalWrapper<TYPES>,
256271
consensus: OuterConsensus<TYPES>,
257272
existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
258273
public_key: &TYPES::SignatureKey,
259274
with_epochs: bool,
260275
membership: &Arc<RwLock<TYPES::Membership>>,
276+
storage: &Arc<RwLock<I::Storage>>,
261277
) -> LeafChainTraversalOutcome<TYPES> {
262278
let mut res = LeafChainTraversalOutcome::default();
263279
let consensus_reader = consensus.read().await;
@@ -332,7 +348,13 @@ pub async fn decide_from_proposal_2<TYPES: NodeType>(
332348
drop(consensus_reader);
333349

334350
for decided_leaf_info in &res.leaf_views {
335-
decide_epoch_root(&decided_leaf_info.leaf, epoch_height, membership).await;
351+
decide_epoch_root::<TYPES, I>(
352+
&decided_leaf_info.leaf,
353+
epoch_height,
354+
membership,
355+
storage,
356+
)
357+
.await;
336358
}
337359
}
338360

@@ -370,13 +392,14 @@ pub async fn decide_from_proposal_2<TYPES: NodeType>(
370392
/// # Panics
371393
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
372394
/// impossible.
373-
pub async fn decide_from_proposal<TYPES: NodeType, V: Versions>(
395+
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
374396
proposal: &QuorumProposalWrapper<TYPES>,
375397
consensus: OuterConsensus<TYPES>,
376398
existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
377399
public_key: &TYPES::SignatureKey,
378400
with_epochs: bool,
379401
membership: &Arc<RwLock<TYPES::Membership>>,
402+
storage: &Arc<RwLock<I::Storage>>,
380403
) -> LeafChainTraversalOutcome<TYPES> {
381404
let consensus_reader = consensus.read().await;
382405
let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
@@ -488,7 +511,13 @@ pub async fn decide_from_proposal<TYPES: NodeType, V: Versions>(
488511

489512
if with_epochs && res.new_decided_view_number.is_some() {
490513
for decided_leaf_info in &res.leaf_views {
491-
decide_epoch_root(&decided_leaf_info.leaf, epoch_height, membership).await;
514+
decide_epoch_root::<TYPES, I>(
515+
&decided_leaf_info.leaf,
516+
epoch_height,
517+
membership,
518+
storage,
519+
)
520+
.await;
492521
}
493522
}
494523

hotshot-task-impls/src/quorum_vote/handlers.rs

+29-6
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,25 @@ use crate::{
4848
quorum_vote::Versions,
4949
};
5050

51-
async fn notify_membership_of_drb_result<TYPES: NodeType>(
51+
async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
5252
membership: &EpochMembership<TYPES>,
53+
storage: &Arc<RwLock<I::Storage>>,
5354
drb_result: DrbResult,
5455
) {
5556
tracing::debug!("Calling add_drb_result for epoch {:?}", membership.epoch());
57+
58+
// membership.epoch should always be Some
59+
if let Some(epoch) = membership.epoch() {
60+
if let Err(e) = storage
61+
.write()
62+
.await
63+
.add_drb_result(epoch, drb_result)
64+
.await
65+
{
66+
tracing::error!("Failed to store drb result for epoch {:?}: {}", epoch, e);
67+
}
68+
}
69+
5670
membership.add_drb_result(drb_result).await;
5771
}
5872

@@ -101,11 +115,12 @@ async fn store_and_get_computed_drb_result<
101115
.insert(epoch_number, result);
102116
drop(consensus_writer);
103117

104-
notify_membership_of_drb_result::<TYPES>(
118+
handle_drb_result::<TYPES, I>(
105119
&task_state
106120
.membership
107121
.membership_for_epoch(Some(epoch_number))
108122
.await?,
123+
&task_state.storage,
109124
result,
110125
)
111126
.await;
@@ -221,7 +236,12 @@ async fn start_drb_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versio
221236
.drb_seeds_and_results
222237
.results
223238
.insert(*task_epoch, result);
224-
notify_membership_of_drb_result::<TYPES>(&epoch_membership, result).await;
239+
handle_drb_result::<TYPES, I>(
240+
&epoch_membership,
241+
&task_state.storage,
242+
result,
243+
)
244+
.await;
225245
task_state.drb_computation = None;
226246
}
227247
Err(e) => {
@@ -335,11 +355,12 @@ async fn store_drb_seed_and_result<TYPES: NodeType, I: NodeImplementation<TYPES>
335355
.drb_seeds_and_results
336356
.results
337357
.insert(current_epoch_number + 1, result);
338-
notify_membership_of_drb_result::<TYPES>(
358+
handle_drb_result::<TYPES, I>(
339359
&task_state
340360
.membership
341361
.membership_for_epoch(Some(current_epoch_number + 1))
342362
.await?,
363+
&task_state.storage,
343364
result,
344365
)
345366
.await;
@@ -379,23 +400,25 @@ pub(crate) async fn handle_quorum_proposal_validated<
379400
included_txns,
380401
decided_upgrade_cert,
381402
} = if version >= V::Epochs::VERSION {
382-
decide_from_proposal_2(
403+
decide_from_proposal_2::<TYPES, I>(
383404
proposal,
384405
OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
385406
Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
386407
&task_state.public_key,
387408
version >= V::Epochs::VERSION,
388409
task_state.membership.membership(),
410+
&task_state.storage,
389411
)
390412
.await
391413
} else {
392-
decide_from_proposal::<TYPES, V>(
414+
decide_from_proposal::<TYPES, I, V>(
393415
proposal,
394416
OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
395417
Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
396418
&task_state.public_key,
397419
version >= V::Epochs::VERSION,
398420
task_state.membership.membership(),
421+
&task_state.storage,
399422
)
400423
.await
401424
};

hotshot-testing/tests/tests_6/test_epochs.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use hotshot_example_types::{
88
node_types::{
99
CombinedImpl, EpochUpgradeTestVersions, EpochsTestVersions, Libp2pImpl, MemoryImpl,
1010
PushCdnImpl, RandomOverlapQuorumFilterConfig, StableQuorumFilterConfig,
11-
TestConsecutiveLeaderTypes, TestTwoStakeTablesTypes, TestTypes,
12-
TestTypesRandomizedCommitteeMembers, TestTypesRandomizedLeader, TestTypesEpochCatchupTypes
11+
TestConsecutiveLeaderTypes, TestTwoStakeTablesTypes, TestTypes, TestTypesEpochCatchupTypes,
12+
TestTypesRandomizedCommitteeMembers, TestTypesRandomizedLeader,
1313
},
1414
testable_delay::{DelayConfig, DelayOptions, DelaySettings, SupportedTraitTypesForAsyncDelay},
1515
};
@@ -505,23 +505,23 @@ cross_tests!(
505505
// };
506506
// let mut metadata = TestDescription::default().set_num_nodes(20,20);
507507
// let mut catchup_nodes = vec![];
508-
//
508+
//
509509
// for i in 0..20 {
510510
// catchup_nodes.push(ChangeNode {
511511
// idx: i,
512512
// updown: NodeAction::RestartDown(0),
513513
// })
514514
// }
515-
//
515+
//
516516
// metadata.timing_data = timing_data;
517-
//
517+
//
518518
// metadata.spinning_properties = SpinningTaskDescription {
519519
// // Restart all the nodes in view 10
520520
// node_changes: vec![(10, catchup_nodes)],
521521
// };
522522
// metadata.view_sync_properties =
523523
// hotshot_testing::view_sync_task::ViewSyncTaskDescription::Threshold(0, 20);
524-
//
524+
//
525525
// metadata.completion_task_description =
526526
// CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
527527
// TimeBasedCompletionTaskDescription {
@@ -536,7 +536,7 @@ cross_tests!(
536536
// decide_timeout: Duration::from_secs(20),
537537
// ..Default::default()
538538
// };
539-
//
539+
//
540540
// metadata
541541
// },
542542
// );

hotshot-types/src/traits/storage.rs

+9
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::{
2424
DaProposal, DaProposal2, Leaf, Leaf2, QuorumProposal, QuorumProposal2,
2525
QuorumProposalWrapper, VidDisperseShare,
2626
},
27+
drb::DrbResult,
2728
event::HotShotAction,
2829
message::{convert_proposal, Proposal},
2930
simple_certificate::{
@@ -158,4 +159,12 @@ pub trait Storage<TYPES: NodeType>: Send + Sync + Clone {
158159
async fn migrate_consensus(&self) -> Result<()> {
159160
Ok(())
160161
}
162+
/// Add a drb result
163+
async fn add_drb_result(&self, epoch: TYPES::Epoch, drb_result: DrbResult) -> Result<()>;
164+
/// Add an epoch block header
165+
async fn add_epoch_root(
166+
&self,
167+
epoch: TYPES::Epoch,
168+
block_header: TYPES::BlockHeader,
169+
) -> Result<()>;
161170
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
CREATE TABLE epoch_drb_and_root (
2+
epoch BIGINT PRIMARY KEY,
3+
drb_result BYTEA,
4+
block_header BYTEA
5+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
CREATE TABLE epoch_drb_and_root (
2+
epoch BIGINT PRIMARY KEY,
3+
drb_result BLOB,
4+
block_header BLOB
5+
);

sequencer/src/persistence.rs

+47
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ mod persistence_tests {
5454
Event, Leaf, Leaf2, NodeState, PubKey, SeqTypes, ValidatedState,
5555
};
5656
use hotshot::types::{BLSPubKey, SignatureKey};
57+
use hotshot::InitializerEpochInfo;
5758
use hotshot_example_types::node_types::TestVersions;
5859
use hotshot_query_service::testing::mocks::MockVersions;
5960
use hotshot_types::{
@@ -156,6 +157,52 @@ mod persistence_tests {
156157
);
157158
}
158159

160+
#[tokio::test(flavor = "multi_thread")]
161+
pub async fn test_epoch_info<P: TestablePersistence>() {
162+
setup_test();
163+
164+
let tmp = P::tmp_storage().await;
165+
let storage = P::connect(&tmp).await;
166+
167+
// Initially, there is no saved info.
168+
assert_eq!(storage.load_start_epoch_info().await.unwrap(), Vec::new());
169+
170+
// Store a drb result.
171+
storage
172+
.add_drb_result(EpochNumber::new(1), [1; 32])
173+
.await
174+
.unwrap();
175+
assert_eq!(
176+
storage.load_start_epoch_info().await.unwrap(),
177+
vec![InitializerEpochInfo::<SeqTypes> {
178+
epoch: EpochNumber::new(1),
179+
drb_result: [1; 32],
180+
block_header: None,
181+
}]
182+
);
183+
184+
// Store a second DRB result
185+
storage
186+
.add_drb_result(EpochNumber::new(2), [3; 32])
187+
.await
188+
.unwrap();
189+
assert_eq!(
190+
storage.load_start_epoch_info().await.unwrap(),
191+
vec![
192+
InitializerEpochInfo::<SeqTypes> {
193+
epoch: EpochNumber::new(1),
194+
drb_result: [1; 32],
195+
block_header: None,
196+
},
197+
InitializerEpochInfo::<SeqTypes> {
198+
epoch: EpochNumber::new(2),
199+
drb_result: [3; 32],
200+
block_header: None,
201+
}
202+
]
203+
);
204+
}
205+
159206
fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
160207
LeafInfo {
161208
leaf,

0 commit comments

Comments
 (0)