Skip to content

Commit 54d79a3

Browse files
committed
more fixes
1 parent 389d5ae commit 54d79a3

File tree

12 files changed

+150
-105
lines changed

12 files changed

+150
-105
lines changed

Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,9 @@ impl SequencerClient {
121121
Ok(balance)
122122
}
123123

124-
pub async fn current_epoch(&self) -> anyhow::Result<u64> {
124+
pub async fn current_epoch(&self) -> anyhow::Result<Option<u64>> {
125125
self.0
126-
.get::<u64>("node/current_epoch")
126+
.get::<Option<u64>>("node/current_epoch")
127127
.send()
128128
.await
129129
.context("getting epoch value")

sequencer-sqlite/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sequencer/src/api.rs

+9
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
177177
) -> Vec<StakeTableEntry<<SeqTypes as NodeType>::SignatureKey>> {
178178
self.as_ref().get_stake_table_current().await
179179
}
180+
181+
/// Get the stake table for the current epoch if not provided
182+
async fn get_current_epoch(&self) -> Option<<SeqTypes as NodeType>::Epoch> {
183+
self.as_ref().get_current_epoch().await
184+
}
180185
}
181186

182187
impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
@@ -205,6 +210,10 @@ impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
205210

206211
self.get_stake_table(epoch).await
207212
}
213+
214+
async fn get_current_epoch(&self) -> Option<<SeqTypes as NodeType>::Epoch> {
215+
self.consensus().await.read().await.cur_epoch().await
216+
}
208217
}
209218

210219
impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> SubmitDataSource<N, P>

sequencer/src/api/data_source.rs

+2
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ pub(crate) trait StakeTableDataSource<T: NodeType> {
127127
fn get_stake_table_current(
128128
&self,
129129
) -> impl Send + Future<Output = Vec<StakeTableEntry<T::SignatureKey>>>;
130+
131+
fn get_current_epoch(&self) -> impl Send + Future<Output = Option<T::Epoch>>;
130132
}
131133

132134
pub(crate) trait CatchupDataSource: Sync {

sequencer/src/api/endpoints.rs

+8
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,14 @@ where
216216
.await)
217217
}
218218
.boxed()
219+
})?
220+
.at("current_epoch", |_, state| {
221+
async move {
222+
Ok(state
223+
.read(|state| state.get_current_epoch().boxed())
224+
.await)
225+
}
226+
.boxed()
219227
})?;
220228

221229
Ok(api)

tests/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ espresso-types = { path = "../types", features = ["testing"] }
1717
ethers = { workspace = true }
1818
futures = { workspace = true }
1919
reqwest = { workspace = true, features = ["json"] }
20+
sequencer-utils = { path = "../utils" }
2021
surf-disco = { workspace = true }
2122
tokio = { workspace = true }
23+
tracing = { workspace = true }
2224
vbs = { workspace = true }
23-
sequencer-utils = { path = "../utils" }

tests/common/mod.rs

+71-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1-
use anyhow::{anyhow, Result};
1+
use anyhow::{anyhow, Context, Result};
22
use client::SequencerClient;
33
use espresso_types::{FeeAmount, FeeVersion, MarketplaceVersion};
44
use ethers::prelude::*;
55
use futures::future::join_all;
6+
use std::path::Path;
67
use std::{fmt, str::FromStr, time::Duration};
78
use surf_disco::Url;
89
use tokio::time::{sleep, timeout};
910
use vbs::version::StaticVersionType;
1011

