Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::data_availability_checker::{
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::envelope_times_cache::EnvelopeTimesCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload};
Expand Down Expand Up @@ -56,6 +57,9 @@ use crate::observed_block_producers::ObservedBlockProducers;
use crate::observed_data_sidecars::ObservedDataSidecars;
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::observed_slashable::ObservedSlashable;
use crate::payload_envelope_verification::{
EnvelopeError, ExecutedEnvelope, ExecutionPendingEnvelope,
};
use crate::pending_payload_envelopes::PendingPayloadEnvelopes;
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::persisted_custody::persist_custody_context;
Expand Down Expand Up @@ -463,6 +467,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
/// A cache used to keep track of various block timings.
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A cache used to keep track of various envelope timings.
pub envelope_times_cache: Arc<RwLock<EnvelopeTimesCache>>,
/// A cache used to track pre-finalization block roots for quick rejection.
pub pre_finalization_block_cache: PreFinalizationBlockCache,
/// A cache used to produce light_client server messages
Expand Down Expand Up @@ -1151,6 +1157,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Returns the full block at the given root, if it's available in the database.
///
/// Should always return a full block for pre-merge and post-gloas blocks.
pub fn get_full_block(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
match self.store.try_get_full_block(block_root)? {
Some(DatabaseBlock::Full(block)) => Ok(Some(block)),
Copy link
Member

@pawanjay176 pawanjay176 Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could potentially look up a block status table here to figure out if we have processed the block and it turned out to be invalid

Some(DatabaseBlock::Blinded(_)) => {
// TODO(gloas) should we return None here?
Ok(None)
}
None => Ok(None),
}
}

/// Returns the block at the given root, if any.
///
/// ## Errors
Expand Down Expand Up @@ -3534,6 +3557,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
))
}

/// Accepts a fully-verified payload envelope and awaits on its payload verification handle to
/// get a fully `ExecutedEnvelope`.
///
/// An error is returned if the verification handle couldn't be awaited.
#[instrument(skip_all, level = "debug")]
pub async fn into_executed_payload_envelope(
self: Arc<Self>,
pending_envelope: ExecutionPendingEnvelope<T::EthSpec>,
) -> Result<ExecutedEnvelope<T::EthSpec>, EnvelopeError> {
let ExecutionPendingEnvelope {
signed_envelope,
import_data,
payload_verification_handle,
} = pending_envelope;

let payload_verification_outcome = payload_verification_handle
.await
.map_err(BeaconChainError::TokioJoin)?
.ok_or(BeaconChainError::RuntimeShutdown)??;

Ok(ExecutedEnvelope::new(
signed_envelope,
import_data,
payload_verification_outcome,
))
}

/* Import methods */

/// Checks if the block is available, and imports immediately if so, otherwise caches the block
Expand Down Expand Up @@ -4161,7 +4211,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Check block's consistentency with any configured weak subjectivity checkpoint.
fn check_block_against_weak_subjectivity_checkpoint(
pub(crate) fn check_block_against_weak_subjectivity_checkpoint(
&self,
block: BeaconBlockRef<T::EthSpec>,
block_root: Hash256,
Expand Down Expand Up @@ -6468,6 +6518,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// sync anyway).
self.naive_aggregation_pool.write().prune(slot);
self.block_times_cache.write().prune(slot);
self.envelope_times_cache.write().prune(slot);

// Don't run heavy-weight tasks during sync.
if self.best_slot() + MAX_PER_SLOT_FORK_CHOICE_DISTANCE < slot {
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,8 @@ pub struct SignatureVerifiedBlock<T: BeaconChainTypes> {
}

/// Used to await the result of executing payload with an EE.
type PayloadVerificationHandle = JoinHandle<Option<Result<PayloadVerificationOutcome, BlockError>>>;
pub type PayloadVerificationHandle =
JoinHandle<Option<Result<PayloadVerificationOutcome, BlockError>>>;

/// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and
/// ready to import into the `BeaconChain`. The validation includes:
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,7 @@ where
)),
beacon_proposer_cache,
block_times_cache: <_>::default(),
envelope_times_cache: <_>::default(),
pre_finalization_block_cache: <_>::default(),
validator_pubkey_cache: RwLock::new(validator_pubkey_cache),
early_attester_cache: <_>::default(),
Expand Down
197 changes: 197 additions & 0 deletions beacon_node/beacon_chain/src/envelope_times_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
//! This module provides the `EnvelopeTimesCache` which contains information regarding payload
//! envelope timings.
//!
//! This provides `BeaconChain` and associated functions with access to the timestamps of when a
//! payload envelope was observed, verified, executed, and imported.
//! This allows for better traceability and allows us to determine the root cause for why an
//! envelope was imported late.
//! This allows us to distinguish between the following scenarios:
//! - The envelope was observed late.
//! - Consensus verification was slow.
//! - Execution verification was slow.
//! - The DB write was slow.

use eth2::types::{Hash256, Slot};
use std::collections::HashMap;
use std::time::Duration;

type BlockRoot = Hash256;

#[derive(Clone, Default)]
pub struct EnvelopeTimestamps {
/// When the envelope was first observed (gossip or RPC).
pub observed: Option<Duration>,
/// When consensus verification (state transition) completed.
pub consensus_verified: Option<Duration>,
/// When execution layer verification started.
pub started_execution: Option<Duration>,
/// When execution layer verification completed.
pub executed: Option<Duration>,
/// When the envelope was imported into the DB.
pub imported: Option<Duration>,
}

/// Delay data for envelope processing, computed relative to the slot start time.
#[derive(Debug, Default)]
pub struct EnvelopeDelays {
/// Time after start of slot we saw the envelope.
pub observed: Option<Duration>,
/// The time it took to complete consensus verification of the envelope.
pub consensus_verification_time: Option<Duration>,
/// The time it took to complete execution verification of the envelope.
pub execution_time: Option<Duration>,
/// Time after execution until the envelope was imported.
pub imported: Option<Duration>,
}

impl EnvelopeDelays {
fn new(times: EnvelopeTimestamps, slot_start_time: Duration) -> EnvelopeDelays {
let observed = times
.observed
.and_then(|observed_time| observed_time.checked_sub(slot_start_time));
let consensus_verification_time = times
.consensus_verified
.and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?));
let execution_time = times
.executed
.and_then(|executed| executed.checked_sub(times.started_execution?));
let imported = times
.imported
.and_then(|imported_time| imported_time.checked_sub(times.executed?));
EnvelopeDelays {
observed,
consensus_verification_time,
execution_time,
imported,
}
}
}

