diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9f62bf11f5f..a491e8559b7 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -27,6 +27,7 @@ use crate::data_availability_checker::{ }; use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; +use crate::envelope_times_cache::EnvelopeTimesCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::events::ServerSentEventHandler; use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload}; @@ -463,6 +464,8 @@ pub struct BeaconChain { pub early_attester_cache: EarlyAttesterCache, /// A cache used to keep track of various block timings. pub block_times_cache: Arc>, + /// A cache used to keep track of various envelope timings. + pub envelope_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. pub pre_finalization_block_cache: PreFinalizationBlockCache, /// A cache used to produce light_client server messages @@ -1151,6 +1154,23 @@ impl BeaconChain { } } + /// Returns the full block at the given root, if it's available in the database. + /// + /// Should always return a full block for pre-merge and post-gloas blocks. + pub fn get_full_block( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + match self.store.try_get_full_block(block_root)? { + Some(DatabaseBlock::Full(block)) => Ok(Some(block)), + Some(DatabaseBlock::Blinded(_)) => { + // TODO(gloas) should we return None here? + Ok(None) + } + None => Ok(None), + } + } + /// Returns the block at the given root, if any. /// /// ## Errors @@ -4161,7 +4181,7 @@ impl BeaconChain { } /// Check block's consistentency with any configured weak subjectivity checkpoint. 
- fn check_block_against_weak_subjectivity_checkpoint( + pub(crate) fn check_block_against_weak_subjectivity_checkpoint( &self, block: BeaconBlockRef, block_root: Hash256, @@ -6468,6 +6488,7 @@ impl BeaconChain { // sync anyway). self.naive_aggregation_pool.write().prune(slot); self.block_times_cache.write().prune(slot); + self.envelope_times_cache.write().prune(slot); // Don't run heavy-weight tasks during sync. if self.best_slot() + MAX_PER_SLOT_FORK_CHOICE_DISTANCE < slot { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index e0943d5d931..c54c92621e5 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -718,7 +718,8 @@ pub struct SignatureVerifiedBlock { } /// Used to await the result of executing payload with an EE. -type PayloadVerificationHandle = JoinHandle>>; +pub type PayloadVerificationHandle = + JoinHandle>>; /// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and /// ready to import into the `BeaconChain`. The validation includes: diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 4c82c93ba3d..aef90f46e38 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1056,6 +1056,7 @@ where )), beacon_proposer_cache, block_times_cache: <_>::default(), + envelope_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), validator_pubkey_cache: RwLock::new(validator_pubkey_cache), early_attester_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/envelope_times_cache.rs b/beacon_node/beacon_chain/src/envelope_times_cache.rs new file mode 100644 index 00000000000..84c936c210b --- /dev/null +++ b/beacon_node/beacon_chain/src/envelope_times_cache.rs @@ -0,0 +1,197 @@ +//! 
This module provides the `EnvelopeTimesCache` which contains information regarding payload +//! envelope timings. +//! +//! This provides `BeaconChain` and associated functions with access to the timestamps of when a +//! payload envelope was observed, verified, executed, and imported. +//! This allows for better traceability and allows us to determine the root cause for why an +//! envelope was imported late. +//! This allows us to distinguish between the following scenarios: +//! - The envelope was observed late. +//! - Consensus verification was slow. +//! - Execution verification was slow. +//! - The DB write was slow. + +use eth2::types::{Hash256, Slot}; +use std::collections::HashMap; +use std::time::Duration; + +type BlockRoot = Hash256; + +#[derive(Clone, Default)] +pub struct EnvelopeTimestamps { + /// When the envelope was first observed (gossip or RPC). + pub observed: Option, + /// When consensus verification (state transition) completed. + pub consensus_verified: Option, + /// When execution layer verification started. + pub started_execution: Option, + /// When execution layer verification completed. + pub executed: Option, + /// When the envelope was imported into the DB. + pub imported: Option, +} + +/// Delay data for envelope processing, computed relative to the slot start time. +#[derive(Debug, Default)] +pub struct EnvelopeDelays { + /// Time after start of slot we saw the envelope. + pub observed: Option, + /// The time it took to complete consensus verification of the envelope. + pub consensus_verification_time: Option, + /// The time it took to complete execution verification of the envelope. + pub execution_time: Option, + /// Time after execution until the envelope was imported. 
+ pub imported: Option, +} + +impl EnvelopeDelays { + fn new(times: EnvelopeTimestamps, slot_start_time: Duration) -> EnvelopeDelays { + let observed = times + .observed + .and_then(|observed_time| observed_time.checked_sub(slot_start_time)); + let consensus_verification_time = times + .consensus_verified + .and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?)); + let execution_time = times + .executed + .and_then(|executed| executed.checked_sub(times.started_execution?)); + let imported = times + .imported + .and_then(|imported_time| imported_time.checked_sub(times.executed?)); + EnvelopeDelays { + observed, + consensus_verification_time, + execution_time, + imported, + } + } +} + +pub struct EnvelopeTimesCacheValue { + pub slot: Slot, + pub timestamps: EnvelopeTimestamps, + pub peer_id: Option, +} + +impl EnvelopeTimesCacheValue { + fn new(slot: Slot) -> Self { + EnvelopeTimesCacheValue { + slot, + timestamps: Default::default(), + peer_id: None, + } + } +} + +#[derive(Default)] +pub struct EnvelopeTimesCache { + pub cache: HashMap, +} + +impl EnvelopeTimesCache { + /// Set the observation time for `block_root` to `timestamp` if `timestamp` is less than + /// any previous timestamp at which this envelope was observed. + pub fn set_time_observed( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + peer_id: Option, + ) { + let entry = self + .cache + .entry(block_root) + .or_insert_with(|| EnvelopeTimesCacheValue::new(slot)); + match entry.timestamps.observed { + Some(existing) if existing <= timestamp => { + // Existing timestamp is earlier, do nothing. + } + _ => { + entry.timestamps.observed = Some(timestamp); + entry.peer_id = peer_id; + } + } + } + + /// Set the timestamp for `field` if that timestamp is less than any previously known value. 
+ fn set_time_if_less( + &mut self, + block_root: BlockRoot, + slot: Slot, + field: impl Fn(&mut EnvelopeTimestamps) -> &mut Option, + timestamp: Duration, + ) { + let entry = self + .cache + .entry(block_root) + .or_insert_with(|| EnvelopeTimesCacheValue::new(slot)); + let existing_timestamp = field(&mut entry.timestamps); + if existing_timestamp.is_none_or(|prev| timestamp < prev) { + *existing_timestamp = Some(timestamp); + } + } + + pub fn set_time_consensus_verified( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.consensus_verified, + timestamp, + ) + } + + pub fn set_time_started_execution( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.started_execution, + timestamp, + ) + } + + pub fn set_time_executed(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.executed, + timestamp, + ) + } + + pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.imported, + timestamp, + ) + } + + pub fn get_envelope_delays( + &self, + block_root: BlockRoot, + slot_start_time: Duration, + ) -> EnvelopeDelays { + if let Some(entry) = self.cache.get(&block_root) { + EnvelopeDelays::new(entry.timestamps.clone(), slot_start_time) + } else { + EnvelopeDelays::default() + } + } + + /// Prune the cache to only store the most recent 2 epochs. 
+ pub fn prune(&mut self, current_slot: Slot) { + self.cache + .retain(|_, entry| entry.slot > current_slot.saturating_sub(64_u64)); + } +} diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index bdf3ab95949..a5c2ead427a 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -112,12 +112,17 @@ impl PayloadNotifier { if let Some(precomputed_status) = self.payload_verification_status { Ok(precomputed_status) } else { - notify_new_payload(&self.chain, self.block.message()).await + notify_new_payload( + &self.chain, + self.block.message().tree_hash_root(), + self.block.message().try_into()?, + ) + .await } } } -/// Verify that `execution_payload` contained by `block` is considered valid by an execution +/// Verify that `execution_payload` associated with `beacon_block_root` is considered valid by an execution /// engine. /// /// ## Specification @@ -126,17 +131,20 @@ impl PayloadNotifier { /// contains a few extra checks by running `partially_verify_execution_payload` first: /// /// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload -async fn notify_new_payload( +pub async fn notify_new_payload( chain: &Arc>, - block: BeaconBlockRef<'_, T::EthSpec>, + beacon_block_root: Hash256, + new_payload_request: NewPayloadRequest<'_, T::EthSpec>, ) -> Result { let execution_layer = chain .execution_layer .as_ref() .ok_or(ExecutionPayloadError::NoExecutionConnection)?; - let execution_block_hash = block.execution_payload()?.block_hash(); - let new_payload_response = execution_layer.notify_new_payload(block.try_into()?).await; + let execution_block_hash = new_payload_request.execution_payload_ref().block_hash(); + let new_payload_response = execution_layer + .notify_new_payload(new_payload_request.clone()) + .await; match new_payload_response { Ok(status) => match status { @@ -152,10 +160,11 @@ 
async fn notify_new_payload( ?validation_error, ?latest_valid_hash, ?execution_block_hash, - root = ?block.tree_hash_root(), - graffiti = block.body().graffiti().as_utf8_lossy(), - proposer_index = block.proposer_index(), - slot = %block.slot(), + // TODO(gloas) are these other logs important? + root = ?beacon_block_root, + // graffiti = block.body().graffiti().as_utf8_lossy(), + // proposer_index = block.proposer_index(), + // slot = %block.slot(), method = "new_payload", "Invalid execution payload" ); @@ -178,11 +187,11 @@ async fn notify_new_payload( { // This block has not yet been applied to fork choice, so the latest block that was // imported to fork choice was the parent. - let latest_root = block.parent_root(); + let latest_root = new_payload_request.parent_beacon_block_root()?; chain .process_invalid_execution_payload(&InvalidationOperation::InvalidateMany { - head_block_root: latest_root, + head_block_root: *latest_root, always_invalidate_head: false, latest_valid_ancestor: latest_valid_hash, }) @@ -197,10 +206,11 @@ async fn notify_new_payload( warn!( ?validation_error, ?execution_block_hash, - root = ?block.tree_hash_root(), - graffiti = block.body().graffiti().as_utf8_lossy(), - proposer_index = block.proposer_index(), - slot = %block.slot(), + // TODO(gloas) are these other logs important? 
+ root = ?beacon_block_root, + // graffiti = block.body().graffiti().as_utf8_lossy(), + // proposer_index = block.proposer_index(), + // slot = %block.slot(), method = "new_payload", "Invalid execution payload block hash" ); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 3b03395a665..21b32141a83 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -21,6 +21,7 @@ pub mod custody_context; pub mod data_availability_checker; pub mod data_column_verification; mod early_attester_cache; +pub mod envelope_times_cache; mod errors; pub mod events; pub mod execution_payload; @@ -43,6 +44,7 @@ pub mod observed_block_producers; pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; +pub mod payload_envelope_verification; pub mod pending_payload_envelopes; pub mod persisted_beacon_chain; pub mod persisted_custody; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 9de67ca93fa..775f5a3df02 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -21,6 +21,44 @@ pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL: &st pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL: &str = "validator_monitor_attestation_simulator_source_attester_miss_total"; +/* +* Execution Payload Envelope Procsesing +*/ + +pub static ENVELOPE_PROCESSING_REQUESTS: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "payload_envelope_processing_requests_total", + "Count of payload envelopes submitted for processing", + ) +}); +pub static ENVELOPE_PROCESSING_SUCCESSES: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "payload_envelope_processing_successes_total", + "Count of payload envelopes processed without error", + ) +}); +pub static ENVELOPE_PROCESSING_TIMES: LazyLock> = LazyLock::new(|| { + try_create_histogram( + 
"payload_envelope_processing_seconds", + "Full runtime of payload envelope processing", + ) +}); +pub static ENVELOPE_PROCESSING_DB_WRITE: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "payload_envelope_processing_db_write_seconds", + "Time spent writing a newly processed payload envelope and state to DB", + ) +}); +pub static ENVELOPE_PROCESSING_POST_EXEC_PROCESSING: LazyLock> = + LazyLock::new(|| { + try_create_histogram_with_buckets( + "payload_envelope_processing_post_exec_pre_attestable_seconds", + "Time between finishing execution processing and the payload envelope + becoming attestable", + linear_buckets(0.01, 0.01, 15), + ) + }); + /* * Block Processing */ diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs new file mode 100644 index 00000000000..504a1d2c705 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs @@ -0,0 +1,318 @@ +use std::sync::Arc; + +use educe::Educe; +use slot_clock::SlotClock; +use state_processing::{ + VerifySignatures, + envelope_processing::{VerifyStateRoot, process_execution_payload_envelope}, +}; +use tracing::{Span, debug}; +use types::{ + EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, + consts::gloas::BUILDER_INDEX_SELF_BUILD, +}; + +use crate::{ + BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, + PayloadVerificationOutcome, + payload_envelope_verification::{ + EnvelopeError, EnvelopeImportData, EnvelopeProcessingSnapshot, ExecutionPendingEnvelope, + IntoExecutionPendingEnvelope, MaybeAvailableEnvelope, load_snapshot, + payload_notifier::PayloadNotifier, + }, +}; + +/// A wrapper around a `SignedExecutionPayloadEnvelope` that indicates it has been approved for re-gossiping on +/// the p2p network. 
+#[derive(Educe)] +#[educe(Debug(bound = "T: BeaconChainTypes"))] +pub struct GossipVerifiedEnvelope { + pub signed_envelope: Arc>, + pub block: Arc>, + pub snapshot: Option>>, +} + +impl GossipVerifiedEnvelope { + pub fn new( + signed_envelope: Arc>, + chain: &BeaconChain, + ) -> Result { + let envelope = &signed_envelope.message; + let payload = &envelope.payload; + let beacon_block_root = envelope.beacon_block_root; + + // Check that we've seen the beacon block for this envelope and that it passes validation. + // TODO(EIP-7732): We might need some type of status table in order to differentiate between: + // + // 1. Blocks we haven't seen (IGNORE), and + // 2. Blocks we've seen that are invalid (REJECT). + // + // Presently these two cases are conflated. + let fork_choice_read_lock = chain.canonical_head.fork_choice_read_lock(); + let Some(proto_block) = fork_choice_read_lock.get_block(&beacon_block_root) else { + return Err(EnvelopeError::BlockRootUnknown { + block_root: beacon_block_root, + }); + }; + + let latest_finalized_slot = fork_choice_read_lock + .finalized_checkpoint() + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + drop(fork_choice_read_lock); + + // TODO(EIP-7732): check that we haven't seen another valid `SignedExecutionPayloadEnvelope` + // for this block root from this builder - envelope status table check + let block = chain + .get_full_block(&beacon_block_root)? + .ok_or_else(|| { + EnvelopeError::from(BeaconChainError::MissingBeaconBlock(beacon_block_root)) + }) + .map(Arc::new)?; + let execution_bid = &block + .message() + .body() + .signed_execution_payload_bid()? + .message; + + // check that the envelopes slot isnt from a slot prior + // to the latest finalized slot. 
+ if envelope.slot < latest_finalized_slot { + return Err(EnvelopeError::PriorToFinalization { + payload_slot: envelope.slot, + latest_finalized_slot, + }); + } + + // check that the slot of the envelope matches the slot of the parent block + if envelope.slot != block.slot() { + return Err(EnvelopeError::SlotMismatch { + block: block.slot(), + envelope: envelope.slot, + }); + } + + // builder index matches committed bid + if envelope.builder_index != execution_bid.builder_index { + return Err(EnvelopeError::BuilderIndexMismatch { + committed_bid: execution_bid.builder_index, + envelope: envelope.builder_index, + }); + } + + // the block hash should match the block hash of the execution bid + if payload.block_hash != execution_bid.block_hash { + return Err(EnvelopeError::BlockHashMismatch { + committed_bid: execution_bid.block_hash, + envelope: payload.block_hash, + }); + } + + // Verify the envelope signature. + // + // For self-build envelopes, we can use the proposer cache for the fork and the + // validator pubkey cache for the proposer's pubkey, avoiding a state load from disk. + // For external builder envelopes, we must load the state to access the builder registry. + let builder_index = envelope.builder_index; + let block_slot = envelope.slot; + let block_epoch = block_slot.epoch(T::EthSpec::slots_per_epoch()); + let proposer_shuffling_decision_block = + proto_block.proposer_shuffling_root_for_child_block(block_epoch, &chain.spec); + + let (signature_is_valid, opt_snapshot) = if builder_index == BUILDER_INDEX_SELF_BUILD { + // Fast path: self-build envelopes can be verified without loading the state. 
+ let envelope_ref = signed_envelope.as_ref(); + let mut opt_snapshot = None; + let proposer = chain.with_proposer_cache::<_, EnvelopeError>( + proposer_shuffling_decision_block, + block_epoch, + |proposers| proposers.get_slot::(block_slot), + || { + debug!( + %beacon_block_root, + "Proposer shuffling cache miss for envelope verification" + ); + let snapshot = load_snapshot(envelope_ref, chain)?; + opt_snapshot = Some(Box::new(snapshot.clone())); + Ok((snapshot.state_root, snapshot.pre_state)) + }, + )?; + let fork = proposer.fork; + + let pubkey_cache = chain.validator_pubkey_cache.read(); + let pubkey = pubkey_cache + .get(block.message().proposer_index() as usize) + .ok_or_else(|| EnvelopeError::UnknownValidator { + builder_index: block.message().proposer_index(), + })?; + let is_valid = signed_envelope.verify_signature( + pubkey, + &fork, + chain.genesis_validators_root, + &chain.spec, + ); + (is_valid, opt_snapshot) + } else { + // TODO(gloas) if we implement a builder pubkey cache, we'll need to use it here. + // External builder: must load the state to get the builder pubkey. 
+ let snapshot = load_snapshot(signed_envelope.as_ref(), chain)?; + let is_valid = + signed_envelope.verify_signature_with_state(&snapshot.pre_state, &chain.spec)?; + (is_valid, Some(Box::new(snapshot))) + }; + + if !signature_is_valid { + return Err(EnvelopeError::BadSignature); + } + + Ok(Self { + signed_envelope, + block, + snapshot: opt_snapshot, + }) + } + + pub fn envelope_cloned(&self) -> Arc> { + self.signed_envelope.clone() + } +} + +impl IntoExecutionPendingEnvelope for GossipVerifiedEnvelope { + fn into_execution_pending_envelope( + self, + chain: &Arc>, + notify_execution_layer: NotifyExecutionLayer, + ) -> Result, EnvelopeError> { + let signed_envelope = self.signed_envelope; + let envelope = &signed_envelope.message; + let payload = &envelope.payload; + + // Verify the execution payload is valid + let payload_notifier = PayloadNotifier::new( + chain.clone(), + signed_envelope.clone(), + self.block.clone(), + notify_execution_layer, + )?; + let block_root = envelope.beacon_block_root; + let slot = self.block.slot(); + + let payload_verification_future = async move { + let chain = payload_notifier.chain.clone(); + if let Some(started_execution) = chain.slot_clock.now_duration() { + chain + .envelope_times_cache + .write() + .set_time_started_execution(block_root, slot, started_execution); + } + + let payload_verification_status = payload_notifier.notify_new_payload().await?; + Ok(PayloadVerificationOutcome { + payload_verification_status, + // This fork is after the merge so it'll never be the merge transition block + is_valid_merge_transition_block: false, + }) + }; + // Spawn the payload verification future as a new task, but don't wait for it to complete. + // The `payload_verification_future` will be awaited later to ensure verification completed + // successfully. 
+ let payload_verification_handle = chain + .task_executor + .spawn_handle( + payload_verification_future, + "execution_payload_verification", + ) + .ok_or(BeaconChainError::RuntimeShutdown)?; + + let snapshot = if let Some(snapshot) = self.snapshot { + *snapshot + } else { + load_snapshot(signed_envelope.as_ref(), chain)? + }; + let mut state = snapshot.pre_state; + + // All the state modifications are done in envelope_processing + process_execution_payload_envelope( + &mut state, + Some(snapshot.state_root), + &signed_envelope, + // verify signature already done for GossipVerifiedEnvelope + VerifySignatures::False, + VerifyStateRoot::True, + &chain.spec, + )?; + + Ok(ExecutionPendingEnvelope { + signed_envelope: MaybeAvailableEnvelope::AvailabilityPending { + block_hash: payload.block_hash, + envelope: signed_envelope, + }, + import_data: EnvelopeImportData { + block_root, + block: self.block, + post_state: Box::new(state), + }, + payload_verification_handle, + }) + } + + fn envelope(&self) -> &Arc> { + &self.signed_envelope + } +} + +impl BeaconChain { + /// Returns `Ok(GossipVerifiedEnvelope)` if the supplied `envelope` should be forwarded onto the + /// gossip network. The envelope is not imported into the chain, it is just partially verified. + /// + /// The returned `GossipVerifiedEnvelope` should be provided to `Self::process_execution_payload_envelope` immediately + /// after it is returned, unless some other circumstance decides it should not be imported at + /// all. 
+ /// + /// ## Errors + /// + /// Returns an `Err` if the given envelope was invalid, or an error was encountered during + pub async fn verify_envelope_for_gossip( + self: &Arc, + envelope: Arc>, + ) -> Result, EnvelopeError> { + let chain = self.clone(); + let span = Span::current(); + self.task_executor + .clone() + .spawn_blocking_handle( + move || { + let _guard = span.enter(); + let slot = envelope.slot(); + let beacon_block_root = envelope.message.beacon_block_root; + + match GossipVerifiedEnvelope::new(envelope, &chain) { + Ok(verified) => { + debug!( + %slot, + ?beacon_block_root, + "Successfully verified gossip envelope" + ); + + Ok(verified) + } + Err(e) => { + debug!( + error = e.to_string(), + ?beacon_block_root, + %slot, + "Rejected gossip envelope" + ); + + Err(e) + } + } + }, + "gossip_envelope_verification_handle", + ) + .ok_or(BeaconChainError::RuntimeShutdown)? + .await + .map_err(BeaconChainError::TokioJoin)? + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs new file mode 100644 index 00000000000..603e14446a6 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -0,0 +1,390 @@ +use std::sync::Arc; +use std::time::Duration; + +use fork_choice::PayloadVerificationStatus; +use logging::crit; +use slot_clock::SlotClock; +use store::StoreOp; +use tracing::{debug, error, info, info_span, instrument, warn}; +use types::{BeaconState, BlockImportSource, Hash256, SignedBeaconBlock, Slot}; + +use super::{ + AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData, + ExecutedEnvelope, IntoExecutionPendingEnvelope, +}; +use crate::{ + AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, + NotifyExecutionLayer, + block_verification_types::{AsBlock, AvailableBlockData}, + metrics, + payload_envelope_verification::ExecutionPendingEnvelope, + 
validator_monitor::{get_slot_delay_ms, timestamp_now}, +}; + +impl BeaconChain { + /// Returns `Ok(block_root)` if the given `unverified_envelope` was successfully verified and + /// imported into the chain. + /// + /// Items that implement `IntoExecutionPendingEnvelope` include: + /// + /// - `GossipVerifiedEnvelope` + /// + /// ## Errors + /// + /// Returns an `Err` if the given block was invalid, or an error was encountered during + /// verification. + #[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))] + pub async fn process_execution_payload_envelope>( + self: &Arc, + block_root: Hash256, + unverified_envelope: P, + notify_execution_layer: NotifyExecutionLayer, + block_source: BlockImportSource, + publish_fn: impl FnOnce() -> Result<(), EnvelopeError>, + ) -> Result { + let block_slot = unverified_envelope.envelope().slot(); + + // Set observed time if not already set. Usually this should be set by gossip or RPC, + // but just in case we set it again here (useful for tests). + if let Some(seen_timestamp) = self.slot_clock.now_duration() { + self.envelope_times_cache.write().set_time_observed( + block_root, + block_slot, + seen_timestamp, + None, + ); + } + + // TODO(gloas) insert the pre-executed envelope into some type of cache. + + let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES); + + metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_REQUESTS); + + // A small closure to group the verification and import errors. + let chain = self.clone(); + let import_envelope = async move { + let execution_pending = unverified_envelope + .into_execution_pending_envelope(&chain, notify_execution_layer)?; + publish_fn()?; + + // Record the time it took to complete consensus verification. 
+ if let Some(timestamp) = chain.slot_clock.now_duration() { + chain + .envelope_times_cache + .write() + .set_time_consensus_verified(block_root, block_slot, timestamp); + } + + let envelope_times_cache = chain.envelope_times_cache.clone(); + let slot_clock = chain.slot_clock.clone(); + + let executed_envelope = chain + .into_executed_payload_envelope(execution_pending) + .await + .inspect_err(|_| { + // TODO(gloas) If the envelope fails execution for whatever reason (e.g. engine offline), + // and we keep it in the cache, then the node will NOT perform lookup and + // reprocess this block until the block is evicted from DA checker, causing the + // chain to get stuck temporarily if the block is canonical. Therefore we remove + // it from the cache if execution fails. + })?; + + // Record the time it took to wait for execution layer verification. + if let Some(timestamp) = slot_clock.now_duration() { + envelope_times_cache + .write() + .set_time_executed(block_root, block_slot, timestamp); + } + + match executed_envelope { + ExecutedEnvelope::Available(envelope) => { + self.import_available_execution_payload_envelope(Box::new(envelope)) + .await + } + ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError( + "Pending payload envelope not yet implemented".to_owned(), + )), + } + }; + + // Verify and import the block. + match import_envelope.await { + // The block was successfully verified and imported. Yay. 
+ Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => { + info!( + ?block_root, + %block_slot, + source = %block_source, + "Execution payload envelope imported" + ); + + metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_SUCCESSES); + + Ok(status) + } + Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + debug!(?block_root, %slot, "Beacon block awaiting blobs"); + + Ok(status) + } + Err(EnvelopeError::BeaconChainError(e)) => { + match e.as_ref() { + BeaconChainError::TokioJoin(e) => { + debug!( + error = ?e, + "Envelope processing cancelled" + ); + } + _ => { + // There was an error whilst attempting to verify and import the block. The block might + // be partially verified or partially imported. + crit!( + error = ?e, + "Envelope processing error" + ); + } + }; + Err(EnvelopeError::BeaconChainError(e)) + } + // The block failed verification. + Err(other) => { + warn!( + reason = other.to_string(), + "Execution payload envelope rejected" + ); + Err(other) + } + } + } + + /// Accepts a fully-verified payload envelope and awaits on its payload verification handle to + /// get a fully `ExecutedEnvelope`. + /// + /// An error is returned if the verification handle couldn't be awaited. + #[instrument(skip_all, level = "debug")] + pub async fn into_executed_payload_envelope( + self: Arc, + pending_envelope: ExecutionPendingEnvelope, + ) -> Result, EnvelopeError> { + let ExecutionPendingEnvelope { + signed_envelope, + import_data, + payload_verification_handle, + } = pending_envelope; + + let payload_verification_outcome = payload_verification_handle + .await + .map_err(BeaconChainError::TokioJoin)? 
+ .ok_or(BeaconChainError::RuntimeShutdown)??; + + Ok(ExecutedEnvelope::new( + signed_envelope, + import_data, + payload_verification_outcome, + )) + } + + #[instrument(skip_all)] + pub async fn import_available_execution_payload_envelope( + self: &Arc, + envelope: Box>, + ) -> Result { + let AvailableExecutedEnvelope { + envelope, + import_data, + payload_verification_outcome, + } = *envelope; + + let EnvelopeImportData { + block_root, + block, + post_state, + } = import_data; + + let block_root = { + // Capture the current span before moving into the blocking task + let current_span = tracing::Span::current(); + let chain = self.clone(); + self.spawn_blocking_handle( + move || { + // Enter the captured span in the blocking thread + let _guard = current_span.enter(); + chain.import_execution_payload_envelope( + envelope, + block_root, + *post_state, + payload_verification_outcome.payload_verification_status, + block, + ) + }, + "payload_verification_handle", + ) + .await?? + }; + + Ok(AvailabilityProcessingStatus::Imported(block_root)) + } + + /// Accepts a fully-verified and available envelope and imports it into the chain without performing any + /// additional verification. + /// + /// An error is returned if the envelope was unable to be imported. It may be partially imported + /// (i.e., this function is not atomic). + #[allow(clippy::too_many_arguments)] + #[instrument(skip_all)] + fn import_execution_payload_envelope( + &self, + signed_envelope: AvailableEnvelope, + block_root: Hash256, + state: BeaconState, + _payload_verification_status: PayloadVerificationStatus, + parent_block: Arc>, + ) -> Result { + // ----------------------------- ENVELOPE NOT YET ATTESTABLE ---------------------------------- + // Everything in this initial section is on the hot path between processing the envelope and + // being able to attest to it. DO NOT add any extra processing in this initial section + // unless it must run before fork choice. 
+ // -----------------------------------------------------------------------------------------
+
+ let post_exec_timer =
+ metrics::start_timer(&metrics::ENVELOPE_PROCESSING_POST_EXEC_PROCESSING);
+
+ // Check the payload's parent block against weak subjectivity checkpoint.
+ self.check_block_against_weak_subjectivity_checkpoint(
+ parent_block.message(),
+ block_root,
+ &state,
+ )?;
+
+ // Take an upgradable read lock on fork choice so we can check that the envelope's beacon
+ // block is already known to fork choice; we must not import an envelope for an unknown block.
+ let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock();
+ if !fork_choice_reader.contains_block(&block_root) {
+ return Err(EnvelopeError::BlockRootUnknown { block_root });
+ }
+
+ // TODO(gloas) no fork choice logic yet
+ // Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by
+ // avoiding taking other locks whilst holding this lock.
+ // let fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader);
+
+ // TODO(gloas) Do we need this check? Do not import a block that doesn't descend from the finalized root.
+ // let signed_block = check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, signed_block)?;
+
+ // TODO(gloas) Do we want to use an early attester cache like mechanism for payload envelopes?
+ // TODO(gloas) emit SSE event if the payload became the new head payload
+ drop(post_exec_timer);
+
+ // ---------------------------- ENVELOPE PROBABLY ATTESTABLE ----------------------------------
+ // It is important NOT to return errors here before the database commit, because the envelope
+ // has already been added to fork choice and the database would be left in an inconsistent
+ // state if we returned early without committing. In other words, an error here would
+ // corrupt the node's database permanently.
+ // ----------------------------------------------------------------------------------------- + + // Store the envelope and its state, and execute the confirmation batch for the intermediate + // states, which will delete their temporary flags. + // If the write fails, revert fork choice to the version from disk, else we can + // end up with envelopes in fork choice that are missing from disk. + // See https://github.com/sigp/lighthouse/issues/2028 + let (signed_envelope, columns) = signed_envelope.deconstruct(); + + let mut ops = vec![]; + + match self.get_blobs_or_columns_store_op( + block_root, + signed_envelope.slot(), + AvailableBlockData::DataColumns(columns), + ) { + Ok(Some(blobs_or_columns_store_op)) => { + ops.push(blobs_or_columns_store_op); + } + Ok(None) => {} + Err(e) => { + error!( + msg = "Restoring fork choice from disk", + error = &e, + ?block_root, + "Failed to store data columns into the database" + ); + // TODO(gloas) implement failed write handling to fork choice + // let _ = self.handle_import_block_db_write_error(fork_choice); + return Err(EnvelopeError::InternalError(e)); + } + } + + let db_write_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_DB_WRITE); + + ops.push(StoreOp::PutPayloadEnvelope( + block_root, + signed_envelope.clone(), + )); + ops.push(StoreOp::PutState( + signed_envelope.message.state_root, + &state, + )); + + let db_span = info_span!("persist_payloads_and_blobs").entered(); + + if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { + error!( + msg = "Restoring fork choice from disk", + error = ?e, + "Database write failed!" + ); + // TODO(gloas) handle db write failure + // return Err(self + // .handle_import_block_db_write_error(fork_choice) + // .err() + // .unwrap_or(e.into())); + } + + drop(db_span); + + // TODO(gloas) drop fork choice lock + // The fork choice write-lock is dropped *after* the on-disk database has been updated. 
+ // This prevents inconsistency between the two at the expense of concurrency. + // drop(fork_choice); + + // We're declaring the envelope "imported" at this point, since fork choice and the DB know + // about it. + let envelope_time_imported = timestamp_now(); + + // TODO(gloas) depending on what happens with light clients + // we might need to do some light client related computations here + + metrics::stop_timer(db_write_timer); + metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_SUCCESSES); + + self.import_envelope_update_metrics_and_events( + block_root, + signed_envelope.slot(), + envelope_time_imported, + ); + + Ok(block_root) + } + + fn import_envelope_update_metrics_and_events( + &self, + block_root: Hash256, + envelope_slot: Slot, + envelope_time_imported: Duration, + ) { + let envelope_delay_total = + get_slot_delay_ms(envelope_time_imported, envelope_slot, &self.slot_clock); + + // Do not write to the cache for envelopes older than 2 epochs, this helps reduce writes + // to the cache during sync. + if envelope_delay_total < self.slot_clock.slot_duration().saturating_mul(64) { + self.envelope_times_cache.write().set_time_imported( + block_root, + envelope_slot, + envelope_time_imported, + ); + } + + // TODO(gloas) emit SSE event for envelope import (similar to SseBlock for blocks). + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs new file mode 100644 index 00000000000..80e62f93b71 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -0,0 +1,352 @@ +//! The incremental processing steps (e.g., signatures verified but not the state transition) is +//! represented as a sequence of wrapper-types around the block. There is a linear progression of +//! types, starting at a `SignedExecutionPayloadEnvelope` and finishing with an `AvailableExecutedEnvelope` (see +//! diagram below). +//! +//! 
// TODO(gloas) we might want to update this diagram to include `AvailableExecutedEnvelope`
+//! ```ignore
+//! START
+//! |
+//! ▼
+//! SignedExecutionPayloadEnvelope
+//! |
+//! |---------------
+//! | |
+//! | ▼
+//! | GossipVerifiedEnvelope
+//! | |
+//! |---------------
+//! |
+//! ▼
+//! ExecutionPendingEnvelope
+//! |
+//! await
+//! |
+//! ▼
+//! END
+//!
+//! ```
+
+use std::sync::Arc;
+
+use store::Error as DBError;
+
+use state_processing::{BlockProcessingError, envelope_processing::EnvelopeProcessingError};
+use tracing::instrument;
+use types::{
+ BeaconState, BeaconStateError, ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash,
+ ExecutionPayloadEnvelope, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
+};
+
+use crate::{
+ BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError,
+ NotifyExecutionLayer, PayloadVerificationOutcome,
+ block_verification::PayloadVerificationHandle,
+ payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope,
+};
+
+pub mod gossip_verified_envelope;
+pub mod import;
+mod payload_notifier;
+
+pub trait IntoExecutionPendingEnvelope: Sized {
+ fn into_execution_pending_envelope(
+ self,
+ chain: &Arc>,
+ notify_execution_layer: NotifyExecutionLayer,
+ ) -> Result, EnvelopeError>;
+
+ fn envelope(&self) -> &Arc>;
+}
+
+pub struct ExecutionPendingEnvelope {
+ pub signed_envelope: MaybeAvailableEnvelope,
+ pub import_data: EnvelopeImportData,
+ pub payload_verification_handle: PayloadVerificationHandle,
+}
+
+#[derive(PartialEq)]
+pub struct EnvelopeImportData {
+ pub block_root: Hash256,
+ pub block: Arc>,
+ pub post_state: Box>,
+}
+
+#[derive(Debug)]
+#[allow(dead_code)]
+pub struct AvailableEnvelope {
+ execution_block_hash: ExecutionBlockHash,
+ envelope: Arc>,
+ columns: DataColumnSidecarList,
+ /// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
+ columns_available_timestamp: Option,
+ pub spec: Arc,
+}
+
+impl AvailableEnvelope {
+ pub fn message(&self) -> &ExecutionPayloadEnvelope {
+ &self.envelope.message
+ }
+
+ #[allow(clippy::type_complexity)]
+ pub fn deconstruct(
+ self,
+ ) -> (
+ Arc>,
+ DataColumnSidecarList,
+ ) {
+ let AvailableEnvelope {
+ envelope, columns, ..
+ } = self;
+ (envelope, columns)
+ }
+}
+
+pub enum MaybeAvailableEnvelope {
+ Available(AvailableEnvelope),
+ AvailabilityPending {
+ block_hash: ExecutionBlockHash,
+ envelope: Arc>,
+ },
+}
+
+/// This snapshot is to be used for verifying an envelope of the block.
+#[derive(Debug, Clone)]
+pub struct EnvelopeProcessingSnapshot {
+ /// This state is equivalent to the `self.beacon_block.state_root()` before applying the envelope.
+ pub pre_state: BeaconState,
+ pub state_root: Hash256,
+ pub beacon_block_root: Hash256,
+}
+
+/// A payload envelope that has gone through processing checks and execution by an EL client.
+/// This envelope hasn't necessarily completed data availability checks.
+///
+///
+/// It contains 2 variants:
+/// 1. `Available`: This envelope has been executed and also contains all data to consider it
+/// fully available.
+/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it
+/// fully available.
+pub enum ExecutedEnvelope { + Available(AvailableExecutedEnvelope), + // TODO(gloas) implement availability pending + AvailabilityPending(), +} + +impl ExecutedEnvelope { + pub fn new( + envelope: MaybeAvailableEnvelope, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + match envelope { + MaybeAvailableEnvelope::Available(available_envelope) => { + Self::Available(AvailableExecutedEnvelope::new( + available_envelope, + import_data, + payload_verification_outcome, + )) + } + // TODO(gloas) implement availability pending + MaybeAvailableEnvelope::AvailabilityPending { + block_hash: _, + envelope: _, + } => Self::AvailabilityPending(), + } + } +} + +/// A payload envelope that has completed all payload processing checks including verification +/// by an EL client **and** has all requisite blob data to be imported into fork choice. +pub struct AvailableExecutedEnvelope { + pub envelope: AvailableEnvelope, + pub import_data: EnvelopeImportData, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + +impl AvailableExecutedEnvelope { + pub fn new( + envelope: AvailableEnvelope, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + Self { + envelope, + import_data, + payload_verification_outcome, + } + } +} + +#[derive(Debug)] +pub enum EnvelopeError { + /// The envelope's block root is unknown. + BlockRootUnknown { + block_root: Hash256, + }, + /// The signature is invalid. 
+ BadSignature, + /// The builder index doesn't match the committed bid + BuilderIndexMismatch { + committed_bid: u64, + envelope: u64, + }, + // The envelope slot doesn't match the block + SlotMismatch { + block: Slot, + envelope: Slot, + }, + // The validator index is unknown + UnknownValidator { + builder_index: u64, + }, + // The block hash doesn't match the committed bid + BlockHashMismatch { + committed_bid: ExecutionBlockHash, + envelope: ExecutionBlockHash, + }, + // The slot belongs to a block that is from a slot prior than + // the most recently finalized slot + PriorToFinalization { + payload_slot: Slot, + latest_finalized_slot: Slot, + }, + // Some Beacon Chain Error + BeaconChainError(Arc), + // Some Beacon State error + BeaconStateError(BeaconStateError), + // Some BlockProcessingError (for electra operations) + BlockProcessingError(BlockProcessingError), + // Some EnvelopeProcessingError + EnvelopeProcessingError(EnvelopeProcessingError), + // Error verifying the execution payload + ExecutionPayloadError(ExecutionPayloadError), + // An error from block-level checks reused during envelope import + BlockError(BlockError), + // Internal error + InternalError(String), +} + +impl std::fmt::Display for EnvelopeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl From for EnvelopeError { + fn from(e: BeaconChainError) -> Self { + EnvelopeError::BeaconChainError(Arc::new(e)) + } +} + +impl From for EnvelopeError { + fn from(e: ExecutionPayloadError) -> Self { + EnvelopeError::ExecutionPayloadError(e) + } +} + +impl From for EnvelopeError { + fn from(e: BeaconStateError) -> Self { + EnvelopeError::BeaconStateError(e) + } +} + +impl From for EnvelopeError { + fn from(e: DBError) -> Self { + EnvelopeError::BeaconChainError(Arc::new(BeaconChainError::DBError(e))) + } +} + +impl From for EnvelopeError { + fn from(e: BlockError) -> Self { + EnvelopeError::BlockError(e) + } +} + +/// Pull errors up 
from EnvelopeProcessingError to EnvelopeError +impl From for EnvelopeError { + fn from(e: EnvelopeProcessingError) -> Self { + match e { + EnvelopeProcessingError::BadSignature => EnvelopeError::BadSignature, + EnvelopeProcessingError::BeaconStateError(e) => EnvelopeError::BeaconStateError(e), + EnvelopeProcessingError::BlockHashMismatch { + committed_bid, + envelope, + } => EnvelopeError::BlockHashMismatch { + committed_bid, + envelope, + }, + EnvelopeProcessingError::BlockProcessingError(e) => { + EnvelopeError::BlockProcessingError(e) + } + e => EnvelopeError::EnvelopeProcessingError(e), + } + } +} + +#[allow(clippy::type_complexity)] +#[instrument(skip_all, level = "debug", fields(beacon_block_root = %envelope.beacon_block_root()))] +pub(crate) fn load_snapshot( + envelope: &SignedExecutionPayloadEnvelope, + chain: &BeaconChain, +) -> Result, EnvelopeError> { + // Reject any envelope if its block is not known to fork choice. + // + // A block that is not in fork choice is either: + // + // - Not yet imported: we should reject this envelope because we should only import it after its parent block + // has been fully imported. + // - Pre-finalized: if the parent block is _prior_ to finalization, we should ignore the envelope + // because it will revert finalization. Note that the finalized block is stored in fork + // choice, so we will not reject any child of the finalized block (this is relevant during + // genesis). 
+ + let fork_choice_read_lock = chain.canonical_head.fork_choice_read_lock(); + let beacon_block_root = envelope.beacon_block_root(); + let Some(proto_beacon_block) = fork_choice_read_lock.get_block(&beacon_block_root) else { + return Err(EnvelopeError::BlockRootUnknown { + block_root: beacon_block_root, + }); + }; + drop(fork_choice_read_lock); + + // TODO(EIP-7732): add metrics here + + let block_state_root = proto_beacon_block.state_root; + // We can use `get_hot_state` here rather than `get_advanced_hot_state` because the envelope + // must be from the same slot as its block (so no advance is required). + let cache_state = true; + let state = chain + .store + .get_hot_state(&block_state_root, cache_state) + .map_err(EnvelopeError::from)? + .ok_or_else(|| { + BeaconChainError::DBInconsistent(format!( + "Missing state for envelope block {block_state_root:?}", + )) + })?; + + Ok(EnvelopeProcessingSnapshot { + pre_state: state, + state_root: block_state_root, + beacon_block_root, + }) +} + +impl IntoExecutionPendingEnvelope + for Arc> +{ + fn into_execution_pending_envelope( + self, + chain: &Arc>, + notify_execution_layer: NotifyExecutionLayer, + ) -> Result, EnvelopeError> { + GossipVerifiedEnvelope::new(self, chain)? 
+ .into_execution_pending_envelope(chain, notify_execution_layer) + } + + fn envelope(&self) -> &Arc> { + self + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs new file mode 100644 index 00000000000..5b1f332b5ad --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs @@ -0,0 +1,93 @@ +use std::sync::Arc; + +use execution_layer::{NewPayloadRequest, NewPayloadRequestGloas}; +use fork_choice::PayloadVerificationStatus; +use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; +use tracing::warn; +use types::{SignedBeaconBlock, SignedExecutionPayloadEnvelope}; + +use crate::{ + BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, + execution_payload::notify_new_payload, payload_envelope_verification::EnvelopeError, +}; + +/// Used to await the result of executing payload with a remote EE. 
+pub struct PayloadNotifier { + pub chain: Arc>, + envelope: Arc>, + block: Arc>, + payload_verification_status: Option, +} + +impl PayloadNotifier { + pub fn new( + chain: Arc>, + envelope: Arc>, + block: Arc>, + notify_execution_layer: NotifyExecutionLayer, + ) -> Result { + let payload_verification_status = { + let payload_message = &envelope.message; + + match notify_execution_layer { + NotifyExecutionLayer::No if chain.config.optimistic_finalized_sync => { + let new_payload_request = Self::build_new_payload_request(&envelope, &block)?; + if let Err(e) = new_payload_request.perform_optimistic_sync_verifications() { + warn!( + block_number = ?payload_message.payload.block_number, + info = "you can silence this warning with --disable-optimistic-finalized-sync", + error = ?e, + "Falling back to slow block hash verification" + ); + None + } else { + Some(PayloadVerificationStatus::Optimistic) + } + } + _ => None, + } + }; + + Ok(Self { + chain, + envelope, + block, + payload_verification_status, + }) + } + + pub async fn notify_new_payload(self) -> Result { + if let Some(precomputed_status) = self.payload_verification_status { + Ok(precomputed_status) + } else { + let block_root = self.envelope.message.beacon_block_root; + let request = Self::build_new_payload_request(&self.envelope, &self.block)?; + notify_new_payload(&self.chain, block_root, request).await + } + } + + fn build_new_payload_request<'a>( + envelope: &'a SignedExecutionPayloadEnvelope, + block: &'a SignedBeaconBlock, + ) -> Result, BlockError> { + let bid = &block + .message() + .body() + .signed_execution_payload_bid() + .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))? 
+ .message; + + let versioned_hashes = bid + .blob_kzg_commitments + .iter() + .map(kzg_commitment_to_versioned_hash) + .collect(); + + Ok(NewPayloadRequest::Gloas(NewPayloadRequestGloas { + execution_payload: &envelope.message.payload, + versioned_hashes, + parent_beacon_block_root: block.message().parent_root(), + execution_requests: &envelope.message.execution_requests, + })) + } +} diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index d214ea6b15e..779e1964959 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,5 +1,5 @@ #![cfg(not(debug_assertions))] - +// TODO(gloas) we probably need similar test for payload envelope verification use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, RpcBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use beacon_chain::data_column_verification::CustodyDataColumn; diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 0016f66c01c..098d6d8324c 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -532,6 +532,16 @@ pub static SYNC_RPC_REQUEST_TIME: LazyLock> = LazyLock::new ) }); +/* + * Execution Payload Envelope Delay Metrics + */ +pub static ENVELOPE_DELAY_GOSSIP: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "payload_envelope_delay_gossip", + "The first time we see this payload envelope from gossip as a delay from the start of the slot", + ) +}); + /* * Block Delay Metrics */ diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index a9198f19436..70871d59364 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,7 +4,6 @@ use crate::{ 
service::NetworkMessage, sync::SyncMessage, }; -use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use beacon_chain::store::Error; @@ -19,6 +18,10 @@ use beacon_chain::{ sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, }; +use beacon_chain::{ + blob_verification::{GossipBlobError, GossipVerifiedBlob}, + payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope, +}; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use logging::crit; @@ -3243,25 +3246,177 @@ impl NetworkBeaconProcessor { } } - pub async fn process_gossip_execution_payload( + pub async fn process_gossip_execution_payload_envelope( + self: Arc, + message_id: MessageId, + peer_id: PeerId, + envelope: Arc>, + seen_timestamp: Duration, + ) { + if let Some(gossip_verified_envelope) = self + .process_gossip_unverified_execution_payload_envelope( + message_id, + peer_id, + envelope.clone(), + seen_timestamp, + ) + .await + { + let beacon_block_root = gossip_verified_envelope.signed_envelope.beacon_block_root(); + + Span::current().record("beacon_block_root", beacon_block_root.to_string()); + + // TODO(gloas) in process_gossip_block here we check_and_insert on the duplicate cache + // before calling gossip_verified_block + + self.process_gossip_verified_execution_payload_envelope( + peer_id, + gossip_verified_envelope, + ) + .await; + } + } + + async fn process_gossip_unverified_execution_payload_envelope( self: &Arc, message_id: MessageId, peer_id: PeerId, - execution_payload: SignedExecutionPayloadEnvelope, + envelope: Arc>, + seen_duration: Duration, + ) -> Option> { + let envelope_delay = + get_slot_delay_ms(seen_duration, envelope.slot(), 
&self.chain.slot_clock);
+
+ let verification_result = self
+ .chain
+ .clone()
+ .verify_envelope_for_gossip(envelope.clone())
+ .await;
+
+ let verified_envelope = match verification_result {
+ Ok(verified_envelope) => {
+ metrics::set_gauge(
+ &metrics::ENVELOPE_DELAY_GOSSIP,
+ envelope_delay.as_millis() as i64,
+ );
+
+ // Write the time the envelope was observed into the delay cache.
+ self.chain.envelope_times_cache.write().set_time_observed(
+ verified_envelope.signed_envelope.beacon_block_root(),
+ envelope.slot(),
+ seen_duration,
+ Some(peer_id.to_string()),
+ );
+
+ info!(
+ slot = %verified_envelope.signed_envelope.slot(),
+ root = ?verified_envelope.signed_envelope.beacon_block_root(),
+ "New envelope received"
+ );
+
+ self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
+
+ verified_envelope
+ }
+ // TODO(gloas) penalize peers accordingly
+ Err(_) => return None,
+ };
+
+ // TODO(gloas) do we need to register the payload with monitored validators?
+
+ let envelope_slot = verified_envelope.signed_envelope.slot();
+ let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root();
+ match self.chain.slot() {
+ // We only need to do a simple check about the envelope slot vs the current slot because
+ // `verify_envelope_for_gossip` already ensures that the envelope slot is within tolerance
+ // for envelope imports.
+ Ok(current_slot) if envelope_slot > current_slot => { + warn!( + ?envelope_slot, + ?beacon_block_root, + msg = "if this happens consistently, check system clock", + "envelope arrived early" + ); + + // TODO(gloas) update metrics to note how early the envelope arrived + + let inner_self = self.clone(); + let _process_fn = Box::pin(async move { + inner_self + .process_gossip_verified_execution_payload_envelope( + peer_id, + verified_envelope, + ) + .await; + }); + + // TODO(gloas) send to reprocess queue + None + } + Ok(_) => Some(verified_envelope), + Err(e) => { + error!( + error = ?e, + %envelope_slot, + ?beacon_block_root, + location = "envelope gossip", + "Failed to defer envelope import" + ); + None + } + } + } + + async fn process_gossip_verified_execution_payload_envelope( + self: Arc, + peer_id: PeerId, + verified_envelope: GossipVerifiedEnvelope, ) { - // TODO(EIP-7732): Implement proper execution payload envelope gossip processing. - // This should integrate with the envelope_verification.rs module once it's implemented. 
+ let _processing_start_time = Instant::now();
+ let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root();
- trace!(
- %peer_id,
- builder_index = execution_payload.message.builder_index,
- slot = %execution_payload.message.slot,
- beacon_block_root = %execution_payload.message.beacon_block_root,
- "Processing execution payload envelope"
- );
+ let result = self
+ .chain
+ .process_execution_payload_envelope(
+ beacon_block_root,
+ verified_envelope,
+ NotifyExecutionLayer::Yes,
+ BlockImportSource::Gossip,
+ || Ok(()),
+ )
+ .await;
- // For now, ignore all envelopes since verification is not implemented
- self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
+ // TODO(gloas) metrics
+ // register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope");
+
+ match &result {
+ Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
+ // TODO(gloas) do we need to send a `PayloadImported` event to the reprocess queue?
+ debug!(
+ ?block_root,
+ %peer_id,
+ "Gossipsub envelope processed"
+ );
+
+ // TODO(gloas) do we need to recompute head?
+ // should canonical_head return the block and the payload now?
+ self.chain.recompute_head_at_current_slot().await; + + // TODO(gloas) metrics + } + Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + trace!( + %slot, + %block_root, + "Processed envelope, waiting for other components" + ) + } + + Err(_) => { + // TODO(gloas) implement peer penalties + warn!("process_gossip_verified_execution_payload_envelope_failed") + } + } } pub fn process_gossip_execution_payload_bid( diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index e1adf860de0..357d6c08fd2 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -429,11 +429,17 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, execution_payload: Box>, + seen_timestamp: Duration, ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { processor - .process_gossip_execution_payload(message_id, peer_id, *execution_payload) + .process_gossip_execution_payload_envelope( + message_id, + peer_id, + Arc::new(*execution_payload), + seen_timestamp, + ) .await }; diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 8373dec3227..77d64c92e67 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -493,6 +493,7 @@ impl Router { message_id, peer_id, signed_execution_payload_envelope, + timestamp_now(), ), ) }