12+
use dotenvy::var;
13+
use sequencer_utils::stake_table::{
14+
update_stake_table, PermissionedStakeTableUpdate, StakerIdentity,
15+
};
16+
1117
const L1_PROVIDER_RETRY_INTERVAL: Duration = Duration::from_secs(1);
1218
// TODO add to .env
1319
const RECIPIENT_ADDRESS: &str = "0x0000000000000000000000000000000000000000";
@@ -277,3 +283,67 @@ async fn wait_for_service(url: Url, interval: u64, timeout_duration: u64) -> Res
277283
.await
278284
.map_err(|e| anyhow!("Wait for service, timeout: ({}) {}", url, e))?
279285
}
286+
287+
pub async fn test_stake_table_update(clients: Vec<SequencerClient>) -> Result<()> {
288+
/*
289+
EPOCH V3
290+
*/
291+
292+
let l1_port = var("ESPRESSO_SEQUENCER_L1_PORT")?;
293+
let account_index = var("ESPRESSO_DEPLOYER_ACCOUNT_INDEX")?;
294+
let contract_address = var("ESPRESSO_SEQUENCER_PERMISSIONED_STAKE_TABLE_ADDRESS")?;
295+
let initial_stake_table_path = var("ESPRESSO_SEQUENCER_INITIAL_PERMISSIONED_STAKE_TABLE_PATH")?;
296+
297+
let permissioned_stake_table =
298+
PermissionedStakeTableUpdate::from_toml_file(Path::new(&initial_stake_table_path))?;
299+
300+
// initial stake table has 5 new stakers
301+
302+
let new_stakers = permissioned_stake_table.new_stakers;
303+
//lets remove one
304+
let staker_removed = new_stakers[0].clone();
305+
306+
let st_with_one_removed = PermissionedStakeTableUpdate::new(
307+
vec![],
308+
vec![StakerIdentity {
309+
stake_table_key: staker_removed.stake_table_key.clone(),
310+
}],
311+
);
312+
let client = clients[0].clone();
313+
314+
let epoch_before_update = client.current_epoch().await?.context("curr epoch")?;
315+
tracing::warn!("current_epoch={epoch_before_update:?}");
316+
update_stake_table(
317+
format!("http://localhost:{l1_port}").parse()?,
318+
Duration::from_secs(7),
319+
"test test test test test test test test test test test junk".to_string(),
320+
account_index.parse()?,
321+
contract_address.parse()?,
322+
st_with_one_removed,
323+
)
324+
.await?;
325+
326+
loop {
327+
sleep(Duration::from_secs(10)).await;
328+
let epoch = clients[0].current_epoch().await?.context("curr epoch")?;
329+
tracing::info!("current_epoch={epoch:?}");
330+
if epoch > epoch_before_update + 6 {
331+
let stake_table = client.stake_table(epoch).await?;
332+
tracing::info!("stake_table={stake_table:?}");
333+
assert_eq!(stake_table.len(), 4);
334+
335+
assert!(
336+
stake_table
337+
.iter()
338+
.all(|st| st.stake_key != staker_removed.stake_table_key),
339+
"Entry for {} already exists in the stake table",
340+
staker_removed.stake_table_key
341+
);
342+
343+
break;
344+
}
345+
}
346+
// TODO: randomize this test
347+
348+
Ok(())
349+
}

tests/smoke.rs

+17-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use crate::common::TestConfig;
2-
use anyhow::Result;
1+
use crate::common::{test_stake_table_update, TestConfig};
2+
use anyhow::{Context, Result};
33
use futures::StreamExt;
4+
use sequencer_utils::test_utils::setup_test;
45
use std::time::Instant;
56

67
/// We allow for no change in state across this many consecutive iterations.
@@ -10,6 +11,7 @@ const MAX_TXNS_NOT_INCREMENTING: u8 = 5;
1011

1112
#[tokio::test(flavor = "multi_thread")]
1213
async fn test_smoke() -> Result<()> {
14+
setup_test();
1315
let start = Instant::now();
1416
dotenvy::dotenv()?;
1517

@@ -78,5 +80,18 @@ async fn test_smoke() -> Result<()> {
7880

7981
last = new;
8082
}
83+
84+
let epoch = testing
85+
.espresso
86+
.current_epoch()
87+
.await?
88+
.context("curr epoch")?;
89+
90+
tracing::info!("epoch before stake table update {epoch:?}");
91+
92+
if epoch > 1 {
93+
tracing::info!("testing stake table update");
94+
test_stake_table_update(testing.sequencer_clients).await?;
95+
}
8196
Ok(())
8297
}

tests/upgrades.rs

+1-65
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
1-
use std::{path::Path, time::Duration};
2-
3-
use crate::common::TestConfig;
1+
use crate::common::{test_stake_table_update, TestConfig};
42
use anyhow::Result;
53
use client::SequencerClient;
6-
use dotenvy::var;
74
use espresso_types::{EpochVersion, FeeVersion, MarketplaceVersion};
85
use futures::{future::join_all, StreamExt};
9-
use sequencer_utils::stake_table::{update_stake_table, PermissionedStakeTableUpdate};
10-
use tokio::time::sleep;
116
use vbs::version::{StaticVersionType, Version};
127

138
const SEQUENCER_BLOCKS_TIMEOUT: u64 = 200;
@@ -108,62 +103,3 @@ async fn test_blocks_production(clients: Vec<SequencerClient>, from: u64, num: u
108103

109104
Ok(())
110105
}
111-
112-
async fn test_stake_table_update(clients: Vec<SequencerClient>) -> Result<()> {
113-
/*
114-
EPOCH V3
115-
*/
116-
117-
let rpc_url = var("ESPRESSO_SEQUENCER_L1_PROVIDER")?;
118-
let account_index = var("ESPRESSO_DEPLOYER_ACCOUNT_INDEX")?;
119-
let contract_address = var("ESPRESSO_SEQUENCER_PERMISSIONED_STAKE_TABLE_ADDRESS")?;
120-
let initial_stake_table_path = var("ESPRESSO_SEQUENCER_INITIAL_PERMISSIONED_STAKE_TABLE_PATH")?;
121-
122-
let permissioned_stake_table =
123-
PermissionedStakeTableUpdate::from_toml_file(Path::new(&initial_stake_table_path))?;
124-
125-
// initial stake table has 5 new stakers
126-
127-
let new_stakers = permissioned_stake_table.new_stakers;
128-
//lets remove one
129-
let staker_removed = new_stakers[0].clone();
130-
131-
let st_with_one_removed =
132-
PermissionedStakeTableUpdate::new(vec![staker_removed.clone()], vec![]);
133-
let client = clients[0].clone();
134-
135-
let epoch_before_update = client.current_epoch().await?;
136-
137-
update_stake_table(
138-
rpc_url.parse()?,
139-
Duration::from_secs(7),
140-
"test test test test test test test test test test test junk".to_string(),
141-
account_index.parse()?,
142-
contract_address.parse()?,
143-
st_with_one_removed,
144-
)
145-
.await?;
146-
147-
loop {
148-
sleep(Duration::from_secs(10)).await;
149-
let epoch = clients[0].current_epoch().await?;
150-
151-
if epoch > epoch_before_update {
152-
let stake_table = client.stake_table(epoch).await?;
153-
assert_eq!(stake_table.len(), 4);
154-
155-
assert!(
156-
stake_table
157-
.iter()
158-
.all(|st| st.stake_key != staker_removed.stake_table_key),
159-
"Entry for {} already exists in the stake table",
160-
staker_removed.stake_table_key
161-
);
162-
163-
break;
164-
}
165-
}
166-
// TODO: randomize this test
167-
168-
Ok(())
169-
}

