diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index b28807c47e8..813f95b7bd2 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -248,6 +248,34 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Returns all the synced peers from the list of allowed peers that claim to have the block + /// components for the given epoch based on `status.earliest_available_slot`. + /// + /// If `earliest_available_slot` info is not available, then return peer anyway assuming it has the + /// required data. + pub fn synced_peers_for_epoch<'a>( + &'a self, + epoch: Epoch, + allowed_peers: &'a HashSet, + ) -> impl Iterator { + self.peers + .iter() + .filter(move |(peer_id, info)| { + allowed_peers.contains(peer_id) + && info.is_connected() + && match info.sync_status() { + SyncStatus::Synced { info } => { + info.has_slot(epoch.end_slot(E::slots_per_epoch())) + } + SyncStatus::Advanced { info } => { + info.has_slot(epoch.end_slot(E::slots_per_epoch())) + } + _ => false, + } + }) + .map(|(peer_id, _)| peer_id) + } + /// Gives the `peer_id` of all known connected and advanced peers. pub fn advanced_peers(&self) -> impl Iterator { self.peers @@ -291,6 +319,25 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Returns an iterator of all peers that are supposed to be custodying + /// the given subnet id that also belong to `allowed_peers`. + pub fn good_range_sync_custody_subnet_peer<'a>( + &'a self, + subnet: DataColumnSubnetId, + allowed_peers: &'a HashSet, + ) -> impl Iterator { + self.peers + .iter() + .filter(move |(peer_id, info)| { + // The custody_subnets hashset can be populated via enr or metadata + let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet); + allowed_peers.contains(peer_id) + && info.is_connected() + && is_custody_subnet_peer + }) + .map(|(peer_id, _)| peer_id) + } + /// Gives the ids of all known disconnected peers. pub fn disconnected_peers(&self) -> impl Iterator { self.peers @@ -828,7 +875,7 @@ impl PeerDB { ) => { // Update the ENR if one exists, and compute the custody subnets if let Some(enr) = enr { - info.set_enr(enr); + info.set_enr(enr, ); } match current_state { diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs index 5a4fc339940..aeda375a0d0 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs @@ -28,6 +28,19 @@ pub struct SyncInfo { pub earliest_available_slot: Option, } +impl SyncInfo { + /// Returns true if the provided slot is greater than `earliest_available_slot`. + /// + /// If `earliest_available_slot` does is None, then we just assume that the peer has the slot. + pub fn has_slot(&self, slot: Slot) -> bool { + if let Some(earliest_available_slot) = self.earliest_available_slot { + slot >= earliest_available_slot + } else { + true + } + } +} + impl std::cmp::PartialEq for SyncStatus { fn eq(&self, other: &Self) -> bool { matches!( diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index b36f8cc2154..3013596f9f7 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -1,4 +1,5 @@ use crate::rpc::methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage}; +use libp2p::PeerId; use std::fmt::{Display, Formatter}; use std::sync::Arc; use types::{ @@ -61,6 +62,11 @@ pub struct DataColumnsByRangeRequestId { pub id: Id, /// The Id of the overall By Range request for block components. pub parent_request_id: ComponentsByRangeRequestId, + /// The peer id associated with the request. + /// + /// This is useful to penalize the peer at a later point if it returned data columns that + /// did not match with the verified block. + pub peer: PeerId, } /// Block components by range request for range sync. Includes an ID for downstream consumers to @@ -306,6 +312,7 @@ mod tests { batch_id: Epoch::new(0), }, }, + peer: PeerId::random(), }; assert_eq!(format!("{id}"), "123/122/RangeSync/0/54"); } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 0418ab45534..89f72b1c87d 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,15 +1,17 @@ use beacon_chain::{ block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, }; -use lighthouse_network::service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, +use lighthouse_network::{ + service::api_types::{ + BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + }, + PeerAction, PeerId, }; use std::{collections::HashMap, sync::Arc}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, }; - pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. blocks_request: ByRangeRequest>>>, @@ -30,24 +32,37 @@ enum RangeBlockDataRequest { DataColumnsByRangeRequestId, ByRangeRequest>, >, + /// The column indices corresponding to the request + column_peers: HashMap>, expected_custody_columns: Vec, }, } +#[derive(Debug)] +pub struct CouplingError { + pub(crate) msg: String, + pub(crate) column_and_peer: Option<(Vec<(u64, PeerId)>, PeerAction)>, +} + impl RangeBlockComponentsRequest { pub fn new( blocks_req_id: BlocksByRangeRequestId, blobs_req_id: Option, - data_columns: Option<(Vec, Vec)>, + data_columns: Option<( + Vec<(DataColumnsByRangeRequestId, Vec)>, + Vec, + )>, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id)) } else if let Some((requests, expected_custody_columns)) = data_columns { + let column_peers: HashMap<_, _> = requests.into_iter().collect(); RangeBlockDataRequest::DataColumns { - requests: requests - .into_iter() - .map(|id| (id, ByRangeRequest::Active(id))) + requests: column_peers + .keys() + .map(|id| (*id, ByRangeRequest::Active(*id))) .collect(), + column_peers, expected_custody_columns, } } else { @@ -60,6 +75,28 @@ impl RangeBlockComponentsRequest { } } + /// Modifies `self` by inserting a new `DataColumnsByRangeRequestId` for a formerly failed + /// request for some columns. + pub fn reinsert_failed_column_requests( + &mut self, + failed_column_requests: Vec<(DataColumnsByRangeRequestId, Vec)>, + ) -> Result<(), String> { + match &mut self.block_data_request { + RangeBlockDataRequest::DataColumns { + requests, + expected_custody_columns: _, + column_peers, + } => { + for (request, columns) in failed_column_requests.into_iter() { + requests.insert(request, ByRangeRequest::Active(request)); + column_peers.insert(request, columns); + } + Ok(()) + } + _ => Err("not a column request".to_string()), + } + } + pub fn add_blocks( &mut self, req_id: BlocksByRangeRequestId, @@ -105,12 +142,15 @@ impl RangeBlockComponentsRequest { } } - pub fn responses(&self, spec: &ChainSpec) -> Option>, String>> { + pub fn responses( + &mut self, + spec: &ChainSpec, + ) -> Option>, CouplingError>> { let Some(blocks) = self.blocks_request.to_finished() else { return None; }; - match &self.block_data_request { + match &mut self.block_data_request { RangeBlockDataRequest::NoData => { Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec)) } @@ -127,8 +167,10 @@ impl RangeBlockComponentsRequest { RangeBlockDataRequest::DataColumns { requests, expected_custody_columns, + column_peers, } => { let mut data_columns = vec![]; + let mut column_to_peer_id: HashMap = HashMap::new(); for req in requests.values() { let Some(data) = req.to_finished() else { return None; @@ -136,12 +178,33 @@ impl RangeBlockComponentsRequest { data_columns.extend(data.clone()) } - Some(Self::responses_with_custody_columns( + // Note: this assumes that only 1 peer is responsible for a column + // with a batch. + for (id, columns) in column_peers { + for column in columns { + column_to_peer_id.insert(*column, id.peer); + } + } + + let resp = Self::responses_with_custody_columns( blocks.to_vec(), data_columns, + column_to_peer_id, expected_custody_columns, spec, - )) + ); + + if let Err(err) = &resp { + if let Some((peers, _)) = &err.column_and_peer { + for (_, peer) in peers.iter() { + // find the req id associated with the peer and + // delete it from the entries + requests.retain(|&k, _| k.peer != *peer); + } + } + } + + Some(resp) } } } @@ -150,7 +213,7 @@ impl RangeBlockComponentsRequest { blocks: Vec>>, blobs: Vec>>, spec: &ChainSpec, - ) -> Result>, String> { + ) -> Result>, CouplingError> { // There can't be more more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted. let mut responses = Vec::with_capacity(blocks.len()); @@ -165,17 +228,26 @@ impl RangeBlockComponentsRequest { .unwrap_or(false); pair_next_blob } { - blob_list.push(blob_iter.next().ok_or("Missing next blob".to_string())?); + blob_list.push(blob_iter.next().ok_or_else(|| CouplingError { + msg: "Missing next blob".to_string(), + column_and_peer: None, + })?); } let mut blobs_buffer = vec![None; max_blobs_per_block]; for blob in blob_list { let blob_index = blob.index as usize; let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else { - return Err("Invalid blob index".to_string()); + return Err(CouplingError { + msg: "Invalid blob index".to_string(), + column_and_peer: None, + }); }; if blob_opt.is_some() { - return Err("Repeat blob index".to_string()); + return Err(CouplingError { + msg: "Repeat blob index".to_string(), + column_and_peer: None, + }); } else { *blob_opt = Some(blob); } @@ -184,13 +256,24 @@ impl RangeBlockComponentsRequest { blobs_buffer.into_iter().flatten().collect::>(), max_blobs_per_block, ) - .map_err(|_| "Blobs returned exceeds max length".to_string())?; - responses.push(RpcBlock::new(None, block, Some(blobs)).map_err(|e| format!("{e:?}"))?) + .map_err(|_| CouplingError { + msg: "Blobs returned exceeds max length".to_string(), + column_and_peer: None, + })?; + responses.push( + RpcBlock::new(None, block, Some(blobs)).map_err(|e| CouplingError { + msg: format!("{e:?}"), + column_and_peer: None, + })?, + ) } // if accumulated sidecars is not empty, throw an error. if blob_iter.next().is_some() { - return Err("Received sidecars that don't pair well".to_string()); + return Err(CouplingError { + msg: "Received sidecars that don't pair well".to_string(), + column_and_peer: None, + }); } Ok(responses) @@ -199,9 +282,10 @@ impl RangeBlockComponentsRequest { fn responses_with_custody_columns( blocks: Vec>>, data_columns: DataColumnSidecarList, + column_to_peer: HashMap, expects_custody_columns: &[ColumnIndex], spec: &ChainSpec, - ) -> Result>, String> { + ) -> Result>, CouplingError> { // Group data columns by block_root and index let mut data_columns_by_block = HashMap::>>>::new(); @@ -215,9 +299,10 @@ impl RangeBlockComponentsRequest { .insert(index, column) .is_some() { - return Err(format!( - "Repeated column block_root {block_root:?} index {index}" - )); + return Err(CouplingError { + msg: format!("Repeated column block_root {block_root:?} index {index}"), + column_and_peer: None, + }); } } @@ -235,30 +320,61 @@ impl RangeBlockComponentsRequest { // TODO(das): on the initial version of PeerDAS the beacon chain does not check // rpc custody requirements and dropping this check can allow the block to have // an inconsistent DB. - return Err(format!("No columns for block {block_root:?} with data")); + + // For now, we always assume that the block peer is right. + // This is potentially dangerous as we can get isolated on a chain with a + // malicious block peer. + // TODO: fix this by checking the proposer signature before downloading columns. + let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); + return Err(CouplingError { + msg: format!("No columns for block {block_root:?} with data"), + column_and_peer: Some((responsible_peers, PeerAction::LowToleranceError)), + }); }; let mut custody_columns = vec![]; + let mut naughty_peers = vec![]; for index in expects_custody_columns { - let Some(data_column) = data_columns_by_index.remove(index) else { - return Err(format!("No column for block {block_root:?} index {index}")); - }; - // Safe to convert to `CustodyDataColumn`: we have asserted that the index of - // this column is in the set of `expects_custody_columns` and with the expected - // block root, so for the expected epoch of this batch. - custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); + if let Some(data_column) = data_columns_by_index.remove(index) { + // Safe to convert to `CustodyDataColumn`: we have asserted that the index of + // this column is in the set of `expects_custody_columns` and with the expected + // block root, so for the expected epoch of this batch. + custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); + } else { + // Penalize the peer for claiming to have the columns but not returning + // them + let Some(responsible_peer) = column_to_peer.get(index) else { + return Err(CouplingError { + msg: format!("Internal error, no request made for column {}", index), + column_and_peer: None, + }); + }; + naughty_peers.push((*index, *responsible_peer)); + } + } + if !naughty_peers.is_empty() { + return Err(CouplingError { + msg: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), + column_and_peer: Some((naughty_peers, PeerAction::LowToleranceError)), + }); } // Assert that there are no columns left if !data_columns_by_index.is_empty() { let remaining_indices = data_columns_by_index.keys().collect::>(); - return Err(format!( - "Not all columns consumed for block {block_root:?}: {remaining_indices:?}" - )); + // log the error but don't return an error, we can still progress with extra columns. + tracing::error!( + ?block_root, + ?remaining_indices, + "Not all columns consumed for block" + ); } RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec) - .map_err(|e| format!("{e:?}"))? + .map_err(|e| CouplingError { + msg: format!("{:?}", e), + column_and_peer: None, + })? } else { // Block has no data, expects zero columns RpcBlock::new_without_blobs(Some(block_root), block) @@ -268,7 +384,9 @@ impl RangeBlockComponentsRequest { // Assert that there are no columns left for other blocks if !data_columns_by_block.is_empty() { let remaining_roots = data_columns_by_block.keys().collect::>(); - return Err(format!("Not all columns consumed: {remaining_roots:?}")); + // log the error but don't return an error, we can still progress with responses. + // this is most likely an internal error with overrequesting or a client bug. + tracing::error!(?remaining_roots, "Not all columns consumed for block"); } Ok(rpc_blocks) @@ -303,9 +421,12 @@ mod tests { use beacon_chain::test_utils::{ generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs, }; - use lighthouse_network::service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - DataColumnsByRangeRequestId, Id, RangeRequestId, + use lighthouse_network::{ + service::api_types::{ + BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, + DataColumnsByRangeRequestId, Id, RangeRequestId, + }, + PeerId, }; use rand::SeedableRng; use std::sync::Arc; @@ -342,6 +463,7 @@ mod tests { DataColumnsByRangeRequestId { id, parent_request_id, + peer: PeerId::random(), } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d0e62e4ada7..a20f2793a41 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -14,6 +14,7 @@ use crate::network_beacon_processor::TestBeaconChainType; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; +use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; @@ -81,7 +82,7 @@ pub enum RpcResponseError { RpcError(#[allow(dead_code)] RPCError), VerifyError(LookupVerifyError), CustodyRequestError(#[allow(dead_code)] CustodyRequestError), - BlockComponentCouplingError(#[allow(dead_code)] String), + BlockComponentCouplingError(CouplingError), } #[derive(Debug, PartialEq, Eq)] @@ -441,6 +442,79 @@ impl SyncNetworkContext { active_request_count_by_peer } + /// Retries only the specified failed columns by requesting them again. + /// + /// Note: This function doesn't retry the whole batch, but retries specific requests within + /// the batch. + pub fn retry_columns_by_range( + &mut self, + request_id: Id, + peers: &HashSet, + peers_to_deprioritize: &HashSet, + request: BlocksByRangeRequest, + failed_columns: &HashSet, + ) -> Result<(), String> { + let Some(requester) = self.components_by_range_requests.keys().find_map(|r| { + if r.id == request_id { + Some(r.requester) + } else { + None + } + }) else { + return Err("request id not present".to_string()); + }; + + let active_request_count_by_peer = self.active_request_count_by_peer(); + + debug!( + ?failed_columns, + "Retrying only failed column requests from other peers" + ); + + // Attempt to find all required custody peers to request the failed columns from + let columns_by_range_peers_to_request = self + .select_columns_by_range_peers_to_request( + &failed_columns, + peers, + active_request_count_by_peer, + peers_to_deprioritize, + ) + .map_err(|e| format!("{:?}", e))?; + + // Reuse the id for the request that received partially correct responses + let id = ComponentsByRangeRequestId { + id: request_id, + requester, + }; + + let data_column_requests = columns_by_range_peers_to_request + .into_iter() + .map(|(peer_id, columns)| { + self.send_data_columns_by_range_request( + peer_id, + DataColumnsByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + columns, + }, + id, + ) + }) + .collect::, _>>() + .map_err(|e| format!("{:?}", e))?; + + // instead of creating a new `RangeBlockComponentsRequest`, we reinsert + // the new requests created for the failed requests + let Some(range_request) = self.components_by_range_requests.get_mut(&id) else { + return Err( + "retrying custody request for range request that does not exist".to_string(), + ); + }; + + range_request.reinsert_failed_column_requests(data_column_requests)?; + Ok(()) + } + /// A blocks by range request sent by the range sync algorithm pub fn block_components_by_range_request( &mut self, @@ -619,20 +693,31 @@ impl SyncNetworkContext { let request = entry.get_mut(); match range_block_component { RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| { - request - .add_blocks(req_id, blocks) - .map_err(RpcResponseError::BlockComponentCouplingError) + request.add_blocks(req_id, blocks).map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError { + msg: e, + column_and_peer: None, + }) + }) }), RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| { - request - .add_blobs(req_id, blobs) - .map_err(RpcResponseError::BlockComponentCouplingError) + request.add_blobs(req_id, blobs).map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError { + msg: e, + column_and_peer: None, + }) + }) }), RangeBlockComponent::CustodyColumns(req_id, resp) => { resp.and_then(|(custody_columns, _)| { request .add_custody_columns(req_id, custody_columns) - .map_err(RpcResponseError::BlockComponentCouplingError) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError { + msg: e, + column_and_peer: None, + }) + }) }) } } @@ -641,8 +726,12 @@ impl SyncNetworkContext { return Some(Err(e)); } - if let Some(blocks_result) = entry.get().responses(&self.chain.spec) { - entry.remove(); + if let Some(blocks_result) = entry.get_mut().responses(&self.chain.spec) { + if blocks_result.is_ok() { + // remove the entry only if it coupled successfully with + // no errors + entry.remove(); + } // If the request is finished, dequeue everything Some(blocks_result.map_err(RpcResponseError::BlockComponentCouplingError)) } else { @@ -1075,10 +1164,12 @@ impl SyncNetworkContext { peer_id: PeerId, request: DataColumnsByRangeRequest, parent_request_id: ComponentsByRangeRequestId, - ) -> Result { + ) -> Result<(DataColumnsByRangeRequestId, Vec), RpcRequestSendError> { + let requested_columns = request.columns.clone(); let id = DataColumnsByRangeRequestId { id: self.next_id(), parent_request_id, + peer: peer_id, }; self.send_network_msg(NetworkMessage::SendRequest { @@ -1106,7 +1197,7 @@ impl SyncNetworkContext { false, DataColumnsByRangeRequestItems::new(request), ); - Ok(id) + Ok((id, requested_columns)) } pub fn is_execution_engine_online(&self) -> bool { diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 264f83ee820..e31930075a8 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -89,6 +89,7 @@ pub enum BatchOperationOutcome { Failed { blacklist: bool }, } +#[derive(Debug)] pub enum BatchProcessingResult { Success, FaultyFailure, @@ -364,7 +365,6 @@ impl BatchInfo { } } - #[must_use = "Batch may have failed"] pub fn processing_completed( &mut self, procesing_result: BatchProcessingResult, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index cc49c437112..82bd5fa605a 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -2,6 +2,7 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use super::RangeSyncType; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; +use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult}; use beacon_chain::block_verification_types::RpcBlock; @@ -12,7 +13,7 @@ use logging::crit; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use strum::IntoStaticStr; use tracing::{debug, instrument, warn}; -use types::{Epoch, EthSpec, Hash256, Slot}; +use types::{ColumnIndex, Epoch, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for @@ -833,6 +834,34 @@ impl SyncingChain { ) -> ProcessingResult { let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { + if let RpcResponseError::BlockComponentCouplingError(CouplingError { + column_and_peer, + msg, + }) = &err + { + debug!(?batch_id, msg, "Block components coupling error"); + // Note: we don't fail the batch here because a `CouplingError` is + // recoverable by requesting from other honest peers. + if let Some((column_and_peer, action)) = column_and_peer { + let mut failed_columns = HashSet::new(); + let mut failed_peers = HashSet::new(); + for (column, peer) in column_and_peer { + failed_columns.insert(*column); + failed_peers.insert(*peer); + } + for peer in failed_peers.iter() { + network.report_peer(*peer, *action, "failed to return columns"); + } + + return self.retry_partial_batch( + network, + batch_id, + request_id, + failed_columns, + failed_peers, + ); + } + } // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer @@ -898,7 +927,7 @@ impl SyncingChain { .network_globals() .peers .read() - .synced_peers() + .synced_peers_for_epoch(batch_id, &self.peers) .cloned() .collect::>(); @@ -958,6 +987,50 @@ impl SyncingChain { Ok(KeepChain) } + /// Retries partial column requests within the batch by creating new requests for the failed columns. + #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] + pub fn retry_partial_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + id: Id, + failed_columns: HashSet, + mut failed_peers: HashSet, + ) -> ProcessingResult { + if let Some(batch) = self.batches.get_mut(&batch_id) { + failed_peers.extend(&batch.failed_peers()); + let req = batch.to_blocks_by_range_request().0; + + let synced_peers = network + .network_globals() + .peers + .read() + .synced_peers() + .cloned() + .collect::>(); + + match network.retry_columns_by_range( + id, + &synced_peers, + &failed_peers, + req, + &failed_columns, + ) { + Ok(_) => { + debug!( + ?batch_id, + id, "Retried column requests from different peers" + ); + return Ok(KeepChain); + } + Err(e) => { + debug!(?batch_id, id, e, "Failed to retry partial batch"); + } + } + } + Ok(KeepChain) + } + /// Returns true if this chain is currently syncing. #[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)] pub fn is_syncing(&self) -> bool { @@ -1039,9 +1112,8 @@ impl SyncingChain { .network_globals() .peers .read() - .good_custody_subnet_peer(*subnet_id) + .good_range_sync_custody_subnet_peer(*subnet_id, &self.peers) .count(); - peer_count > 0 }); peers_on_all_custody_subnets