Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ use store::{
DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
Expand Down Expand Up @@ -3086,7 +3086,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
Expand Down Expand Up @@ -3214,7 +3214,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};

let r = self
.process_availability(slot, availability, None, || Ok(()))
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
Expand Down Expand Up @@ -3342,7 +3342,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block), None).await
self.import_available_block(Box::new(block)).await
}
ExecutedBlock::AvailabilityPending(block) => {
self.check_block_availability_and_import(block).await
Expand Down Expand Up @@ -3474,7 +3474,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

Expand All @@ -3490,7 +3490,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
let availability = self.data_availability_checker.put_gossip_blob(blob)?;

self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

Expand All @@ -3513,7 +3513,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_gossip_data_columns(block_root, data_columns)?;

self.process_availability(slot, availability, None, publish_fn)
self.process_availability(slot, availability, publish_fn)
.await
}

Expand Down Expand Up @@ -3557,7 +3557,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_blobs(block_root, blobs)?;

self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

Expand All @@ -3566,14 +3566,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability = self
.data_availability_checker
.put_engine_blobs(block_root, blobs)?;
let availability =
self.data_availability_checker
.put_engine_blobs(block_root, blobs, data_column_recv)?;

self.process_availability(slot, availability, data_column_recv, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

Expand Down Expand Up @@ -3613,7 +3613,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;

self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

Expand All @@ -3625,14 +3625,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
availability: Availability<T::EthSpec>,
recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
match availability {
Availability::Available(block) => {
publish_fn()?;
// Block is fully available, import into fork choice
self.import_available_block(block, recv).await
self.import_available_block(block).await
}
Availability::MissingComponents(block_root) => Ok(
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
Expand All @@ -3643,7 +3642,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let AvailableExecutedBlock {
block,
Expand All @@ -3658,6 +3656,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv,
} = import_data;

// Record the time at which this block's blobs became available.
Expand Down Expand Up @@ -3724,7 +3723,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Hash256, BlockError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
Expand Down Expand Up @@ -3895,22 +3894,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
// if block is made available via blobs, dropped the data columns.
let data_columns = data_columns.filter(|columns| columns.len() == custody_columns_count);
// if block is made available via blobs and `data_columns` is either `None` or incomplete, dropped the data columns.
let maybe_all_data_columns =
data_columns.filter(|columns| columns.len() == custody_columns_count);

let data_columns = match (data_columns, data_column_recv) {
let data_columns_to_persist = match (maybe_all_data_columns, data_column_recv) {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
(Some(columns), _) => Some(columns),
// Otherwise, it means blobs were likely available via fetching from EL, in this case we
// wait for the data columns to be computed (blocking).
(None, Some(mut data_column_recv)) => {
(None, Some(data_column_recv)) => {
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
// Unable to receive data columns from sender, sender is either dropped or
// failed to compute data columns from blobs. We restore fork choice here and
// return to avoid inconsistency in database.
if let Some(columns) = data_column_recv.blocking_recv() {
if let Ok(columns) = data_column_recv.blocking_recv() {
Some(columns)
} else {
let err_msg = "Did not receive data columns from sender";
Expand Down Expand Up @@ -3952,7 +3952,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

if let Some(data_columns) = data_columns {
if let Some(data_columns) = data_columns_to_persist {
// TODO(das): `available_block includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec.
if !data_columns.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv: None,
},
payload_verification_handle,
})
Expand Down
11 changes: 8 additions & 3 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use ssz_types::VariableList;
use state_processing::ConsensusContext;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::sync::oneshot;
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, DataColumnSidecarList,
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};

/// A block that has been received over RPC. It has 2 internal variants:
Expand Down Expand Up @@ -355,14 +356,17 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
}
}

#[derive(Debug, PartialEq)]
#[derive(Debug, Derivative)]
#[derivative(PartialEq)]
pub struct BlockImportData<E: EthSpec> {
pub block_root: Hash256,
pub state: BeaconState<E>,
pub parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<E>,
#[derivative(PartialEq = "ignore")]
pub data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<E>>>,
}

impl<E: EthSpec> BlockImportData<E> {
Expand All @@ -381,6 +385,7 @@ impl<E: EthSpec> BlockImportData<E> {
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
data_column_recv: None,
}
}
}
Expand Down
14 changes: 10 additions & 4 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::oneshot;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
Expand Down Expand Up @@ -220,7 +221,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map_err(AvailabilityCheckError::InvalidBlobs)?;

self.availability_cache
.put_kzg_verified_blobs(block_root, verified_blobs, &self.log)
.put_kzg_verified_blobs(block_root, verified_blobs, None, &self.log)
}

/// Put a list of custody columns received via RPC into the availability cache. This performs KZG
Expand Down Expand Up @@ -260,6 +261,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let seen_timestamp = self
.slot_clock
Expand All @@ -269,8 +271,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let verified_blobs =
KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp);

self.availability_cache
.put_kzg_verified_blobs(block_root, verified_blobs, &self.log)
self.availability_cache.put_kzg_verified_blobs(
block_root,
verified_blobs,
data_column_recv,
&self.log,
)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
Expand All @@ -285,6 +291,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.put_kzg_verified_blobs(
gossip_blob.block_root(),
vec![gossip_blob.into_inner()],
None,
&self.log,
)
}
Expand Down Expand Up @@ -801,7 +808,6 @@ impl<E: EthSpec> AvailableBlock<E> {
block,
blobs,
data_columns,
blobs_available_timestamp: _,
..
} = self;
(block_root, block, blobs, data_columns)
Expand Down
Loading
Loading