Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/unstable' into arithmetic-lint…
Browse files Browse the repository at this point in the history
…-in-rate-limiter
  • Loading branch information
ackintosh committed Feb 24, 2025
2 parents 5dbce0f + cf4104a commit 66ac740
Show file tree
Hide file tree
Showing 68 changed files with 1,033 additions and 622 deletions.
4 changes: 2 additions & 2 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
beacon_node/network/ @jxs
beacon_node/lighthouse_network/ @jxs
/beacon_node/network/ @jxs
/beacon_node/lighthouse_network/ @jxs
4 changes: 3 additions & 1 deletion .github/workflows/test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ env:
# Disable debug info (see https://github.com/sigp/lighthouse/issues/4005)
RUSTFLAGS: "-D warnings -C debuginfo=0"
# Prevent Github API rate limiting.
LIGHTHOUSE_GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# NOTE: this token is a personal access token on Jimmy's account due to the default GITHUB_TOKEN
# not having access to other repositories. We should eventually devise a better solution here.
LIGHTHOUSE_GITHUB_TOKEN: ${{ secrets.LIGHTHOUSE_GITHUB_TOKEN }}
# Enable self-hosted runners for the sigp repo only.
SELF_HOSTED_RUNNERS: ${{ github.repository == 'sigp/lighthouse' }}
# Self-hosted runners need to reference a different host for `./watch` tests.
Expand Down
28 changes: 13 additions & 15 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1450,19 +1450,17 @@ where
return Err(Error::UnknownTargetRoot(target.root));
}

chain
.with_committee_cache(target.root, attestation_epoch, |committee_cache, _| {
let committees_per_slot = committee_cache.committees_per_slot();

Ok(committee_cache
.get_beacon_committees_at_slot(attestation.data().slot)
.map(|committees| map_fn((committees, committees_per_slot)))
.unwrap_or_else(|_| {
Err(Error::NoCommitteeForSlotAndIndex {
slot: attestation.data().slot,
index: attestation.committee_index().unwrap_or(0),
})
}))
})
.map_err(BeaconChainError::from)?
chain.with_committee_cache(target.root, attestation_epoch, |committee_cache, _| {
let committees_per_slot = committee_cache.committees_per_slot();

Ok(committee_cache
.get_beacon_committees_at_slot(attestation.data().slot)
.map(|committees| map_fn((committees, committees_per_slot)))
.unwrap_or_else(|_| {
Err(Error::NoCommitteeForSlotAndIndex {
slot: attestation.data().slot,
index: attestation.committee_index().unwrap_or(0),
})
}))
})?
}
116 changes: 53 additions & 63 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use crate::block_verification_types::{
pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnReconstructionResult,
Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData,
DataAvailabilityChecker, DataColumnReconstructionResult,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
Expand Down Expand Up @@ -3169,7 +3169,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::DuplicateFullyImported(block_root));
}

self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
// consumers don't expect the blobs event to fire erratically.
if !self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
{
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
}

let r = self
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
Expand Down Expand Up @@ -3640,9 +3647,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability =
self.data_availability_checker
.put_engine_blobs(block_root, blobs, data_column_recv)?;
let availability = self.data_availability_checker.put_engine_blobs(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
blobs,
data_column_recv,
)?;

self.process_availability(slot, availability, || Ok(()))
.await
Expand Down Expand Up @@ -3727,7 +3737,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv,
} = import_data;

