Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 49 additions & 62 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use crate::block_verification_types::{
pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnReconstructionResult,
Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData,
DataAvailabilityChecker, DataColumnReconstructionResult,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
Expand Down Expand Up @@ -2993,6 +2993,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn verify_block_for_gossip(
self: &Arc<Self>,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
custody_columns_count: usize,
) -> Result<GossipVerifiedBlock<T>, BlockError> {
let chain = self.clone();
self.task_executor
Expand All @@ -3002,7 +3003,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let slot = block.slot();
let graffiti_string = block.message().body().graffiti().as_utf8_lossy();

match GossipVerifiedBlock::new(block, &chain) {
match GossipVerifiedBlock::new(block, &chain, custody_columns_count) {
Ok(verified) => {
let commitments_formatted = verified.block.commitments_formatted();
debug!(
Expand Down Expand Up @@ -3169,7 +3170,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::DuplicateFullyImported(block_root));
}

self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
// consumers don't expect the blobs event to fire erratically.
if !self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
{
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
}

let r = self
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
Expand Down Expand Up @@ -3640,9 +3648,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability =
self.data_availability_checker
.put_engine_blobs(block_root, blobs, data_column_recv)?;
let availability = self.data_availability_checker.put_engine_blobs(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
blobs,
data_column_recv,
)?;

self.process_availability(slot, availability, || Ok(()))
.await
Expand Down Expand Up @@ -3727,7 +3738,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv,
} = import_data;

// Record the time at which this block's blobs became available.
Expand Down Expand Up @@ -3755,7 +3765,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block,
parent_eth1_finalization_data,
consensus_context,
data_column_recv,
)
},
"payload_verification_handle",
Expand Down Expand Up @@ -3794,7 +3803,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Hash256, BlockError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
Expand Down Expand Up @@ -3892,7 +3900,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if let Some(proto_block) = fork_choice.get_block(&block_root) {
if let Err(e) = self.early_attester_cache.add_head_block(
block_root,
signed_block.clone(),
&signed_block,
proto_block,
&state,
&self.spec,
Expand Down Expand Up @@ -3961,15 +3969,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
let (_, signed_block, block_data) = signed_block.deconstruct();

match self.get_blobs_or_columns_store_op(
block_root,
signed_block.epoch(),
blobs,
data_columns,
data_column_recv,
) {
match self.get_blobs_or_columns_store_op(block_root, block_data) {
Ok(Some(blobs_or_columns_store_op)) => {
ops.push(blobs_or_columns_store_op);
}
Expand Down Expand Up @@ -7218,29 +7220,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

fn get_blobs_or_columns_store_op(
pub(crate) fn get_blobs_or_columns_store_op(
&self,
block_root: Hash256,
block_epoch: Epoch,
blobs: Option<BlobSidecarList<T::EthSpec>>,
data_columns: Option<DataColumnSidecarList<T::EthSpec>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
block_data: AvailableBlockData<T::EthSpec>,
) -> Result<Option<StoreOp<T::EthSpec>>, String> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

infinitely better omg

if self.spec.is_peer_das_enabled_for_epoch(block_epoch) {
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();

let custody_columns_available = data_columns
.as_ref()
.as_ref()
.is_some_and(|columns| columns.len() == custody_columns_count);

let data_columns_to_persist = if custody_columns_available {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
data_columns
} else if let Some(data_column_recv) = data_column_recv {
match block_data {
AvailableBlockData::NoData => Ok(None),
AvailableBlockData::Blobs(blobs) => {
debug!(
self.log, "Writing blobs to store";
"block_root" => %block_root,
"count" => blobs.len(),
);
Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
}
AvailableBlockData::DataColumns(data_columns) => {
debug!(
self.log, "Writing data columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)))
}
AvailableBlockData::DataColumnsRecv(data_column_recv) => {
// Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
Expand All @@ -7250,34 +7253,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let computed_data_columns = data_column_recv
.blocking_recv()
.map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
Some(computed_data_columns)
} else {
// No blobs in the block.
None
};

if let Some(data_columns) = data_columns_to_persist {
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)));
}
}
} else if let Some(blobs) = blobs {
if !blobs.is_empty() {
debug!(
self.log, "Writing blobs to store";
self.log, "Writing data columns to store";
"block_root" => %block_root,
"count" => blobs.len(),
"count" => computed_data_columns.len(),
);
return Ok(Some(StoreOp::PutBlobs(block_root, blobs)));
// TODO(das): Store only this node's custody columns
Ok(Some(StoreOp::PutDataColumns(
block_root,
computed_data_columns,
)))
}
}

Ok(None)
}
}

Expand Down
25 changes: 17 additions & 8 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ pub struct GossipVerifiedBlock<T: BeaconChainTypes> {
pub block_root: Hash256,
parent: Option<PreProcessingSnapshot<T::EthSpec>>,
consensus_context: ConsensusContext<T::EthSpec>,
custody_columns_count: usize,
}

/// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit
Expand Down Expand Up @@ -715,6 +716,7 @@ pub trait IntoGossipVerifiedBlock<T: BeaconChainTypes>: Sized {
fn into_gossip_verified_block(
self,
chain: &BeaconChain<T>,
custody_columns_count: usize,
) -> Result<GossipVerifiedBlock<T>, BlockError>;
fn inner_block(&self) -> Arc<SignedBeaconBlock<T::EthSpec>>;
}
Expand All @@ -723,6 +725,7 @@ impl<T: BeaconChainTypes> IntoGossipVerifiedBlock<T> for GossipVerifiedBlock<T>
fn into_gossip_verified_block(
self,
_chain: &BeaconChain<T>,
_custody_columns_count: usize,
) -> Result<GossipVerifiedBlock<T>, BlockError> {
Ok(self)
}
Expand All @@ -735,8 +738,9 @@ impl<T: BeaconChainTypes> IntoGossipVerifiedBlock<T> for Arc<SignedBeaconBlock<T
fn into_gossip_verified_block(
self,
chain: &BeaconChain<T>,
custody_columns_count: usize,
) -> Result<GossipVerifiedBlock<T>, BlockError> {
GossipVerifiedBlock::new(self, chain)
GossipVerifiedBlock::new(self, chain, custody_columns_count)
}

fn inner_block(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
Expand Down Expand Up @@ -805,6 +809,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
pub fn new(
block: Arc<SignedBeaconBlock<T::EthSpec>>,
chain: &BeaconChain<T>,
custody_columns_count: usize,
) -> Result<Self, BlockError> {
// If the block is valid for gossip we don't supply it to the slasher here because
// we assume it will be transformed into a fully verified block. We *do* need to supply
Expand All @@ -814,19 +819,22 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// The `SignedBeaconBlock` and `SignedBeaconBlockHeader` have the same canonical root,
// but it's way quicker to calculate root of the header since the hash of the tree rooted
// at `BeaconBlockBody` is already computed in the header.
Self::new_without_slasher_checks(block, &header, chain).map_err(|e| {
process_block_slash_info::<_, BlockError>(
chain,
BlockSlashInfo::from_early_error_block(header, e),
)
})
Self::new_without_slasher_checks(block, &header, chain, custody_columns_count).map_err(
|e| {
process_block_slash_info::<_, BlockError>(
chain,
BlockSlashInfo::from_early_error_block(header, e),
)
},
)
}

/// As for new, but doesn't pass the block to the slasher.
fn new_without_slasher_checks(
block: Arc<SignedBeaconBlock<T::EthSpec>>,
block_header: &SignedBeaconBlockHeader,
chain: &BeaconChain<T>,
custody_columns_count: usize,
) -> Result<Self, BlockError> {
// Ensure the block is the correct structure for the fork at `block.slot()`.
block
Expand Down Expand Up @@ -1031,6 +1039,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
block_root,
parent,
consensus_context,
custody_columns_count,
})
}

Expand Down Expand Up @@ -1175,6 +1184,7 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
block: MaybeAvailableBlock::AvailabilityPending {
block_root: from.block_root,
block,
custody_columns_count: from.custody_columns_count,
},
block_root: from.block_root,
parent: Some(parent),
Expand Down Expand Up @@ -1707,7 +1717,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv: None,
},
payload_verification_handle,
})
Expand Down
Loading