pub struct EnvelopeTimesCacheValue {
pub slot: Slot,
pub timestamps: EnvelopeTimestamps,
pub peer_id: Option<String>,
}

impl EnvelopeTimesCacheValue {
fn new(slot: Slot) -> Self {
EnvelopeTimesCacheValue {
slot,
timestamps: Default::default(),
peer_id: None,
}
}
}

#[derive(Default)]
pub struct EnvelopeTimesCache {
pub cache: HashMap<BlockRoot, EnvelopeTimesCacheValue>,
}

impl EnvelopeTimesCache {
/// Set the observation time for `block_root` to `timestamp` if `timestamp` is less than
/// any previous timestamp at which this envelope was observed.
pub fn set_time_observed(
&mut self,
block_root: BlockRoot,
slot: Slot,
timestamp: Duration,
peer_id: Option<String>,
) {
let entry = self
.cache
.entry(block_root)
.or_insert_with(|| EnvelopeTimesCacheValue::new(slot));
match entry.timestamps.observed {
Some(existing) if existing <= timestamp => {
// Existing timestamp is earlier, do nothing.
}
_ => {
entry.timestamps.observed = Some(timestamp);
entry.peer_id = peer_id;
}
}
}

/// Set the timestamp for `field` if that timestamp is less than any previously known value.
fn set_time_if_less(
&mut self,
block_root: BlockRoot,
slot: Slot,
field: impl Fn(&mut EnvelopeTimestamps) -> &mut Option<Duration>,
timestamp: Duration,
) {
let entry = self
.cache
.entry(block_root)
.or_insert_with(|| EnvelopeTimesCacheValue::new(slot));
let existing_timestamp = field(&mut entry.timestamps);
if existing_timestamp.is_none_or(|prev| timestamp < prev) {
*existing_timestamp = Some(timestamp);
}
}

pub fn set_time_consensus_verified(
&mut self,
block_root: BlockRoot,
slot: Slot,
timestamp: Duration,
) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.consensus_verified,
timestamp,
)
}

pub fn set_time_started_execution(
&mut self,
block_root: BlockRoot,
slot: Slot,
timestamp: Duration,
) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.started_execution,
timestamp,
)
}

pub fn set_time_executed(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.executed,
timestamp,
)
}

pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.imported,
timestamp,
)
}

pub fn get_envelope_delays(
&self,
block_root: BlockRoot,
slot_start_time: Duration,
) -> EnvelopeDelays {
if let Some(entry) = self.cache.get(&block_root) {
EnvelopeDelays::new(entry.timestamps.clone(), slot_start_time)
} else {
EnvelopeDelays::default()
}
}

/// Prune the cache to only store the most recent 2 epochs.
pub fn prune(&mut self, current_slot: Slot) {
self.cache
.retain(|_, entry| entry.slot > current_slot.saturating_sub(64_u64));
}
}
Loading
Loading