// Record the time at which this block's blobs became available.
Expand Down Expand Up @@ -3755,7 +3764,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block,
parent_eth1_finalization_data,
consensus_context,
data_column_recv,
)
},
"payload_verification_handle",
Expand Down Expand Up @@ -3794,7 +3802,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Hash256, BlockError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
Expand Down Expand Up @@ -3892,7 +3899,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if let Some(proto_block) = fork_choice.get_block(&block_root) {
if let Err(e) = self.early_attester_cache.add_head_block(
block_root,
signed_block.clone(),
&signed_block,
proto_block,
&state,
&self.spec,
Expand Down Expand Up @@ -3961,15 +3968,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
let (_, signed_block, block_data) = signed_block.deconstruct();

match self.get_blobs_or_columns_store_op(
block_root,
signed_block.epoch(),
blobs,
data_columns,
data_column_recv,
) {
match self.get_blobs_or_columns_store_op(block_root, block_data) {
Ok(Some(blobs_or_columns_store_op)) => {
ops.push(blobs_or_columns_store_op);
}
Expand Down Expand Up @@ -6505,9 +6506,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Returns `true` if the given slot is prior to the `bellatrix_fork_epoch`.
pub fn slot_is_prior_to_bellatrix(&self, slot: Slot) -> bool {
self.spec.bellatrix_fork_epoch.map_or(true, |bellatrix| {
slot.epoch(T::EthSpec::slots_per_epoch()) < bellatrix
})
self.spec
.bellatrix_fork_epoch
.is_none_or(|bellatrix| slot.epoch(T::EthSpec::slots_per_epoch()) < bellatrix)
}

/// Returns the value of `execution_optimistic` for `block`.
Expand Down Expand Up @@ -7218,29 +7219,34 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

fn get_blobs_or_columns_store_op(
pub(crate) fn get_blobs_or_columns_store_op(
&self,
block_root: Hash256,
block_epoch: Epoch,
blobs: Option<BlobSidecarList<T::EthSpec>>,
data_columns: Option<DataColumnSidecarList<T::EthSpec>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
block_data: AvailableBlockData<T::EthSpec>,
) -> Result<Option<StoreOp<T::EthSpec>>, String> {
if self.spec.is_peer_das_enabled_for_epoch(block_epoch) {
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let _custody_columns_count = self.data_availability_checker.get_sampling_column_count();

let custody_columns_available = data_columns
.as_ref()
.as_ref()
.is_some_and(|columns| columns.len() == custody_columns_count);

let data_columns_to_persist = if custody_columns_available {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
data_columns
} else if let Some(data_column_recv) = data_column_recv {
match block_data {
AvailableBlockData::NoData => Ok(None),
AvailableBlockData::Blobs(blobs) => {
debug!(
self.log, "Writing blobs to store";
"block_root" => %block_root,
"count" => blobs.len(),
);
Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
}
AvailableBlockData::DataColumns(data_columns) => {
debug!(
self.log, "Writing data columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)))
}
AvailableBlockData::DataColumnsRecv(data_column_recv) => {
// Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
Expand All @@ -7250,34 +7256,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let computed_data_columns = data_column_recv
.blocking_recv()
.map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
Some(computed_data_columns)
} else {
// No blobs in the block.
None
};

if let Some(data_columns) = data_columns_to_persist {
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)));
}
}
} else if let Some(blobs) = blobs {
if !blobs.is_empty() {
debug!(
self.log, "Writing blobs to store";
self.log, "Writing data columns to store";
"block_root" => %block_root,
"count" => blobs.len(),
"count" => computed_data_columns.len(),
);
return Ok(Some(StoreOp::PutBlobs(block_root, blobs)));
// TODO(das): Store only this node's custody columns
Ok(Some(StoreOp::PutDataColumns(
block_root,
computed_data_columns,
)))
}
}

Ok(None)
}
}

Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/block_times_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl BlockTimesCache {
if block_times
.timestamps
.all_blobs_observed
.map_or(true, |prev| timestamp > prev)
.is_none_or(|prev| timestamp > prev)
{
block_times.timestamps.all_blobs_observed = Some(timestamp);
}
Expand All @@ -195,7 +195,7 @@ impl BlockTimesCache {
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
let existing_timestamp = field(&mut block_times.timestamps);
if existing_timestamp.map_or(true, |prev| timestamp < prev) {
if existing_timestamp.is_none_or(|prev| timestamp < prev) {
*existing_timestamp = Some(timestamp);
}
}
Expand Down
1 change: 0 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1707,7 +1707,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv: None,
},
payload_verification_handle,
})
Expand Down
16 changes: 3 additions & 13 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use derivative::Derivative;
use state_processing::ConsensusContext;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::sync::oneshot;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, DataColumnSidecarList,
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};

/// A block that has been received over RPC. It has 2 internal variants:
Expand Down Expand Up @@ -265,7 +264,6 @@ impl<E: EthSpec> ExecutedBlock<E> {

/// A block that has completed all pre-deneb block processing checks including verification
/// by an EL client **and** has all requisite blob data to be imported into fork choice.
#[derive(PartialEq)]
pub struct AvailableExecutedBlock<E: EthSpec> {
pub block: AvailableBlock<E>,
pub import_data: BlockImportData<E>,
Expand Down Expand Up @@ -338,21 +336,14 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
}
}

#[derive(Debug, Derivative)]
#[derivative(PartialEq)]
#[derive(Debug, PartialEq)]
pub struct BlockImportData<E: EthSpec> {
pub block_root: Hash256,
pub state: BeaconState<E>,
pub parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<E>,
#[derivative(PartialEq = "ignore")]
/// An optional receiver for `DataColumnSidecarList`.
///
/// This field is `Some` when data columns are being computed asynchronously.
/// The resulting `DataColumnSidecarList` will be sent through this receiver.
pub data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<E>>>,
}

impl<E: EthSpec> BlockImportData<E> {
Expand All @@ -371,7 +362,6 @@ impl<E: EthSpec> BlockImportData<E> {
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
data_column_recv: None,
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ pub struct ChainConfig {
/// The delay in milliseconds applied by the node between sending each blob or data column batch.
/// This doesn't apply if the node is the block proposer.
pub blob_publication_batch_interval: Duration,
/// Artificial delay for block publishing. For PeerDAS testing only.
pub block_publishing_delay: Option<Duration>,
/// Artificial delay for data column publishing. For PeerDAS testing only.
pub data_column_publishing_delay: Option<Duration>,
}

impl Default for ChainConfig {
Expand Down Expand Up @@ -129,6 +133,8 @@ impl Default for ChainConfig {
enable_sampling: false,
blob_publication_batches: 4,
blob_publication_batch_interval: Duration::from_millis(300),
block_publishing_delay: None,
data_column_publishing_delay: None,
}
}
}
Expand Down
Loading

0 comments on commit 66ac740

Please sign in to comment.