From 3de5838aba9e4b7d0a9fbbb3d5d7f38dcd83def2 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 3 Jul 2025 16:11:04 -0700 Subject: [PATCH 01/11] Initial commit --- .../src/service/api_types.rs | 7 + .../src/sync/block_sidecar_coupling.rs | 212 +++++++++++++++--- .../network/src/sync/network_context.rs | 65 +++++- .../network/src/sync/range_sync/chain.rs | 13 ++ 4 files changed, 259 insertions(+), 38 deletions(-) diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index b36f8cc2154..3013596f9f7 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -1,4 +1,5 @@ use crate::rpc::methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage}; +use libp2p::PeerId; use std::fmt::{Display, Formatter}; use std::sync::Arc; use types::{ @@ -61,6 +62,11 @@ pub struct DataColumnsByRangeRequestId { pub id: Id, /// The Id of the overall By Range request for block components. pub parent_request_id: ComponentsByRangeRequestId, + /// The peer id associated with the request. + /// + /// This is useful to penalize the peer at a later point if it returned data columns that + /// did not match with the verified block. + pub peer: PeerId, } /// Block components by range request for range sync. Includes an ID for downstream consumers to @@ -306,6 +312,7 @@ mod tests { batch_id: Epoch::new(0), }, }, + peer: PeerId::random(), }; assert_eq!(format!("{id}"), "123/122/RangeSync/0/54"); } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 0418ab45534..c0967bfb355 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,8 +1,11 @@ use beacon_chain::{ block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, }; -use lighthouse_network::service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, +use lighthouse_network::{ + service::api_types::{ + BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + }, + PeerAction, PeerId, }; use std::{collections::HashMap, sync::Arc}; use types::{ @@ -15,6 +18,8 @@ pub struct RangeBlockComponentsRequest { blocks_request: ByRangeRequest>>>, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, + /// The column indices corresponding to the id + column_peers: HashMap>, } enum ByRangeRequest { @@ -34,19 +39,33 @@ enum RangeBlockDataRequest { }, } +#[derive(Debug)] +pub struct CouplingError { + pub(crate) msg: String, + pub(crate) peer_action: Option, + pub(crate) column_and_peer: Option>, +} + impl RangeBlockComponentsRequest { pub fn new( blocks_req_id: BlocksByRangeRequestId, blobs_req_id: Option, - data_columns: Option<(Vec, Vec)>, + data_columns: Option<( + Vec<(DataColumnsByRangeRequestId, Vec)>, + Vec, + )>, ) -> Self { + let mut column_peers = HashMap::new(); let block_data_request = if let Some(blobs_req_id) = blobs_req_id { RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id)) } else if let Some((requests, expected_custody_columns)) = data_columns { RangeBlockDataRequest::DataColumns { requests: requests .into_iter() - .map(|id| (id, ByRangeRequest::Active(id))) + .map(|(id, indices)| { + column_peers.insert(id, indices); + (id, ByRangeRequest::Active(id)) + }) .collect(), expected_custody_columns, } @@ -57,6 +76,7 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), block_data_request, + column_peers, } } @@ -105,7 +125,29 @@ impl RangeBlockComponentsRequest { } } - pub fn responses(&self, spec: &ChainSpec) -> Option>, String>> { + pub fn remove_custody_columns( + &self, + req_id: DataColumnsByRangeRequestId, + ) -> Result<(), String> { + match &mut self.block_data_request { + RangeBlockDataRequest::NoData => { + Err("received data columns but expected no data".to_owned()) + } + RangeBlockDataRequest::Blobs(_) => { + Err("received data columns but expected blobs".to_owned()) + } + RangeBlockDataRequest::DataColumns { + ref mut requests, .. + } => { + let req = requests + .get_mut(&req_id) + .ok_or(format!("unknown data columns by range req_id {req_id}"))?; + req.remove_invalid_entries(id); + } + } + } + + pub fn responses(&self, spec: &ChainSpec) -> Option>, CouplingError>> { let Some(blocks) = self.blocks_request.to_finished() else { return None; }; @@ -129,6 +171,7 @@ impl RangeBlockComponentsRequest { expected_custody_columns, } => { let mut data_columns = vec![]; + let mut column_peers: HashMap = HashMap::new(); for req in requests.values() { let Some(data) = req.to_finished() else { return None; @@ -136,12 +179,37 @@ impl RangeBlockComponentsRequest { data_columns.extend(data.clone()) } - Some(Self::responses_with_custody_columns( + // Note: this assumes that only 1 peer is responsible for a column + // with a batch. + for (id, columns) in &self.column_peers { + for column in columns { + column_peers.insert(*column, id.peer); + } + } + + let resp = Self::responses_with_custody_columns( blocks.to_vec(), data_columns, + column_peers, expected_custody_columns, spec, - )) + ); + + if let Err(err) = resp { + if let Some(peers) = err.column_and_peer { + for (column, peer) in peers { + // find the req id associated with the peer and + // un-finish it + for (req_id, req) in requests.iter() { + if req_id.peer == peer { + req.remove_invalid_entries(*req_id); + } + } + } + } + } + + Some(resp) } } } @@ -150,7 +218,7 @@ impl RangeBlockComponentsRequest { blocks: Vec>>, blobs: Vec>>, spec: &ChainSpec, - ) -> Result>, String> { + ) -> Result>, CouplingError> { // There can't be more more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted. let mut responses = Vec::with_capacity(blocks.len()); @@ -165,17 +233,29 @@ impl RangeBlockComponentsRequest { .unwrap_or(false); pair_next_blob } { - blob_list.push(blob_iter.next().ok_or("Missing next blob".to_string())?); + blob_list.push(blob_iter.next().ok_or_else(|| CouplingError { + msg: "Missing next blob".to_string(), + column_and_peer: None, + peer_action: None, + })?); } let mut blobs_buffer = vec![None; max_blobs_per_block]; for blob in blob_list { let blob_index = blob.index as usize; let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else { - return Err("Invalid blob index".to_string()); + return Err(CouplingError { + msg: "Invalid blob index".to_string(), + column_and_peer: None, + peer_action: None, + }); }; if blob_opt.is_some() { - return Err("Repeat blob index".to_string()); + return Err(CouplingError { + msg: "Repeat blob index".to_string(), + column_and_peer: None, + peer_action: None, + }); } else { *blob_opt = Some(blob); } @@ -184,13 +264,27 @@ impl RangeBlockComponentsRequest { blobs_buffer.into_iter().flatten().collect::>(), max_blobs_per_block, ) - .map_err(|_| "Blobs returned exceeds max length".to_string())?; - responses.push(RpcBlock::new(None, block, Some(blobs)).map_err(|e| format!("{e:?}"))?) + .map_err(|_| CouplingError { + msg: "Blobs returned exceeds max length".to_string(), + column_and_peer: None, + peer_action: None, + })?; + responses.push( + RpcBlock::new(None, block, Some(blobs)).map_err(|e| CouplingError { + msg: format!("{e:?}"), + column_and_peer: None, + peer_action: None, + })?, + ) } // if accumulated sidecars is not empty, throw an error. if blob_iter.next().is_some() { - return Err("Received sidecars that don't pair well".to_string()); + return Err(CouplingError { + msg: "Received sidecars that don't pair well".to_string(), + column_and_peer: None, + peer_action: None, + }); } Ok(responses) @@ -199,9 +293,10 @@ impl RangeBlockComponentsRequest { fn responses_with_custody_columns( blocks: Vec>>, data_columns: DataColumnSidecarList, + column_peers: HashMap, expects_custody_columns: &[ColumnIndex], spec: &ChainSpec, - ) -> Result>, String> { + ) -> Result>, CouplingError> { // Group data columns by block_root and index let mut data_columns_by_block = HashMap::>>>::new(); @@ -215,9 +310,11 @@ impl RangeBlockComponentsRequest { .insert(index, column) .is_some() { - return Err(format!( - "Repeated column block_root {block_root:?} index {index}" - )); + return Err(CouplingError { + msg: format!("Repeated column block_root {block_root:?} index {index}"), + column_and_peer: None, + peer_action: None, + }); } } @@ -235,30 +332,64 @@ impl RangeBlockComponentsRequest { // TODO(das): on the initial version of PeerDAS the beacon chain does not check // rpc custody requirements and dropping this check can allow the block to have // an inconsistent DB. - return Err(format!("No columns for block {block_root:?} with data")); + + // For now, we always assume that the block peer is right. + // This is potentially dangerous as we can get isolated on a chain with a + // malicious block peer. + // TODO: fix this by checking the proposer signature before downloading columns. + let responsible_peers = column_peers.values().cloned().collect(); + return Err(CouplingError { + msg: format!("No columns for block {block_root:?} with data"), + peer_action: Some(PeerAction::LowToleranceError), + column_and_peer: Some(responsible_peers), + }); }; let mut custody_columns = vec![]; + let mut naughty_peers = vec![]; for index in expects_custody_columns { - let Some(data_column) = data_columns_by_index.remove(index) else { - return Err(format!("No column for block {block_root:?} index {index}")); - }; - // Safe to convert to `CustodyDataColumn`: we have asserted that the index of - // this column is in the set of `expects_custody_columns` and with the expected - // block root, so for the expected epoch of this batch. - custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); + if let Some(data_column) = data_columns_by_index.remove(index) { + // Safe to convert to `CustodyDataColumn`: we have asserted that the index of + // this column is in the set of `expects_custody_columns` and with the expected + // block root, so for the expected epoch of this batch. + custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); + } else { + // Penalize the peer for claiming to have the columns but not returning + // them + let Some(responsible_peer) = column_peers.get(index) else { + return Err(( + None, + format!("Internal error, no request made for column {}", index), + )); + }; + naughty_peers.push((*responsible_peer, *index)); + } + } + if !naughty_peers.is_empty() { + return Err(CouplingError { + msg: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), + peer_action: Some(PeerAction::LowToleranceError), + column_and_peer: Some(naughty_peers), + }); } // Assert that there are no columns left if !data_columns_by_index.is_empty() { let remaining_indices = data_columns_by_index.keys().collect::>(); - return Err(format!( - "Not all columns consumed for block {block_root:?}: {remaining_indices:?}" - )); + // log the error but don't return an error, we can still progress with extra columns. + tracing::error!( + ?block_root, + ?remaining_indices, + "Not all columns consumed for block" + ); } RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec) - .map_err(|e| format!("{e:?}"))? + .map_err(|e| CouplingError { + msg: format!("{}", e), + column_and_peer: None, + peer_action: None, + })? } else { // Block has no data, expects zero columns RpcBlock::new_without_blobs(Some(block_root), block) @@ -268,7 +399,9 @@ impl RangeBlockComponentsRequest { // Assert that there are no columns left for other blocks if !data_columns_by_block.is_empty() { let remaining_roots = data_columns_by_block.keys().collect::>(); - return Err(format!("Not all columns consumed: {remaining_roots:?}")); + // log the error but don't return an error, we can still progress with responses. + // this is most likely an internal error with overrequesting or a client bug. + tracing::error!(?remaining_roots, "Not all columns consumed for block"); } Ok(rpc_blocks) @@ -289,6 +422,13 @@ impl ByRangeRequest { } } + fn remove_invalid_entries(&mut self, id: I) { + match self { + Self::Complete(_) => *self = Self::Active(id), + Self::Active(_) => {} + } + } + fn to_finished(&self) -> Option<&T> { match self { Self::Active(_) => None, @@ -303,9 +443,12 @@ mod tests { use beacon_chain::test_utils::{ generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs, }; - use lighthouse_network::service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - DataColumnsByRangeRequestId, Id, RangeRequestId, + use lighthouse_network::{ + service::api_types::{ + BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, + DataColumnsByRangeRequestId, Id, RangeRequestId, + }, + PeerId, }; use rand::SeedableRng; use std::sync::Arc; @@ -342,6 +485,7 @@ mod tests { DataColumnsByRangeRequestId { id, parent_request_id, + peer: PeerId::random(), } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d0e62e4ada7..6164ee0fdff 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -14,6 +14,7 @@ use crate::network_beacon_processor::TestBeaconChainType; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; +use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; @@ -81,7 +82,7 @@ pub enum RpcResponseError { RpcError(#[allow(dead_code)] RPCError), VerifyError(LookupVerifyError), CustodyRequestError(#[allow(dead_code)] CustodyRequestError), - BlockComponentCouplingError(#[allow(dead_code)] String), + BlockComponentCouplingError(CouplingError), } #[derive(Debug, PartialEq, Eq)] @@ -441,6 +442,54 @@ impl SyncNetworkContext { active_request_count_by_peer } + pub fn retry_columns_by_range( + &mut self, + requester: RangeRequestId, + request_id: Id, + peers: &HashSet, + peers_to_deprioritize: &HashSet, + failed_columns: HashSet, + ) -> Result<(), String> { + let active_request_count_by_peer = self.active_request_count_by_peer(); + // Attempt to find all required custody peers to request the failed columns from + let columns_by_range_peers_to_request = self.select_columns_by_range_peers_to_request( + &failed_columns, + peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?; + + // Reuse the id for the request that received partially correct responses + let id = ComponentsByRangeRequestId { + id: request_id, + requester, + }; + + let data_column_requests = columns_by_range_peers_to_request + .into_iter() + .map(|(peer_id, columns)| { + self.send_data_columns_by_range_request( + peer_id, + DataColumnsByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + columns, + }, + id, + ) + }) + .collect::, _>>()?; + + // instead of creating a new `RangeBlockComponentsRequest`, we reinsert + // the new requests created for the failed requests + + let Some(range_request) = self.components_by_range_requests.get(id) else { + return Err( + "retrying custody request for range request that does not exist".to_string(), + ); + }; + } + /// A blocks by range request sent by the range sync algorithm pub fn block_components_by_range_request( &mut self, @@ -642,7 +691,11 @@ impl SyncNetworkContext { } if let Some(blocks_result) = entry.get().responses(&self.chain.spec) { - entry.remove(); + if blocks_result.is_ok() { + // remove the entry only if it coupled successfully with + // no errors + entry.remove(); + } // If the request is finished, dequeue everything Some(blocks_result.map_err(RpcResponseError::BlockComponentCouplingError)) } else { @@ -1075,10 +1128,12 @@ impl SyncNetworkContext { peer_id: PeerId, request: DataColumnsByRangeRequest, parent_request_id: ComponentsByRangeRequestId, - ) -> Result { + ) -> Result<(DataColumnsByRangeRequestId, Vec), RpcRequestSendError> { + let requested_columns = request.columns.clone(); let id = DataColumnsByRangeRequestId { id: self.next_id(), parent_request_id, + peer: peer_id, }; self.send_network_msg(NetworkMessage::SendRequest { @@ -1106,7 +1161,7 @@ impl SyncNetworkContext { false, DataColumnsByRangeRequestItems::new(request), ); - Ok(id) + Ok((id, requested_columns)) } pub fn is_execution_engine_online(&self) -> bool { @@ -1340,6 +1395,7 @@ impl SyncNetworkContext { peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>>>> { + debug!(%peer_id, ?id, "Received blocks by range response"); let resp = self.blocks_by_range_requests.on_response(id, rpc_event); self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len()) } @@ -1362,6 +1418,7 @@ impl SyncNetworkContext { peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>> { + debug!(%peer_id, ?id, "Received data columns by range response"); let resp = self .data_columns_by_range_requests .on_response(id, rpc_event); diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index cc49c437112..b617b9f34c4 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -2,6 +2,7 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use super::RangeSyncType; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; +use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult}; use beacon_chain::block_verification_types::RpcBlock; @@ -833,6 +834,18 @@ impl SyncingChain { ) -> ProcessingResult { let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { + if let RpcResponseError::BlockComponentCouplingError(CouplingError { + column_and_peer, + msg, + peer_action, + }) = err + { + if let Some(columns_and_peers) = column_and_peer { + for (column, peer) in column_and_peers { + + } + } + } // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer From 0bf46bd2d25398adc99bbdaab7aab012b86edc3e Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 3 Jul 2025 19:25:53 -0700 Subject: [PATCH 02/11] compiles --- .../src/sync/block_sidecar_coupling.rs | 116 ++++++++++-------- .../network/src/sync/network_context.rs | 69 ++++++++--- .../network/src/sync/range_sync/chain.rs | 71 +++++++++-- 3 files changed, 179 insertions(+), 77 deletions(-) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index c0967bfb355..706b8b894e8 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -42,8 +42,7 @@ enum RangeBlockDataRequest { #[derive(Debug)] pub struct CouplingError { pub(crate) msg: String, - pub(crate) peer_action: Option, - pub(crate) column_and_peer: Option>, + pub(crate) column_and_peer: Option<(Vec<(u64, PeerId)>, PeerAction)>, } impl RangeBlockComponentsRequest { @@ -80,6 +79,25 @@ impl RangeBlockComponentsRequest { } } + pub fn reinsert_failed_column_requests( + &mut self, + failed_column_requests: Vec<(DataColumnsByRangeRequestId, Vec)>, + ) -> Result<(), String> { + match &mut self.block_data_request { + RangeBlockDataRequest::DataColumns { + requests, + expected_custody_columns: _, + } => { + for (request, columns) in failed_column_requests.into_iter() { + requests.insert(request, ByRangeRequest::Active(request)); + self.column_peers.insert(request, columns); + } + Ok(()) + } + _ => Err("not a column request".to_string()), + } + } + pub fn add_blocks( &mut self, req_id: BlocksByRangeRequestId, @@ -125,34 +143,38 @@ impl RangeBlockComponentsRequest { } } - pub fn remove_custody_columns( - &self, - req_id: DataColumnsByRangeRequestId, - ) -> Result<(), String> { - match &mut self.block_data_request { - RangeBlockDataRequest::NoData => { - Err("received data columns but expected no data".to_owned()) - } - RangeBlockDataRequest::Blobs(_) => { - Err("received data columns but expected blobs".to_owned()) - } - RangeBlockDataRequest::DataColumns { - ref mut requests, .. - } => { - let req = requests - .get_mut(&req_id) - .ok_or(format!("unknown data columns by range req_id {req_id}"))?; - req.remove_invalid_entries(id); - } - } - } - - pub fn responses(&self, spec: &ChainSpec) -> Option>, CouplingError>> { + // pub fn remove_custody_columns( + // &mut self, + // req_id: DataColumnsByRangeRequestId, + // ) -> Result<(), String> { + // match &mut self.block_data_request { + // RangeBlockDataRequest::NoData => { + // Err("received data columns but expected no data".to_owned()) + // } + // RangeBlockDataRequest::Blobs(_) => { + // Err("received data columns but expected blobs".to_owned()) + // } + // RangeBlockDataRequest::DataColumns { + // ref mut requests, .. + // } => { + // let req = requests + // .get_mut(&req_id) + // .ok_or(format!("unknown data columns by range req_id {req_id}"))?; + // req.remove_invalid_entries(req_id); + // Ok(()) + // } + // } + // } + + pub fn responses( + &mut self, + spec: &ChainSpec, + ) -> Option>, CouplingError>> { let Some(blocks) = self.blocks_request.to_finished() else { return None; }; - match &self.block_data_request { + match &mut self.block_data_request { RangeBlockDataRequest::NoData => { Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec)) } @@ -195,13 +217,13 @@ impl RangeBlockComponentsRequest { spec, ); - if let Err(err) = resp { - if let Some(peers) = err.column_and_peer { - for (column, peer) in peers { + if let Err(err) = &resp { + if let Some((peers, _)) = &err.column_and_peer { + for (_, peer) in peers.iter() { // find the req id associated with the peer and // un-finish it - for (req_id, req) in requests.iter() { - if req_id.peer == peer { + for (req_id, req) in requests.iter_mut() { + if req_id.peer == *peer { req.remove_invalid_entries(*req_id); } } @@ -236,7 +258,6 @@ impl RangeBlockComponentsRequest { blob_list.push(blob_iter.next().ok_or_else(|| CouplingError { msg: "Missing next blob".to_string(), column_and_peer: None, - peer_action: None, })?); } @@ -247,14 +268,12 @@ impl RangeBlockComponentsRequest { return Err(CouplingError { msg: "Invalid blob index".to_string(), column_and_peer: None, - peer_action: None, }); }; if blob_opt.is_some() { return Err(CouplingError { msg: "Repeat blob index".to_string(), column_and_peer: None, - peer_action: None, }); } else { *blob_opt = Some(blob); @@ -267,13 +286,11 @@ impl RangeBlockComponentsRequest { .map_err(|_| CouplingError { msg: "Blobs returned exceeds max length".to_string(), column_and_peer: None, - peer_action: None, })?; responses.push( RpcBlock::new(None, block, Some(blobs)).map_err(|e| CouplingError { msg: format!("{e:?}"), column_and_peer: None, - peer_action: None, })?, ) } @@ -283,7 +300,6 @@ impl RangeBlockComponentsRequest { return Err(CouplingError { msg: "Received sidecars that don't pair well".to_string(), column_and_peer: None, - peer_action: None, }); } @@ -313,7 +329,6 @@ impl RangeBlockComponentsRequest { return Err(CouplingError { msg: format!("Repeated column block_root {block_root:?} index {index}"), column_and_peer: None, - peer_action: None, }); } } @@ -337,11 +352,10 @@ impl RangeBlockComponentsRequest { // This is potentially dangerous as we can get isolated on a chain with a // malicious block peer. // TODO: fix this by checking the proposer signature before downloading columns. - let responsible_peers = column_peers.values().cloned().collect(); + let responsible_peers = column_peers.iter().map(|c| (*c.0, *c.1)).collect(); return Err(CouplingError { msg: format!("No columns for block {block_root:?} with data"), - peer_action: Some(PeerAction::LowToleranceError), - column_and_peer: Some(responsible_peers), + column_and_peer: Some((responsible_peers, PeerAction::LowToleranceError)), }); }; @@ -357,19 +371,18 @@ impl RangeBlockComponentsRequest { // Penalize the peer for claiming to have the columns but not returning // them let Some(responsible_peer) = column_peers.get(index) else { - return Err(( - None, - format!("Internal error, no request made for column {}", index), - )); + return Err(CouplingError { + msg: format!("Internal error, no request made for column {}", index), + column_and_peer: None, + }); }; - naughty_peers.push((*responsible_peer, *index)); + naughty_peers.push((*index, *responsible_peer)); } } if !naughty_peers.is_empty() { return Err(CouplingError { msg: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), - peer_action: Some(PeerAction::LowToleranceError), - column_and_peer: Some(naughty_peers), + column_and_peer: Some((naughty_peers, PeerAction::LowToleranceError)), }); } @@ -386,10 +399,9 @@ impl RangeBlockComponentsRequest { RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec) .map_err(|e| CouplingError { - msg: format!("{}", e), - column_and_peer: None, - peer_action: None, - })? + msg: format!("{:?}", e), + column_and_peer: None, + })? } else { // Block has no data, expects zero columns RpcBlock::new_without_blobs(Some(block_root), block) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 6164ee0fdff..8f09ced6c0b 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -444,20 +444,38 @@ impl SyncNetworkContext { pub fn retry_columns_by_range( &mut self, - requester: RangeRequestId, request_id: Id, peers: &HashSet, peers_to_deprioritize: &HashSet, - failed_columns: HashSet, + request: BlocksByRangeRequest, + failed_columns: &HashSet, ) -> Result<(), String> { + let Some(requester) = self.components_by_range_requests.keys().find_map(|r| { + if r.id == request_id { + Some(r.requester) + } else { + None + } + }) else { + return Err("request id not present".to_string()); + }; + let active_request_count_by_peer = self.active_request_count_by_peer(); // Attempt to find all required custody peers to request the failed columns from - let columns_by_range_peers_to_request = self.select_columns_by_range_peers_to_request( - &failed_columns, - peers, - active_request_count_by_peer, - peers_to_deprioritize, - )?; + + debug!( + ?failed_columns, + "Retrying only failed column requests from other peers" + ); + + let columns_by_range_peers_to_request = self + .select_columns_by_range_peers_to_request( + &failed_columns, + peers, + active_request_count_by_peer, + peers_to_deprioritize, + ) + .map_err(|e| format!("{:?}", e))?; // Reuse the id for the request that received partially correct responses let id = ComponentsByRangeRequestId { @@ -478,16 +496,20 @@ impl SyncNetworkContext { id, ) }) - .collect::, _>>()?; + .collect::, _>>() + .map_err(|e| format!("{:?}", e))?; // instead of creating a new `RangeBlockComponentsRequest`, we reinsert // the new requests created for the failed requests - let Some(range_request) = self.components_by_range_requests.get(id) else { + let Some(range_request) = self.components_by_range_requests.get_mut(&id) else { return Err( "retrying custody request for range request that does not exist".to_string(), ); }; + + range_request.reinsert_failed_column_requests(data_column_requests)?; + Ok(()) } /// A blocks by range request sent by the range sync algorithm @@ -668,20 +690,31 @@ impl SyncNetworkContext { let request = entry.get_mut(); match range_block_component { RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| { - request - .add_blocks(req_id, blocks) - .map_err(RpcResponseError::BlockComponentCouplingError) + request.add_blocks(req_id, blocks).map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError { + msg: e, + column_and_peer: None, + }) + }) }), RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| { - request - .add_blobs(req_id, blobs) - .map_err(RpcResponseError::BlockComponentCouplingError) + request.add_blobs(req_id, blobs).map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError { + msg: e, + column_and_peer: None, + }) + }) }), RangeBlockComponent::CustodyColumns(req_id, resp) => { resp.and_then(|(custody_columns, _)| { request .add_custody_columns(req_id, custody_columns) - .map_err(RpcResponseError::BlockComponentCouplingError) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError { + msg: e, + column_and_peer: None, + }) + }) }) } } @@ -690,7 +723,7 @@ impl SyncNetworkContext { return Some(Err(e)); } - if let Some(blocks_result) = entry.get().responses(&self.chain.spec) { + if let Some(blocks_result) = entry.get_mut().responses(&self.chain.spec) { if blocks_result.is_ok() { // remove the entry only if it coupled successfully with // no errors diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index b617b9f34c4..967ac4a7a12 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -13,7 +13,7 @@ use logging::crit; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use strum::IntoStaticStr; use tracing::{debug, instrument, warn}; -use types::{Epoch, EthSpec, Hash256, Slot}; +use types::{ColumnIndex, Epoch, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for @@ -836,13 +836,28 @@ impl SyncingChain { if let Some(batch) = self.batches.get_mut(&batch_id) { if let RpcResponseError::BlockComponentCouplingError(CouplingError { column_and_peer, - msg, - peer_action, - }) = err + msg: _, + }) = &err { - if let Some(columns_and_peers) = column_and_peer { - for (column, peer) in column_and_peers { - + if let Some((column_and_peer, action)) = column_and_peer { + let mut failed_columns = HashSet::new(); + for (column, peer) in column_and_peer { + network.report_peer(*peer, *action, "failed to return columns"); + failed_columns.insert(*column); + if let BatchOperationOutcome::Failed { blacklist } = + batch.download_failed(Some(*peer))? + { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }); + } + return self.retry_partial_batch( + network, + batch_id, + request_id, + failed_columns, + ); } } } @@ -971,6 +986,48 @@ impl SyncingChain { Ok(KeepChain) } + #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] + pub fn retry_partial_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + id: Id, + failed_columns: HashSet, + ) -> ProcessingResult { + if let Some(batch) = self.batches.get_mut(&batch_id) { + let failed_peers = batch.failed_peers(); + let req = batch.to_blocks_by_range_request().0; + + let synced_peers = network + .network_globals() + .peers + .read() + .synced_peers() + .cloned() + .collect::>(); + + match network.retry_columns_by_range( + id, + &synced_peers, + &failed_peers, + req, + &failed_columns, + ) { + Ok(_) => { + debug!( + ?batch_id, + id, "Retried column requests from different peers" + ); + return Ok(KeepChain); + } + Err(e) => { + debug!(?batch_id, id, e, "Failed to retry partial batch"); + } + } + } + Ok(KeepChain) + } + /// Returns true if this chain is currently syncing. #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] pub fn is_syncing(&self) -> bool { From b9bdc5fe8684c30747aa0d54ef97951d32df13a9 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Fri, 4 Jul 2025 01:04:44 -0700 Subject: [PATCH 03/11] Fix some issues --- .../src/sync/block_sidecar_coupling.rs | 20 +++++-------------- .../network/src/sync/network_context.rs | 6 ++++-- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 706b8b894e8..75f9f592735 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -8,11 +8,11 @@ use lighthouse_network::{ PeerAction, PeerId, }; use std::{collections::HashMap, sync::Arc}; +use tracing::debug; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, }; - pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. blocks_request: ByRangeRequest>>>, @@ -194,8 +194,9 @@ impl RangeBlockComponentsRequest { } => { let mut data_columns = vec![]; let mut column_peers: HashMap = HashMap::new(); - for req in requests.values() { + for (req_id, req) in requests.iter() { let Some(data) = req.to_finished() else { + debug!(?req_id, "Req is not finished"); return None; }; data_columns.extend(data.clone()) @@ -221,12 +222,8 @@ impl RangeBlockComponentsRequest { if let Some((peers, _)) = &err.column_and_peer { for (_, peer) in peers.iter() { // find the req id associated with the peer and - // un-finish it - for (req_id, req) in requests.iter_mut() { - if req_id.peer == *peer { - req.remove_invalid_entries(*req_id); - } - } + // delete it from the entries + requests.retain(|&k, _| k.peer != *peer); } } } @@ -434,13 +431,6 @@ impl ByRangeRequest { } } - fn remove_invalid_entries(&mut self, id: I) { - match self { - Self::Complete(_) => *self = Self::Active(id), - Self::Active(_) => {} - } - } - fn to_finished(&self) -> Option<&T> { match self { Self::Active(_) => None, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 8f09ced6c0b..abfa2d60041 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -719,6 +719,7 @@ impl SyncNetworkContext { } } } { + debug!("Error while inserting range component response"); entry.remove(); return Some(Err(e)); } @@ -727,6 +728,7 @@ impl SyncNetworkContext { if blocks_result.is_ok() { // remove the entry only if it coupled successfully with // no errors + debug!("Removing entry"); entry.remove(); } // If the request is finished, dequeue everything @@ -1428,7 +1430,7 @@ impl SyncNetworkContext { peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>>>> { - debug!(%peer_id, ?id, "Received blocks by range response"); + // debug!(%peer_id, ?id, "Received blocks by range response"); let resp = self.blocks_by_range_requests.on_response(id, rpc_event); self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len()) } @@ -1451,7 +1453,7 @@ impl SyncNetworkContext { peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>> { - debug!(%peer_id, ?id, "Received data columns by range response"); + // debug!(%peer_id, ?id, "Received data columns by range response"); let resp = self .data_columns_by_range_requests .on_response(id, rpc_event); From eab726cc5201cbe507fb122312efe2c04e67c998 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Fri, 4 Jul 2025 15:19:21 -0700 Subject: [PATCH 04/11] Request batches from peers that have the earliest available slot --- .../src/peer_manager/peerdb.rs | 28 +++++++++++++++++++ .../src/peer_manager/peerdb/sync_status.rs | 13 +++++++++ .../network/src/sync/range_sync/chain.rs | 2 +- 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index b28807c47e8..0598f96bf38 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -248,6 +248,34 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Returns all the synced peers from the list of allowed peers that claim to have the block + /// components for the given epoch based on `status.earliest_available_slot`. + /// + /// If `earliest_available_slot` info is not available, then return peer anyway assuming it has the + /// required data. + pub fn synced_peers_for_epoch<'a>( + &'a self, + epoch: Epoch, + allowed_peers: &'a HashSet, + ) -> impl Iterator { + self.peers + .iter() + .filter(move |(peer_id, info)| { + allowed_peers.contains(peer_id) + && info.is_connected() + && match info.sync_status() { + SyncStatus::Synced { info } => { + info.has_slot(epoch.end_slot(E::slots_per_epoch())) + } + SyncStatus::Advanced { info } => { + info.has_slot(epoch.end_slot(E::slots_per_epoch())) + } + _ => false, + } + }) + .map(|(peer_id, _)| peer_id) + } + /// Gives the `peer_id` of all known connected and advanced peers. pub fn advanced_peers(&self) -> impl Iterator { self.peers diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs index 5a4fc339940..aeda375a0d0 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs @@ -28,6 +28,19 @@ pub struct SyncInfo { pub earliest_available_slot: Option, } +impl SyncInfo { + /// Returns true if the provided slot is greater than `earliest_available_slot`. + /// + /// If `earliest_available_slot` does is None, then we just assume that the peer has the slot. + pub fn has_slot(&self, slot: Slot) -> bool { + if let Some(earliest_available_slot) = self.earliest_available_slot { + slot >= earliest_available_slot + } else { + true + } + } +} + impl std::cmp::PartialEq for SyncStatus { fn eq(&self, other: &Self) -> bool { matches!( diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 967ac4a7a12..e495d5a10b4 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -926,7 +926,7 @@ impl SyncingChain { .network_globals() .peers .read() - .synced_peers() + .synced_peers_for_epoch(batch_id, &self.peers) .cloned() .collect::>(); From 4f28e0ef2fb4682ef18d929adab1909640ef65c2 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 8 Jul 2025 18:59:12 -0700 Subject: [PATCH 05/11] Fix issues --- .../src/peer_manager/peerdb.rs | 19 ++++++++ .../src/sync/block_sidecar_coupling.rs | 23 --------- .../network/src/sync/range_sync/batch.rs | 16 +++++++ .../network/src/sync/range_sync/chain.rs | 48 ++++++++++++------- .../src/sync/range_sync/chain_collection.rs | 18 +++++-- .../network/src/sync/range_sync/range.rs | 18 ++++--- 6 files changed, 88 insertions(+), 54 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 0598f96bf38..5c82688bf2e 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -319,6 +319,25 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Returns an iterator of all good gossipsub peers that are supposed to be custodying + /// the given subnet id. + pub fn good_range_sync_custody_subnet_peer<'a>( + &'a self, + subnet: DataColumnSubnetId, + allowed_peers: &'a HashSet, + ) -> impl Iterator { + self.peers + .iter() + .filter(move |(peer_id, info)| { + // The custody_subnets hashset can be populated via enr or metadata + let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet); + allowed_peers.contains(peer_id) + && info.is_connected() + && is_custody_subnet_peer + }) + .map(|(peer_id, _)| peer_id) + } + /// Gives the ids of all known disconnected peers. pub fn disconnected_peers(&self) -> impl Iterator { self.peers diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 75f9f592735..a10bded7ea4 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -143,29 +143,6 @@ impl RangeBlockComponentsRequest { } } - // pub fn remove_custody_columns( - // &mut self, - // req_id: DataColumnsByRangeRequestId, - // ) -> Result<(), String> { - // match &mut self.block_data_request { - // RangeBlockDataRequest::NoData => { - // Err("received data columns but expected no data".to_owned()) - // } - // RangeBlockDataRequest::Blobs(_) => { - // Err("received data columns but expected blobs".to_owned()) - // } - // RangeBlockDataRequest::DataColumns { - // ref mut requests, .. - // } => { - // let req = requests - // .get_mut(&req_id) - // .ok_or(format!("unknown data columns by range req_id {req_id}"))?; - // req.remove_invalid_entries(req_id); - // Ok(()) - // } - // } - // } - pub fn responses( &mut self, spec: &ChainSpec, diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 264f83ee820..52a38c40376 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -8,6 +8,7 @@ use std::hash::{Hash, Hasher}; use std::ops::Sub; use std::time::{Duration, Instant}; use strum::Display; +use tracing::debug; use types::{Epoch, EthSpec, Slot}; /// The number of times to retry a batch before it is considered failed. @@ -89,6 +90,7 @@ pub enum BatchOperationOutcome { Failed { blacklist: bool }, } +#[derive(Debug)] pub enum BatchProcessingResult { Success, FaultyFailure, @@ -205,7 +207,9 @@ impl BatchInfo { } /// Verifies if an incoming block belongs to this batch. + #[tracing::instrument(skip(self), fields(batch = %self))] pub fn is_expecting_block(&self, request_id: &Id) -> bool { + debug!(?request_id, "Batch is expecting block?"); if let BatchState::Downloading(expected_id) = &self.state { return expected_id == request_id; } @@ -272,11 +276,13 @@ impl BatchInfo { /// Marks the batch as ready to be processed if the blocks are in the range. The number of /// received blocks is returned, or the wrong batch end on failure #[must_use = "Batch may have failed"] + #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn download_completed( &mut self, blocks: Vec>, peer: PeerId, ) -> Result { + debug!(?peer, "Download completed with peer_id"); match self.state.poison() { BatchState::Downloading(_) => { let received = blocks.len(); @@ -300,10 +306,12 @@ impl BatchInfo { /// The `peer` parameter, when set to None, does not increment the failed attempts of /// this batch and register the peer, rather attempts a re-download. #[must_use = "Batch may have failed"] + #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn download_failed( &mut self, peer: Option, ) -> Result { + debug!(?peer, "Download failed with peer_id"); match self.state.poison() { BatchState::Downloading(_) => { // register the attempt and check if the batch can be tried again @@ -330,7 +338,9 @@ impl BatchInfo { } } + #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn start_downloading(&mut self, request_id: Id) -> Result<(), WrongState> { + debug!(?request_id, "Download started with req_id"); match self.state.poison() { BatchState::AwaitingDownload => { self.state = BatchState::Downloading(request_id); @@ -347,7 +357,9 @@ impl BatchInfo { } } + #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn start_processing(&mut self) -> Result<(Vec>, Duration), WrongState> { + debug!("Processing started "); match self.state.poison() { BatchState::AwaitingProcessing(peer, blocks, start_instant) => { self.state = BatchState::Processing(Attempt::new::(peer, &blocks)); @@ -365,10 +377,12 @@ impl BatchInfo { } #[must_use = "Batch may have failed"] + #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn processing_completed( &mut self, procesing_result: BatchProcessingResult, ) -> Result { + debug!(?procesing_result, "Processing completed with status"); match self.state.poison() { BatchState::Processing(attempt) => { self.state = match procesing_result { @@ -406,7 +420,9 @@ impl BatchInfo { } #[must_use = "Batch may have failed"] + #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn validation_failed(&mut self) -> Result { + debug!("Validation failed"); match self.state.poison() { BatchState::AwaitingValidation(attempt) => { self.failed_processing_attempts.push(attempt); diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index e495d5a10b4..8b0f7a4b672 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -8,7 +8,7 @@ use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, Ba use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use lighthouse_network::service::api_types::Id; -use lighthouse_network::{PeerAction, PeerId}; +use lighthouse_network::{PeerAction, PeerId, SyncInfo}; use logging::crit; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use strum::IntoStaticStr; @@ -83,6 +83,10 @@ pub struct SyncingChain { /// The target head root. pub target_head_root: Hash256, + pub target_finalized_root: Hash256, + + pub target_finalized_epoch: Epoch, + /// Sorted map of batches undergoing some kind of processing. batches: BTreeMap>, @@ -128,6 +132,8 @@ impl SyncingChain { start_epoch: Epoch, target_head_slot: Slot, target_head_root: Hash256, + target_finalized_root: Hash256, + target_finalized_epoch: Epoch, peer_id: PeerId, chain_type: SyncingChainType, ) -> Self { @@ -137,6 +143,8 @@ impl SyncingChain { start_epoch, target_head_slot, target_head_root, + target_finalized_epoch, + target_finalized_root, batches: BTreeMap::new(), peers: HashSet::from_iter([peer_id]), to_be_downloaded: start_epoch, @@ -149,8 +157,15 @@ impl SyncingChain { } /// Returns true if this chain has the same target - pub fn has_same_target(&self, target_head_slot: Slot, target_head_root: Hash256) -> bool { - self.target_head_slot == target_head_slot && self.target_head_root == target_head_root + pub fn has_same_target(&self, remote_info: &SyncInfo) -> bool { + self.target_finalized_root == remote_info.finalized_root + && self.target_finalized_epoch == remote_info.finalized_epoch + && (self.target_head_root == remote_info.head_root + || self + .target_head_slot + .as_u64() + .abs_diff(remote_info.head_slot.as_u64()) + <= T::EthSpec::slots_per_epoch() * 2) } /// Check if the chain has peers from which to process batches. @@ -235,9 +250,9 @@ impl SyncingChain { // request_id matches // TODO(das): removed peer_id matching as the node may request a different peer for data // columns. - if !batch.is_expecting_block(&request_id) { - return Ok(KeepChain); - } + // if !batch.is_expecting_block(&request_id) { + // return Ok(KeepChain); + // } batch } }; @@ -265,6 +280,7 @@ impl SyncingChain { network: &mut SyncNetworkContext, batch_id: BatchId, ) -> ProcessingResult { + debug!(?batch_id, "Attempting to process batch"); // Only process batches if this chain is Syncing, and only one at a time if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { return Ok(KeepChain); @@ -844,14 +860,7 @@ impl SyncingChain { for (column, peer) in column_and_peer { network.report_peer(*peer, *action, "failed to return columns"); failed_columns.insert(*column); - if let BatchOperationOutcome::Failed { blacklist } = - batch.download_failed(Some(*peer))? - { - return Err(RemoveChain::ChainFailed { - blacklist, - failing_batch: batch_id, - }); - } + return self.retry_partial_batch( network, batch_id, @@ -930,6 +939,11 @@ impl SyncingChain { .cloned() .collect::>(); + debug!( + ?self.peers, + "All synced peers", + ); + match network.block_components_by_range_request( batch_type, request, @@ -1109,9 +1123,11 @@ impl SyncingChain { .network_globals() .peers .read() - .good_custody_subnet_peer(*subnet_id) + .good_range_sync_custody_subnet_peer(*subnet_id, &self.peers) .count(); - + if peer_count == 0 { + debug!(peer_count, ?subnet_id, ?self.peers, "Peer count"); + } peer_count > 0 }); peers_on_all_custody_subnets diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 9f500c61e0b..9a3ffef2679 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -330,8 +330,7 @@ impl ChainCollection { debug!("including head peer"); self.add_peer_or_create_chain( local_epoch, - peer_sync_info.head_root, - peer_sync_info.head_slot, + peer_sync_info, peer_id, RangeSyncType::Head, network, @@ -456,8 +455,7 @@ impl ChainCollection { pub fn add_peer_or_create_chain( &mut self, start_epoch: Epoch, - target_head_root: Hash256, - target_head_slot: Slot, + remote_info: SyncInfo, peer: PeerId, sync_type: RangeSyncType, network: &mut SyncNetworkContext, @@ -468,9 +466,15 @@ impl ChainCollection { &mut self.head_chains }; + let target_head_root = remote_info.head_root; + let target_head_slot = remote_info.head_slot; + let target_finalized_root = remote_info.finalized_root; + let target_finalized_epoch = remote_info.finalized_epoch; + + debug!(len=collection.len(), "Collection length"); match collection .iter_mut() - .find(|(_, chain)| chain.has_same_target(target_head_slot, target_head_root)) + .find(|(_, chain)| chain.has_same_target(&remote_info)) { Some((&id, chain)) => { debug!(peer_id = %peer, ?sync_type, id, "Adding peer to known chain"); @@ -495,6 +499,8 @@ impl ChainCollection { start_epoch, target_head_slot, target_head_root, + target_finalized_root, + target_finalized_epoch, peer, sync_type.into(), ); @@ -506,6 +512,8 @@ impl ChainCollection { %start_epoch, %target_head_slot, ?target_head_root, + ?target_finalized_root, + ?target_finalized_epoch, "New chain added to sync" ); collection.insert(id, new_chain); diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index f34816d1de2..60e4d299af0 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -155,20 +155,19 @@ where debug!(%peer_id, "Finalization sync peer joined"); self.awaiting_head_peers.remove(&peer_id); - // Because of our change in finalized sync batch size from 2 to 1 and our transition - // to using exact epoch boundaries for batches (rather than one slot past the epoch - // boundary), we need to sync finalized sync to 2 epochs + 1 slot past our peer's - // finalized slot in order to finalize the chain locally. - let target_head_slot = - remote_finalized_slot + (2 * T::EthSpec::slots_per_epoch()) + 1; + // // Because of our change in finalized sync batch size from 2 to 1 and our transition + // // to using exact epoch boundaries for batches (rather than one slot past the epoch + // // boundary), we need to sync finalized sync to 2 epochs + 1 slot past our peer's + // // finalized slot in order to finalize the chain locally. + // let target_head_slot = + // remote_finalized_slot + (2 * T::EthSpec::slots_per_epoch()) + 1; // Note: We keep current head chains. These can continue syncing whilst we complete // this new finalized chain. self.chains.add_peer_or_create_chain( local_info.finalized_epoch, - remote_info.finalized_root, - target_head_slot, + remote_info, peer_id, RangeSyncType::Finalized, network, @@ -199,8 +198,7 @@ where .epoch(T::EthSpec::slots_per_epoch()); self.chains.add_peer_or_create_chain( start_epoch, - remote_info.head_root, - remote_info.head_slot, + remote_info, peer_id, RangeSyncType::Head, network, From 743055cfb1e004759454c9ff5e1f580f38656877 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 9 Jul 2025 12:16:12 -0700 Subject: [PATCH 06/11] Remove unsafe optimisations --- .../network/src/sync/range_sync/chain.rs | 27 +++++-------------- .../src/sync/range_sync/chain_collection.rs | 18 ++++--------- .../network/src/sync/range_sync/range.rs | 10 ++++--- 3 files changed, 17 insertions(+), 38 deletions(-) diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 8b0f7a4b672..35928385a23 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -8,7 +8,7 @@ use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, Ba use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use lighthouse_network::service::api_types::Id; -use lighthouse_network::{PeerAction, PeerId, SyncInfo}; +use lighthouse_network::{PeerAction, PeerId}; use logging::crit; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use strum::IntoStaticStr; @@ -83,10 +83,6 @@ pub struct SyncingChain { /// The target head root. pub target_head_root: Hash256, - pub target_finalized_root: Hash256, - - pub target_finalized_epoch: Epoch, - /// Sorted map of batches undergoing some kind of processing. batches: BTreeMap>, @@ -132,8 +128,6 @@ impl SyncingChain { start_epoch: Epoch, target_head_slot: Slot, target_head_root: Hash256, - target_finalized_root: Hash256, - target_finalized_epoch: Epoch, peer_id: PeerId, chain_type: SyncingChainType, ) -> Self { @@ -143,8 +137,6 @@ impl SyncingChain { start_epoch, target_head_slot, target_head_root, - target_finalized_epoch, - target_finalized_root, batches: BTreeMap::new(), peers: HashSet::from_iter([peer_id]), to_be_downloaded: start_epoch, @@ -157,15 +149,8 @@ impl SyncingChain { } /// Returns true if this chain has the same target - pub fn has_same_target(&self, remote_info: &SyncInfo) -> bool { - self.target_finalized_root == remote_info.finalized_root - && self.target_finalized_epoch == remote_info.finalized_epoch - && (self.target_head_root == remote_info.head_root - || self - .target_head_slot - .as_u64() - .abs_diff(remote_info.head_slot.as_u64()) - <= T::EthSpec::slots_per_epoch() * 2) + pub fn has_same_target(&self, target_head_slot: Slot, target_head_root: Hash256) -> bool { + self.target_head_slot == target_head_slot && self.target_head_root == target_head_root } /// Check if the chain has peers from which to process batches. @@ -250,9 +235,9 @@ impl SyncingChain { // request_id matches // TODO(das): removed peer_id matching as the node may request a different peer for data // columns. - // if !batch.is_expecting_block(&request_id) { - // return Ok(KeepChain); - // } + if !batch.is_expecting_block(&request_id) { + return Ok(KeepChain); + } batch } }; diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 9a3ffef2679..9f500c61e0b 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -330,7 +330,8 @@ impl ChainCollection { debug!("including head peer"); self.add_peer_or_create_chain( local_epoch, - peer_sync_info, + peer_sync_info.head_root, + peer_sync_info.head_slot, peer_id, RangeSyncType::Head, network, @@ -455,7 +456,8 @@ impl ChainCollection { pub fn add_peer_or_create_chain( &mut self, start_epoch: Epoch, - remote_info: SyncInfo, + target_head_root: Hash256, + target_head_slot: Slot, peer: PeerId, sync_type: RangeSyncType, network: &mut SyncNetworkContext, @@ -466,15 +468,9 @@ impl ChainCollection { &mut self.head_chains }; - let target_head_root = remote_info.head_root; - let target_head_slot = remote_info.head_slot; - let target_finalized_root = remote_info.finalized_root; - let target_finalized_epoch = remote_info.finalized_epoch; - - debug!(len=collection.len(), "Collection length"); match collection .iter_mut() - .find(|(_, chain)| chain.has_same_target(&remote_info)) + .find(|(_, chain)| chain.has_same_target(target_head_slot, target_head_root)) { Some((&id, chain)) => { debug!(peer_id = %peer, ?sync_type, id, "Adding peer to known chain"); @@ -499,8 +495,6 @@ impl ChainCollection { start_epoch, target_head_slot, target_head_root, - target_finalized_root, - target_finalized_epoch, peer, sync_type.into(), ); @@ -512,8 +506,6 @@ impl ChainCollection { %start_epoch, %target_head_slot, ?target_head_root, - ?target_finalized_root, - ?target_finalized_epoch, "New chain added to sync" ); collection.insert(id, new_chain); diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 60e4d299af0..102129ccab4 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -159,15 +159,16 @@ where // // to using exact epoch boundaries for batches (rather than one slot past the epoch // // boundary), we need to sync finalized sync to 2 epochs + 1 slot past our peer's // // finalized slot in order to finalize the chain locally. - // let target_head_slot = - // remote_finalized_slot + (2 * T::EthSpec::slots_per_epoch()) + 1; + let target_head_slot = + remote_finalized_slot + (2 * T::EthSpec::slots_per_epoch()) + 1; // Note: We keep current head chains. These can continue syncing whilst we complete // this new finalized chain. self.chains.add_peer_or_create_chain( local_info.finalized_epoch, - remote_info, + remote_info.finalized_root, + target_head_slot, peer_id, RangeSyncType::Finalized, network, @@ -198,7 +199,8 @@ where .epoch(T::EthSpec::slots_per_epoch()); self.chains.add_peer_or_create_chain( start_epoch, - remote_info, + remote_info.head_root, + remote_info.head_slot, peer_id, RangeSyncType::Head, network, From ab069a601b4cf4877d6db91e236b504e114bac08 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 9 Jul 2025 13:04:37 -0700 Subject: [PATCH 07/11] Remove debugging logs --- .../network/src/sync/block_sidecar_coupling.rs | 4 +--- beacon_node/network/src/sync/network_context.rs | 4 ---- beacon_node/network/src/sync/range_sync/batch.rs | 15 --------------- beacon_node/network/src/sync/range_sync/chain.rs | 6 ------ 4 files changed, 1 insertion(+), 28 deletions(-) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index a10bded7ea4..08acd6a20c9 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -8,7 +8,6 @@ use lighthouse_network::{ PeerAction, PeerId, }; use std::{collections::HashMap, sync::Arc}; -use tracing::debug; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, @@ -171,9 +170,8 @@ impl RangeBlockComponentsRequest { } => { let mut data_columns = vec![]; let mut column_peers: HashMap = HashMap::new(); - for (req_id, req) in requests.iter() { + for req in requests.values() { let Some(data) = req.to_finished() else { - debug!(?req_id, "Req is not finished"); return None; }; data_columns.extend(data.clone()) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index abfa2d60041..ba345ed7452 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -719,7 +719,6 @@ impl SyncNetworkContext { } } } { - debug!("Error while inserting range component response"); entry.remove(); return Some(Err(e)); } @@ -728,7 +727,6 @@ impl SyncNetworkContext { if blocks_result.is_ok() { // remove the entry only if it coupled successfully with // no errors - debug!("Removing entry"); entry.remove(); } // If the request is finished, dequeue everything @@ -1430,7 +1428,6 @@ impl SyncNetworkContext { peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>>>> { - // debug!(%peer_id, ?id, "Received blocks by range response"); let resp = self.blocks_by_range_requests.on_response(id, rpc_event); self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len()) } @@ -1453,7 +1450,6 @@ impl SyncNetworkContext { peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>> { - // debug!(%peer_id, ?id, "Received data columns by range response"); let resp = self .data_columns_by_range_requests .on_response(id, rpc_event); diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 52a38c40376..78478b98007 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -8,7 +8,6 @@ use std::hash::{Hash, Hasher}; use std::ops::Sub; use std::time::{Duration, Instant}; use strum::Display; -use tracing::debug; use types::{Epoch, EthSpec, Slot}; /// The number of times to retry a batch before it is considered failed. @@ -207,9 +206,7 @@ impl BatchInfo { } /// Verifies if an incoming block belongs to this batch. - #[tracing::instrument(skip(self), fields(batch = %self))] pub fn is_expecting_block(&self, request_id: &Id) -> bool { - debug!(?request_id, "Batch is expecting block?"); if let BatchState::Downloading(expected_id) = &self.state { return expected_id == request_id; } @@ -276,13 +273,11 @@ impl BatchInfo { /// Marks the batch as ready to be processed if the blocks are in the range. The number of /// received blocks is returned, or the wrong batch end on failure #[must_use = "Batch may have failed"] - #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn download_completed( &mut self, blocks: Vec>, peer: PeerId, ) -> Result { - debug!(?peer, "Download completed with peer_id"); match self.state.poison() { BatchState::Downloading(_) => { let received = blocks.len(); @@ -306,12 +301,10 @@ impl BatchInfo { /// The `peer` parameter, when set to None, does not increment the failed attempts of /// this batch and register the peer, rather attempts a re-download. #[must_use = "Batch may have failed"] - #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn download_failed( &mut self, peer: Option, ) -> Result { - debug!(?peer, "Download failed with peer_id"); match self.state.poison() { BatchState::Downloading(_) => { // register the attempt and check if the batch can be tried again @@ -338,9 +331,7 @@ impl BatchInfo { } } - #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn start_downloading(&mut self, request_id: Id) -> Result<(), WrongState> { - debug!(?request_id, "Download started with req_id"); match self.state.poison() { BatchState::AwaitingDownload => { self.state = BatchState::Downloading(request_id); @@ -357,9 +348,7 @@ impl BatchInfo { } } - #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn start_processing(&mut self) -> Result<(Vec>, Duration), WrongState> { - debug!("Processing started "); match self.state.poison() { BatchState::AwaitingProcessing(peer, blocks, start_instant) => { self.state = BatchState::Processing(Attempt::new::(peer, &blocks)); @@ -376,13 +365,11 @@ impl BatchInfo { } } - #[must_use = "Batch may have failed"] #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn processing_completed( &mut self, procesing_result: BatchProcessingResult, ) -> Result { - debug!(?procesing_result, "Processing completed with status"); match self.state.poison() { BatchState::Processing(attempt) => { self.state = match procesing_result { @@ -420,9 +407,7 @@ impl BatchInfo { } #[must_use = "Batch may have failed"] - #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn validation_failed(&mut self) -> Result { - debug!("Validation failed"); match self.state.poison() { BatchState::AwaitingValidation(attempt) => { self.failed_processing_attempts.push(attempt); diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 35928385a23..c6579235ada 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -265,7 +265,6 @@ impl SyncingChain { network: &mut SyncNetworkContext, batch_id: BatchId, ) -> ProcessingResult { - debug!(?batch_id, "Attempting to process batch"); // Only process batches if this chain is Syncing, and only one at a time if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { return Ok(KeepChain); @@ -924,11 +923,6 @@ impl SyncingChain { .cloned() .collect::>(); - debug!( - ?self.peers, - "All synced peers", - ); - match network.block_components_by_range_request( batch_type, request, From f28a3b32932c990d3e69c426486cb44341882495 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 9 Jul 2025 13:08:03 -0700 Subject: [PATCH 08/11] Add a log --- beacon_node/network/src/sync/range_sync/chain.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index c6579235ada..80533703e64 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -836,9 +836,10 @@ impl SyncingChain { if let Some(batch) = self.batches.get_mut(&batch_id) { if let RpcResponseError::BlockComponentCouplingError(CouplingError { column_and_peer, - msg: _, + msg, }) = &err { + debug!(msg, "Block components coupling error"); if let Some((column_and_peer, action)) = column_and_peer { let mut failed_columns = HashSet::new(); for (column, peer) in column_and_peer { @@ -1104,9 +1105,6 @@ impl SyncingChain { .read() .good_range_sync_custody_subnet_peer(*subnet_id, &self.peers) .count(); - if peer_count == 0 { - debug!(peer_count, ?subnet_id, ?self.peers, "Peer count"); - } peer_count > 0 }); peers_on_all_custody_subnets From 07a979a9beb5cd393b4cb2d5ce7a88783ea93dfa Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 9 Jul 2025 18:54:05 -0700 Subject: [PATCH 09/11] Cleanup --- .../src/peer_manager/peerdb.rs | 6 ++-- .../src/sync/block_sidecar_coupling.rs | 35 +++++++++---------- .../network/src/sync/range_sync/range.rs | 8 ++--- 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 5c82688bf2e..813f95b7bd2 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -319,8 +319,8 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } - /// Returns an iterator of all good gossipsub peers that are supposed to be custodying - /// the given subnet id. + /// Returns an iterator of all peers that are supposed to be custodying + /// the given subnet id that also belong to `allowed_peers`. pub fn good_range_sync_custody_subnet_peer<'a>( &'a self, subnet: DataColumnSubnetId, @@ -875,7 +875,7 @@ impl PeerDB { ) => { // Update the ENR if one exists, and compute the custody subnets if let Some(enr) = enr { - info.set_enr(enr); + info.set_enr(enr, ); } match current_state { diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 08acd6a20c9..02ae3e01f17 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -17,8 +17,6 @@ pub struct RangeBlockComponentsRequest { blocks_request: ByRangeRequest>>>, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, - /// The column indices corresponding to the id - column_peers: HashMap>, } enum ByRangeRequest { @@ -34,6 +32,8 @@ enum RangeBlockDataRequest { DataColumnsByRangeRequestId, ByRangeRequest>, >, + /// The column indices corresponding to the request + column_peers: HashMap>, expected_custody_columns: Vec, }, } @@ -53,18 +53,16 @@ impl RangeBlockComponentsRequest { Vec, )>, ) -> Self { - let mut column_peers = HashMap::new(); let block_data_request = if let Some(blobs_req_id) = blobs_req_id { RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id)) } else if let Some((requests, expected_custody_columns)) = data_columns { + let column_peers: HashMap<_, _> = requests.into_iter().collect(); RangeBlockDataRequest::DataColumns { - requests: requests - .into_iter() - .map(|(id, indices)| { - column_peers.insert(id, indices); - (id, ByRangeRequest::Active(id)) - }) + requests: column_peers + .keys() + .map(|id| (*id, ByRangeRequest::Active(*id))) .collect(), + column_peers, expected_custody_columns, } } else { @@ -74,7 +72,6 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), block_data_request, - column_peers, } } @@ -86,10 +83,11 @@ impl RangeBlockComponentsRequest { RangeBlockDataRequest::DataColumns { requests, expected_custody_columns: _, + column_peers, } => { for (request, columns) in failed_column_requests.into_iter() { requests.insert(request, ByRangeRequest::Active(request)); - self.column_peers.insert(request, columns); + column_peers.insert(request, columns); } Ok(()) } @@ -167,9 +165,10 @@ impl RangeBlockComponentsRequest { RangeBlockDataRequest::DataColumns { requests, expected_custody_columns, + column_peers, } => { let mut data_columns = vec![]; - let mut column_peers: HashMap = HashMap::new(); + let mut column_to_peer_id: HashMap = HashMap::new(); for req in requests.values() { let Some(data) = req.to_finished() else { return None; @@ -179,16 +178,16 @@ impl RangeBlockComponentsRequest { // Note: this assumes that only 1 peer is responsible for a column // with a batch. - for (id, columns) in &self.column_peers { + for (id, columns) in column_peers { for column in columns { - column_peers.insert(*column, id.peer); + column_to_peer_id.insert(*column, id.peer); } } let resp = Self::responses_with_custody_columns( blocks.to_vec(), data_columns, - column_peers, + column_to_peer_id, expected_custody_columns, spec, ); @@ -281,7 +280,7 @@ impl RangeBlockComponentsRequest { fn responses_with_custody_columns( blocks: Vec>>, data_columns: DataColumnSidecarList, - column_peers: HashMap, + column_to_peer: HashMap, expects_custody_columns: &[ColumnIndex], spec: &ChainSpec, ) -> Result>, CouplingError> { @@ -324,7 +323,7 @@ impl RangeBlockComponentsRequest { // This is potentially dangerous as we can get isolated on a chain with a // malicious block peer. // TODO: fix this by checking the proposer signature before downloading columns. - let responsible_peers = column_peers.iter().map(|c| (*c.0, *c.1)).collect(); + let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); return Err(CouplingError { msg: format!("No columns for block {block_root:?} with data"), column_and_peer: Some((responsible_peers, PeerAction::LowToleranceError)), @@ -342,7 +341,7 @@ impl RangeBlockComponentsRequest { } else { // Penalize the peer for claiming to have the columns but not returning // them - let Some(responsible_peer) = column_peers.get(index) else { + let Some(responsible_peer) = column_to_peer.get(index) else { return Err(CouplingError { msg: format!("Internal error, no request made for column {}", index), column_and_peer: None, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 102129ccab4..f34816d1de2 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -155,10 +155,10 @@ where debug!(%peer_id, "Finalization sync peer joined"); self.awaiting_head_peers.remove(&peer_id); - // // Because of our change in finalized sync batch size from 2 to 1 and our transition - // // to using exact epoch boundaries for batches (rather than one slot past the epoch - // // boundary), we need to sync finalized sync to 2 epochs + 1 slot past our peer's - // // finalized slot in order to finalize the chain locally. + // Because of our change in finalized sync batch size from 2 to 1 and our transition + // to using exact epoch boundaries for batches (rather than one slot past the epoch + // boundary), we need to sync finalized sync to 2 epochs + 1 slot past our peer's + // finalized slot in order to finalize the chain locally. let target_head_slot = remote_finalized_slot + (2 * T::EthSpec::slots_per_epoch()) + 1; From 11adbbf339fca1e4cc44a61672d60420a3344962 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 9 Jul 2025 19:40:50 -0700 Subject: [PATCH 10/11] Downscore unique peers --- .../network/src/sync/range_sync/chain.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 80533703e64..b4c35f6b567 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -839,20 +839,19 @@ impl SyncingChain { msg, }) = &err { - debug!(msg, "Block components coupling error"); + debug!(?batch_id, msg, "Block components coupling error"); if let Some((column_and_peer, action)) = column_and_peer { let mut failed_columns = HashSet::new(); + let mut failed_peers = HashSet::new(); for (column, peer) in column_and_peer { - network.report_peer(*peer, *action, "failed to return columns"); failed_columns.insert(*column); - - return self.retry_partial_batch( - network, - batch_id, - request_id, - failed_columns, - ); + failed_peers.insert(*peer); } + for peer in failed_peers.into_iter() { + network.report_peer(peer, *action, "failed to return columns"); + } + + return self.retry_partial_batch(network, batch_id, request_id, failed_columns); } } // A batch could be retried without the peer failing the request (disconnecting/ From 1f5bff999ba5cd07f8a156831f1cde2ee00ca031 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 9 Jul 2025 19:57:12 -0700 Subject: [PATCH 11/11] More cleanup --- .../network/src/sync/block_sidecar_coupling.rs | 2 ++ .../network/src/sync/network_context.rs | 7 +++++-- .../network/src/sync/range_sync/batch.rs | 1 - .../network/src/sync/range_sync/chain.rs | 18 ++++++++++++++---- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 02ae3e01f17..89f72b1c87d 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -75,6 +75,8 @@ impl RangeBlockComponentsRequest { } } + /// Modifies `self` by inserting a new `DataColumnsByRangeRequestId` for a formerly failed + /// request for some columns. pub fn reinsert_failed_column_requests( &mut self, failed_column_requests: Vec<(DataColumnsByRangeRequestId, Vec)>, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index ba345ed7452..a20f2793a41 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -442,6 +442,10 @@ impl SyncNetworkContext { active_request_count_by_peer } + /// Retries only the specified failed columns by requesting them again. + /// + /// Note: This function doesn't retry the whole batch, but retries specific requests within + /// the batch. pub fn retry_columns_by_range( &mut self, request_id: Id, @@ -461,13 +465,13 @@ impl SyncNetworkContext { }; let active_request_count_by_peer = self.active_request_count_by_peer(); - // Attempt to find all required custody peers to request the failed columns from debug!( ?failed_columns, "Retrying only failed column requests from other peers" ); + // Attempt to find all required custody peers to request the failed columns from let columns_by_range_peers_to_request = self .select_columns_by_range_peers_to_request( &failed_columns, @@ -501,7 +505,6 @@ impl SyncNetworkContext { // instead of creating a new `RangeBlockComponentsRequest`, we reinsert // the new requests created for the failed requests - let Some(range_request) = self.components_by_range_requests.get_mut(&id) else { return Err( "retrying custody request for range request that does not exist".to_string(), diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 78478b98007..e31930075a8 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -365,7 +365,6 @@ impl BatchInfo { } } - #[tracing::instrument(skip(self), fields(state = %self.state, start = %self.start_slot))] pub fn processing_completed( &mut self, procesing_result: BatchProcessingResult, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index b4c35f6b567..82bd5fa605a 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -840,6 +840,8 @@ impl SyncingChain { }) = &err { debug!(?batch_id, msg, "Block components coupling error"); + // Note: we don't fail the batch here because a `CouplingError` is + // recoverable by requesting from other honest peers. if let Some((column_and_peer, action)) = column_and_peer { let mut failed_columns = HashSet::new(); let mut failed_peers = HashSet::new(); @@ -847,11 +849,17 @@ impl SyncingChain { failed_columns.insert(*column); failed_peers.insert(*peer); } - for peer in failed_peers.into_iter() { - network.report_peer(peer, *action, "failed to return columns"); + for peer in failed_peers.iter() { + network.report_peer(*peer, *action, "failed to return columns"); } - return self.retry_partial_batch(network, batch_id, request_id, failed_columns); + return self.retry_partial_batch( + network, + batch_id, + request_id, + failed_columns, + failed_peers, + ); } } // A batch could be retried without the peer failing the request (disconnecting/ @@ -979,6 +987,7 @@ impl SyncingChain { Ok(KeepChain) } + /// Retries partial column requests within the batch by creating new requests for the failed columns. #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] pub fn retry_partial_batch( &mut self, @@ -986,9 +995,10 @@ impl SyncingChain { batch_id: BatchId, id: Id, failed_columns: HashSet, + mut failed_peers: HashSet, ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { - let failed_peers = batch.failed_peers(); + failed_peers.extend(&batch.failed_peers()); let req = batch.to_blocks_by_range_request().0; let synced_peers = network