types/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ hotshot = { workspace = true }
3131
hotshot-contract-adapter = { workspace = true }
3232
hotshot-query-service = { workspace = true }
3333
hotshot-types = { workspace = true }
34+
indexmap = "2.7"
3435
itertools = { workspace = true }
3536
jf-merkle-tree = { workspace = true }
3637
jf-utils = { workspace = true } # TODO temporary: used only for test_rng()

types/src/v0/impls/stake_table.rs

+34-34
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use hotshot_types::{
1919
},
2020
PeerConfig,
2121
};
22-
use itertools::Itertools;
22+
use indexmap::IndexMap;
2323
use std::{
2424
cmp::max,
2525
collections::{BTreeSet, HashMap},
@@ -49,43 +49,36 @@ impl StakeTables {
4949
/// should not significantly affect performance to fetch all events and
5050
/// perform the computation in this functions once per epoch.
5151
pub fn from_l1_events(updates: Vec<StakersUpdated>) -> Self {
52-
let changes_per_node = updates
53-
.into_iter()
54-
.flat_map(|event| {
55-
event
56-
.removed
57-
.into_iter()
58-
.map(|key| StakeTableChange::Remove(bls_alloy_to_jf(key)))
59-
.chain(
60-
event
61-
.added
62-
.into_iter()
63-
.map(|node_info| StakeTableChange::Add(node_info.into())),
64-
)
65-
})
66-
.group_by(|change| change.key());
52+
let mut index_map = IndexMap::new();
6753

68-
// If the last event for a stakers is `Added` the staker is currently
69-
// staking, if the last event is removed or (or the staker is not present)
70-
// they are not staking.
71-
let currently_staking = changes_per_node
72-
.into_iter()
73-
.map(|(_pub_key, deltas)| deltas.last().expect("deltas non-empty").clone())
74-
.filter_map(|change| match change {
75-
StakeTableChange::Add(node_info) => Some(node_info),
76-
StakeTableChange::Remove(_) => None,
77-
});
78-
79-
let mut consensus_stake_table: Vec<StakeTableEntry<PubKey>> = vec![];
80-
let mut da_members: Vec<StakeTableEntry<PubKey>> = vec![];
81-
for node in currently_staking {
82-
consensus_stake_table.push(node.clone().into());
83-
if node.da {
84-
da_members.push(node.into());
54+
for event in updates {
55+
for key in event.removed {
56+
let change = StakeTableChange::Remove(bls_alloy_to_jf(key));
57+
index_map.insert(change.key(), change);
58+
}
59+
for node_info in event.added {
60+
let change = StakeTableChange::Add(node_info.into());
61+
index_map.insert(change.key(), change);
8562
}
8663
}
8764

88-
Self::new(consensus_stake_table.into(), da_members.into())
65+
let mut da_members = Vec::new();
66+
let mut stake_table = Vec::new();
67+
68+
for change in index_map.values() {
69+
if let StakeTableChange::Add(node_info_jf) = change {
70+
let entry: StakeTableEntry<PubKey> = node_info_jf.clone().into();
71+
stake_table.push(entry.clone());
72+
if change.is_da() {
73+
da_members.push(entry);
74+
}
75+
}
76+
}
77+
78+
tracing::error!("DA={da_members:?}");
79+
tracing::error!("ST={stake_table:?}");
80+
81+
Self::new(stake_table.into(), da_members.into())
8982
}
9083
}
9184

@@ -121,6 +114,13 @@ impl StakeTableChange {
121114
StakeTableChange::Remove(key) => *key,
122115
}
123116
}
117+
118+
pub(crate) fn is_da(&self) -> bool {
119+
match self {
120+
StakeTableChange::Add(node_info) => node_info.da,
121+
StakeTableChange::Remove(_) => false,
122+
}
123+
}
124124
}
125125

126126
/// Holds Stake table and da stake

0 commit comments

Comments
 (0)