Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ pub struct Config {
pub enable_beacon_processor: bool,
#[serde(with = "eth2::types::serde_status_code")]
pub duplicate_block_status_code: StatusCode,
pub target_peers: usize,
}

impl Default for Config {
Expand All @@ -162,7 +161,6 @@ impl Default for Config {
sse_capacity_multiplier: 1,
enable_beacon_processor: true,
duplicate_block_status_code: StatusCode::ACCEPTED,
target_peers: 100,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl Default for Config {
enr_udp6_port: None,
enr_quic6_port: None,
enr_tcp6_port: None,
target_peers: 100,
target_peers: 200,
discv5_config,
boot_nodes_enr: vec![],
boot_nodes_multiaddr: vec![],
Expand Down
430 changes: 419 additions & 11 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs

Large diffs are not rendered by default.

43 changes: 41 additions & 2 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ impl<E: EthSpec> PeerDB<E> {
.filter(move |(_, info)| {
// We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers
info.is_connected()
&& info.is_synced_or_advanced()
Copy link
Member Author

@jimmygchen jimmygchen Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to think again about this change - this is used to determine if we have enough good peers when starting discovery query:

// Already have target number of peers, no need for subnet discovery
let peers_on_subnet = self
.network_globals
.peers
.read()
.good_peers_on_subnet(s.subnet)
.count();
if peers_on_subnet >= TARGET_SUBNET_PEERS {
trace!(
subnet = ?s.subnet,
reason = "Already connected to desired peers",

So the condition to push a PeerManagerEvent::DiscoverSubnetPeers below must also satisfy this

fn maintain_custody_peers(&mut self) {
let subnets_to_discover: Vec<SubnetDiscovery> = self
.network_globals
.sampling_subnets()
.iter()
.filter_map(|custody_subnet| {
if self
.network_globals
.peers
.read()
.has_good_peers_in_custody_subnet(
custody_subnet,
MIN_SAMPLING_COLUMN_SUBNET_PEERS as usize,
)
{
None
} else {
Some(SubnetDiscovery {
subnet: Subnet::DataColumn(*custody_subnet),
min_ttl: None,
})
}
})
.collect();

Otherwise it might result in pushing a DiscoverSubnetPeers event to the network, which would then be ignored by the network.

&& info.on_subnet_metadata(&subnet)
&& info.on_subnet_gossipsub(&subnet)
&& info.is_good_gossipsub_peer()
Expand All @@ -318,7 +319,43 @@ impl<E: EthSpec> PeerDB<E> {
.filter(move |(_, info)| {
// The custody_subnets hashset can be populated via enr or metadata
let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet);
info.is_connected() && info.is_good_gossipsub_peer() && is_custody_subnet_peer
info.is_connected()
&& info.is_good_gossipsub_peer()
&& is_custody_subnet_peer
&& matches!(
info.sync_status(),
SyncStatus::Synced { .. } | SyncStatus::Advanced { .. }
)
})
.map(|(peer_id, _)| peer_id)
}

/// Returns an iterator over all connected peers that are assigned to custody the given
/// data column subnet AND whose advertised sync status covers the end slot of `epoch`,
/// making them usable for range-sync requests on that subnet.
///
/// NOTE: unlike `good_custody_subnet_peer`, this does NOT apply the gossipsub score
/// filter — only connectivity, custody assignment and slot coverage are checked.
pub fn good_custody_subnet_peer_range_sync(
    &self,
    subnet: DataColumnSubnetId,
    epoch: Epoch,
) -> impl Iterator<Item = &PeerId> {
    self.peers
        .iter()
        .filter(move |(_, info)| {
            // The custody_subnets hashset can be populated via enr or metadata
            let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet);

            info.is_connected()
                && is_custody_subnet_peer
                && match info.sync_status() {
                    // Synced and Advanced peers are only useful if their advertised
                    // head covers the last slot of the requested epoch.
                    SyncStatus::Synced { info } | SyncStatus::Advanced { info } => {
                        info.has_slot(epoch.end_slot(E::slots_per_epoch()))
                    }
                    SyncStatus::IrrelevantPeer
                    | SyncStatus::Behind { .. }
                    | SyncStatus::Unknown => false,
                }
        })
        .map(|(peer_id, _)| peer_id)
}
Expand All @@ -333,7 +370,9 @@ impl<E: EthSpec> PeerDB<E> {
.iter()
.filter(move |(_, info)| {
// The custody_subnets hashset can be populated via enr or metadata
info.is_connected() && info.is_assigned_to_custody_subnet(&subnet)
info.is_connected()
&& info.is_synced_or_advanced()
&& info.is_assigned_to_custody_subnet(&subnet)
})
.map(|(peer_id, _)| peer_id)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl<E: EthSpec> PeerInfo<E> {

/// Returns the number of long lived subnets a peer is subscribed to.
// NOTE: This currently excludes sync committee subnets
pub fn long_lived_subnet_count(&self) -> usize {
pub fn long_lived_attnet_count(&self) -> usize {
if let Some(meta_data) = self.meta_data.as_ref() {
return meta_data.attnets().num_set_bits();
} else if let Some(enr) = self.enr.as_ref()
Expand Down Expand Up @@ -222,6 +222,13 @@ impl<E: EthSpec> PeerInfo<E> {
}
}
}

long_lived_subnets.extend(
self.custody_subnets
.iter()
.map(|&id| Subnet::DataColumn(id)),
);

long_lived_subnets
}

Expand Down Expand Up @@ -262,6 +269,11 @@ impl<E: EthSpec> PeerInfo<E> {
{
return true;
}

if !self.custody_subnets.is_empty() {
return true;
}

false
}

Expand Down Expand Up @@ -318,6 +330,14 @@ impl<E: EthSpec> PeerInfo<E> {
)
}

/// Returns `true` when the peer's sync status is either `Synced` or `Advanced`.
pub fn is_synced_or_advanced(&self) -> bool {
    // Exhaustive match so that adding a new `SyncStatus` variant forces a
    // decision here at compile time.
    match self.sync_status {
        SyncStatus::Synced { .. } | SyncStatus::Advanced { .. } => true,
        SyncStatus::IrrelevantPeer | SyncStatus::Behind { .. } | SyncStatus::Unknown => false,
    }
}

/// Checks if the peer is currently being dialed.
pub fn is_dialing(&self) -> bool {
matches!(self.connection_status, PeerConnectionStatus::Dialing { .. })
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/rpc/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,8 @@ impl<E: EthSpec> RequestType<E> {
match self {
// add more protocols when versions/encodings are supported
RequestType::Status(_) => vec![
ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::StatusV2, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy),
],
RequestType::Goodbye(_) => vec![ProtocolId::new(
SupportedProtocol::GoodbyeV1,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1911,7 +1911,7 @@ impl<E: EthSpec> Network<E> {
}
},
};
debug!(our_addr = %local_addr, from = %send_back_addr, error = error_repr, "Failed incoming connection");
tracing::trace!(our_addr = %local_addr, from = %send_back_addr, error = error_repr, "Failed incoming connection");
None
}
SwarmEvent::OutgoingConnectionError {
Expand Down
18 changes: 12 additions & 6 deletions beacon_node/lighthouse_network/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,23 @@ fn test_tcp_status_rpc() {
.await;

// Dummy STATUS RPC message
let rpc_request = RequestType::Status(StatusMessage::V1(StatusMessageV1 {
let rpc_request = RequestType::Status(StatusMessage::V2(StatusMessageV2 {
fork_digest: [0; 4],
finalized_root: Hash256::zero(),
finalized_epoch: Epoch::new(1),
head_root: Hash256::zero(),
head_slot: Slot::new(1),
earliest_available_slot: Slot::new(0),
}));

// Dummy STATUS RPC message
let rpc_response = Response::Status(StatusMessage::V1(StatusMessageV1 {
let rpc_response = Response::Status(StatusMessage::V2(StatusMessageV2 {
fork_digest: [0; 4],
finalized_root: Hash256::zero(),
finalized_epoch: Epoch::new(1),
head_root: Hash256::zero(),
head_slot: Slot::new(1),
earliest_available_slot: Slot::new(0),
}));

// build the sender future
Expand Down Expand Up @@ -1205,21 +1207,23 @@ fn test_delayed_rpc_response() {
.await;

// Dummy STATUS RPC message
let rpc_request = RequestType::Status(StatusMessage::V1(StatusMessageV1 {
let rpc_request = RequestType::Status(StatusMessage::V2(StatusMessageV2 {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
earliest_available_slot: Slot::new(0),
}));

// Dummy STATUS RPC message
let rpc_response = Response::Status(StatusMessage::V1(StatusMessageV1 {
let rpc_response = Response::Status(StatusMessage::V2(StatusMessageV2 {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
earliest_available_slot: Slot::new(0),
}));

// build the sender future
Expand Down Expand Up @@ -1335,21 +1339,23 @@ fn test_active_requests() {
.await;

// Dummy STATUS RPC request.
let rpc_request = RequestType::Status(StatusMessage::V1(StatusMessageV1 {
let rpc_request = RequestType::Status(StatusMessage::V2(StatusMessageV2 {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
earliest_available_slot: Slot::new(0),
}));

// Dummy STATUS RPC response.
let rpc_response = Response::Status(StatusMessage::V1(StatusMessageV1 {
let rpc_response = Response::Status(StatusMessage::V2(StatusMessageV2 {
fork_digest: [0; 4],
finalized_root: Hash256::zero(),
finalized_epoch: Epoch::new(1),
head_root: Hash256::zero(),
head_slot: Slot::new(1),
earliest_available_slot: Slot::new(0),
}));

// Number of requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
send_block_count += 1;
}
Ok(None) => {
debug!(
tracing::trace!(
%peer_id,
request_root = ?root,
"Peer requested unknown block"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use beacon_chain::data_availability_checker::AvailabilityCheckError;
use beacon_chain::data_availability_checker::MaybeAvailableBlock;
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult,
HistoricalBlockError, NotifyExecutionLayer, validator_monitor::get_slot_delay_ms,
ExecutionPayloadError, HistoricalBlockError, NotifyExecutionLayer,
validator_monitor::get_slot_delay_ms,
};
use beacon_processor::{
AsyncFn, BlockingFn, DuplicateCache,
Expand Down Expand Up @@ -774,7 +775,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
ref err @ BlockError::ExecutionPayloadError(ref epe) => {
if !epe.penalize_peer() {
if matches!(epe, ExecutionPayloadError::RejectedByExecutionEngine { .. }) {
debug!(
error = ?err,
"Invalid execution payload rejected by EE"
);
Err(ChainSegmentFailed {
message: format!(
"Peer sent a block containing invalid execution payload. Reason: {:?}",
err
),
peer_action: Some(PeerAction::LowToleranceError),
})
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pawanjay176 do we need this change?

This may be due to optimistic EL right?

// An honest optimistic node may propagate blocks which are rejected by an EE, do not
// penalize them.
ExecutionPayloadError::RejectedByExecutionEngine { .. } => false,

} else if !epe.penalize_peer() {
// These errors indicate an issue with the EL and not the `ChainSegment`.
// Pause the syncing while the EL recovers
debug!(
Expand Down
4 changes: 0 additions & 4 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
CouplingError::DataColumnPeerFailure {
error,
faulty_peers,
action,
exceeded_retries,
} => {
debug!(?batch_id, error, "Block components coupling error");
Expand All @@ -325,9 +324,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
failed_columns.insert(*column);
failed_peers.insert(*peer);
}
for peer in failed_peers.iter() {
network.report_peer(*peer, *action, "failed to return columns");
}

// Only retry if peer failure **and** retries have been exceeded
if !*exceeded_retries {
Expand Down
12 changes: 2 additions & 10 deletions beacon_node/network/src/sync/block_sidecar_coupling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use beacon_chain::{
block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root,
};
use lighthouse_network::{
PeerAction, PeerId,
PeerId,
service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
},
Expand Down Expand Up @@ -60,7 +60,6 @@ pub(crate) enum CouplingError {
DataColumnPeerFailure {
error: String,
faulty_peers: Vec<(ColumnIndex, PeerId)>,
action: PeerAction,
exceeded_retries: bool,
},
BlobPeerFailure(String),
Expand Down Expand Up @@ -249,7 +248,6 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
if let Err(CouplingError::DataColumnPeerFailure {
error: _,
faulty_peers,
action: _,
exceeded_retries: _,
}) = &resp
{
Expand Down Expand Up @@ -374,7 +372,6 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
return Err(CouplingError::DataColumnPeerFailure {
error: format!("No columns for block {block_root:?} with data"),
faulty_peers: responsible_peers,
action: PeerAction::LowToleranceError,
exceeded_retries,

});
Expand All @@ -399,7 +396,6 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
return Err(CouplingError::DataColumnPeerFailure {
error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
faulty_peers: naughty_peers,
action: PeerAction::LowToleranceError,
exceeded_retries
});
}
Expand Down Expand Up @@ -465,7 +461,7 @@ mod tests {
NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec,
};
use lighthouse_network::{
PeerAction, PeerId,
PeerId,
service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
DataColumnsByRangeRequestId, Id, RangeRequestId,
Expand Down Expand Up @@ -773,15 +769,13 @@ mod tests {
if let Err(super::CouplingError::DataColumnPeerFailure {
error,
faulty_peers,
action,
exceeded_retries,
}) = result
{
assert!(error.contains("Peers did not return column"));
assert_eq!(faulty_peers.len(), 2); // columns 3 and 4 missing
assert_eq!(faulty_peers[0].0, 3); // column index 3
assert_eq!(faulty_peers[1].0, 4); // column index 4
assert!(matches!(action, PeerAction::LowToleranceError));
assert!(!exceeded_retries); // First attempt, should be false
} else {
panic!("Expected PeerFailure error");
Expand Down Expand Up @@ -943,13 +937,11 @@ mod tests {
if let Err(super::CouplingError::DataColumnPeerFailure {
error: _,
faulty_peers,
action,
exceeded_retries,
}) = result
{
assert_eq!(faulty_peers.len(), 1); // column 2 missing
assert_eq!(faulty_peers[0].0, 2); // column index 2
assert!(matches!(action, PeerAction::LowToleranceError));
assert!(exceeded_retries); // Should be true after max retries
} else {
panic!("Expected PeerFailure error with exceeded_retries=true");
Expand Down
1 change: 0 additions & 1 deletion beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let range_req = entry.get_mut();
if let Some(blocks_result) = range_req.responses(&self.chain.spec) {
if let Err(CouplingError::DataColumnPeerFailure {
action: _,
error,
faulty_peers: _,
exceeded_retries,
Expand Down
Loading
Loading