diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 6cc473493..a2b195fa6 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -155,7 +155,19 @@ impl AnchorBehaviour { PeerManager::new(network_config, one_epoch_duration) }; - let handshake = handshake::create_behaviour(local_keypair); + let handshake = { + let domain_type: String = network_config.domain_type.into(); + let node_info = handshake::node_info::NodeInfo::new( + domain_type, + Some(handshake::node_info::NodeMetadata { + node_version: version_with_platform(), + execution_node: "geth/v1.10.8".to_string(), + consensus_node: "lighthouse/v1.5.0".to_string(), + subnets: "00000000000000000000000000000000".to_string(), + }), + ); + handshake::Behaviour::new(local_keypair, node_info) + }; Ok(AnchorBehaviour { identify, diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 17ed598aa..77242ffb9 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -2,21 +2,44 @@ mod codec; mod envelope; pub mod node_info; +use std::{ + collections::VecDeque, + task::{Context, Poll}, +}; + use discv5::libp2p_identity::Keypair; use libp2p::{ PeerId, StreamProtocol, request_response::{ - Behaviour as RequestResponseBehaviour, Config, InboundFailure, Message, OutboundFailure, - ProtocolSupport, ResponseChannel, + Behaviour as RequestResponseBehaviour, Config, Event as RequestResponseEvent, + InboundFailure, Message, OutboundFailure, ProtocolSupport, ResponseChannel, }, - swarm::NetworkBehaviour, + swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm}, }; -use tracing::{debug, trace}; +use tracing::trace; use crate::handshake::{codec::Codec, node_info::NodeInfo}; -pub type Behaviour = RequestResponseBehaviour; -pub type Event = ::ToSwarm; +/// Event emitted on handshake completion or failure. +#[derive(Debug)] +pub enum Event { + Completed { + peer_id: PeerId, + their_info: NodeInfo, + }, + Failed { + peer_id: PeerId, + error: Box, + }, +} + +/// Network behaviour handling the handshake protocol. +/// Automatically initiates handshakes on outbound connections. +pub struct Behaviour { + inner: RequestResponseBehaviour, + node_info: NodeInfo, + events: VecDeque, +} #[derive(Debug)] pub enum Error { @@ -30,30 +53,89 @@ pub enum Error { Outbound(OutboundFailure), } -/// We successfully completed a handshake. -#[derive(Debug)] -pub struct Completed { - pub peer_id: PeerId, - pub their_info: NodeInfo, -} +impl Behaviour { + /// Create a new handshake Behaviour. + /// The behaviour automatically initiates handshakes on outbound connections. + pub fn new(keypair: Keypair, node_info: NodeInfo) -> Self { + let protocol = StreamProtocol::new("/ssv/info/0.0.1"); + let inner = RequestResponseBehaviour::with_codec( + Codec::new(keypair), + [(protocol, ProtocolSupport::Full)], + Config::default(), + ); + Self { + inner, + node_info, + events: VecDeque::new(), + } + } -/// The handshake either failed because of shaking with an incompatible peer or because of some -/// network failure. -#[derive(Debug)] -pub struct Failed { - pub peer_id: PeerId, - pub error: Box, -} + fn verify_and_emit_event(&mut self, peer_id: PeerId, their_info: NodeInfo) { + match verify_node_info(&self.node_info, &their_info) { + Ok(()) => { + self.events.push_back(Event::Completed { + peer_id, + their_info, + }); + } + Err(error) => { + self.events.push_back(Event::Failed { + peer_id, + error: Box::new(error), + }); + } + } + } -/// Create a libp2p Behaviour to handle handshake requests. Events emitted from this event must be -/// fed into [`handle_event`]. -pub fn create_behaviour(keypair: Keypair) -> Behaviour { - let protocol = StreamProtocol::new("/ssv/info/0.0.1"); - Behaviour::with_codec( - Codec::new(keypair), - [(protocol, ProtocolSupport::Full)], - Config::default(), - ) + fn handle_request( + &mut self, + peer_id: PeerId, + request: NodeInfo, + channel: ResponseChannel, + ) { + trace!(?peer_id, "handling handshake request"); + + // Send our info back to the peer + if self + .inner + .send_response(channel, self.node_info.clone()) + .is_err() + { + trace!( + ?peer_id, + "Failed to send handshake response (channel closed)" + ); + } + + // Verify network compatibility and emit event + self.verify_and_emit_event(peer_id, request); + } + + fn handle_response(&mut self, peer_id: PeerId, response: NodeInfo) { + trace!(?peer_id, "handling handshake response"); + + // Verify network compatibility and emit event + self.verify_and_emit_event(peer_id, response); + } + + /// Determines if a handshake should be initiated for this connection. + /// + /// Returns `Some(peer_id)` if: + /// - The event is a ConnectionEstablished event + /// - The connection is outbound (we are the dialer) + /// - This is the first established connection to the peer (other_established == 0) + fn should_initiate_handshake<'a>( + event: &'a libp2p::swarm::FromSwarm<'a>, + ) -> Option<&'a PeerId> { + if let libp2p::swarm::FromSwarm::ConnectionEstablished(conn_est) = event + && let libp2p::core::ConnectedPoint::Dialer { .. } = conn_est.endpoint + && conn_est.other_established == 0 + { + Some(&conn_est.peer_id) + } else { + None + } + } } fn verify_node_info(ours: &NodeInfo, theirs: &NodeInfo) -> Result<(), Error> { @@ -66,114 +148,140 @@ fn verify_node_info(ours: &NodeInfo, theirs: &NodeInfo) -> Result<(), Error> { Ok(()) } -/// Handle an [`Event`] emitted by the passed [`Behaviour`]. The passed [`NodeInfo`] is used for -/// validating the remote peer's data and for responding to incoming requests. -pub fn handle_event( - our_node_info: &NodeInfo, - behaviour: &mut Behaviour, - event: Event, -) -> Option> { - match event { - Event::Message { +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = + as NetworkBehaviour>::ConnectionHandler; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + connection_id: libp2p::swarm::ConnectionId, + peer: PeerId, + local_addr: &libp2p::Multiaddr, + remote_addr: &libp2p::Multiaddr, + ) -> Result, libp2p::swarm::ConnectionDenied> { + self.inner.handle_established_inbound_connection( + connection_id, peer, - message: Message::Request { - request, channel, .. - }, - .. - } => Some(handle_request( - our_node_info, - behaviour, - peer, - request, - channel, - )), - Event::Message { + local_addr, + remote_addr, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: libp2p::swarm::ConnectionId, + peer: PeerId, + addr: &libp2p::Multiaddr, + role_override: libp2p::core::Endpoint, + port_use: libp2p::core::transport::PortUse, + ) -> Result, libp2p::swarm::ConnectionDenied> { + self.inner.handle_established_outbound_connection( + connection_id, peer, - message: Message::Response { response, .. }, - .. - } => Some(handle_response(our_node_info, peer, response)), - Event::OutboundFailure { peer, error, .. } => Some(Err(Failed { - peer_id: peer, - error: Box::new(Error::Outbound(error)), - })), - Event::InboundFailure { peer, error, .. } => Some(Err(Failed { - peer_id: peer, - error: Box::new(Error::Inbound(error)), - })), - Event::ResponseSent { .. } => None, + addr, + role_override, + port_use, + ) } -} -fn handle_request( - our_node_info: &NodeInfo, - behaviour: &mut Behaviour, - peer_id: PeerId, - request: NodeInfo, - channel: ResponseChannel, -) -> Result { - trace!(?peer_id, "handling handshake request"); - // Handle incoming request: send response then verify - // Any error here is handled by the InboundFailure handler - let _ = behaviour.send_response(channel, our_node_info.clone()); - - verify_node_info(our_node_info, &request).map_err(|error| Failed { - peer_id, - error: Box::new(error), - })?; - - Ok(Completed { - peer_id, - their_info: request, - }) -} + fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { + // Auto-initiate handshake on first outbound connection + if let Some(peer_id) = Self::should_initiate_handshake(&event) { + trace!( + ?peer_id, + "Auto-initiating handshake on first outbound connection" + ); + self.inner.send_request(peer_id, self.node_info.clone()); + } + self.inner.on_swarm_event(event); + } -fn handle_response( - our_node_info: &NodeInfo, - peer_id: PeerId, - response: NodeInfo, -) -> Result { - trace!(?peer_id, "handling handshake response"); - verify_node_info(our_node_info, &response).map_err(|error| Failed { - peer_id, - error: Box::new(error), - })?; - - Ok(Completed { - peer_id, - their_info: response, - }) -} + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: libp2p::swarm::ConnectionId, + event: libp2p::swarm::THandlerOutEvent, + ) { + self.inner + .on_connection_handler_event(peer_id, connection_id, event); + } -/// Send a handshake request to a specified peer. Should be called after establishing an outgoing -/// connection. -pub fn initiate(our_node_info: &NodeInfo, behaviour: &mut Behaviour, peer_id: PeerId) { - let subnets_bitfield = our_node_info - .metadata - .as_ref() - .map(|m| m.subnets.as_str()) - .unwrap_or("none"); - debug!( - ?peer_id, - subnets_bitfield = %subnets_bitfield, - "Initiating handshake with our subnet bitfield" - ); - behaviour.send_request(&peer_id, our_node_info.clone()); + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + // Process events from inner request-response behaviour + while let Poll::Ready(event) = self.inner.poll(cx) { + match event { + ToSwarm::GenerateEvent(req_resp_event) => match req_resp_event { + RequestResponseEvent::Message { + peer, + message: + Message::Request { + request, channel, .. + }, + .. + } => { + trace!("Received handshake request"); + self.handle_request(peer, request, channel); + } + RequestResponseEvent::Message { + peer, + message: Message::Response { response, .. }, + .. + } => { + trace!(?response, "Received handshake response"); + self.handle_response(peer, response); + } + RequestResponseEvent::OutboundFailure { peer, error, .. } => { + self.events.push_back(Event::Failed { + peer_id: peer, + error: Box::new(Error::Outbound(error)), + }); + } + RequestResponseEvent::InboundFailure { peer, error, .. } => { + self.events.push_back(Event::Failed { + peer_id: peer, + error: Box::new(Error::Inbound(error)), + }); + } + RequestResponseEvent::ResponseSent { .. } => {} + }, + other => { + // Bubble up all other ToSwarm events (Dial, NotifyHandler, CloseConnection, + // etc.) These events don't contain GenerateEvent, so + // map_out's closure is never called. This is safe because + // we've exhaustively handled all GenerateEvent variants above. + return Poll::Ready( + other.map_out(|_| unreachable!("GenerateEvent already handled")), + ); + } + } + } + + // Emit queued events + if let Some(event) = self.events.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + Poll::Pending + } } #[cfg(test)] mod tests { // Init tracing - static TRACING: LazyLock<()> = LazyLock::new(|| { - let env_filter = tracing_subscriber::EnvFilter::new("trace"); + static DEBUG: LazyLock<()> = LazyLock::new(|| { + let env_filter = tracing_subscriber::EnvFilter::new("debug"); tracing_subscriber::fmt().with_env_filter(env_filter).init(); }); use std::sync::LazyLock; use discv5::libp2p_identity::Keypair; - use libp2p::swarm::{Swarm, SwarmEvent}; - use libp2p_swarm_test::SwarmExt; - use tokio::select; + use libp2p::swarm::Swarm; + use libp2p_swarm_test::{SwarmExt, drive}; use super::*; use crate::handshake::node_info::NodeMetadata; @@ -190,135 +298,136 @@ mod tests { } } + fn create_test_swarm(keypair: Keypair, node_info: NodeInfo) -> Swarm { + Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, node_info)) + } + + fn assert_completed(event: Event, expected_peer: PeerId, expected_version: &str) { + match event { + Event::Completed { + peer_id, + their_info, + } => { + assert_eq!(peer_id, expected_peer); + assert_eq!(their_info.metadata.unwrap().node_version, expected_version); + } + Event::Failed { error, .. } => panic!("Expected Completed, got Failed: {:?}", error), + } + } + + fn assert_network_mismatch( + event: Event, + expected_peer: PeerId, + expected_ours: &str, + expected_theirs: &str, + ) { + match event { + Event::Failed { peer_id, error } => { + assert_eq!(peer_id, expected_peer); + match *error { + Error::NetworkMismatch { ours, theirs } => { + assert_eq!(ours, expected_ours); + assert_eq!(theirs, expected_theirs); + } + _ => panic!("Expected NetworkMismatch, got {:?}", error), + } + } + Event::Completed { .. } => panic!("Expected Failed, got Completed"), + } + } + #[tokio::test] async fn handshake_success() { - *TRACING; + *DEBUG; + + let mut local_swarm = + create_test_swarm(Keypair::generate_ed25519(), node_info("test", "local")); + let mut remote_swarm = + create_test_swarm(Keypair::generate_ed25519(), node_info("test", "remote")); + + tokio::spawn(async move { + local_swarm.listen().with_memory_addr_external().await; + remote_swarm.connect(&mut local_swarm).await; - let local_key = Keypair::generate_ed25519(); - let remote_key = Keypair::generate_ed25519(); + let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; - let mut local_swarm = Swarm::new_ephemeral_tokio(|_| create_behaviour(local_key)); - let local_node_info = node_info("test", "local"); - let mut remote_swarm = Swarm::new_ephemeral_tokio(|_| create_behaviour(remote_key)); - let remote_node_info = node_info("test", "remote"); + assert_completed(local_event, *remote_swarm.local_peer_id(), "remote"); + assert_completed(remote_event, *local_swarm.local_peer_id(), "local"); + }) + .await + .expect("test completed"); + } + + /// Test that verifies only ONE handshake happens during concurrent dials. + /// + /// Without the `other_established == 0` check, this test would see BOTH peers + /// initiate handshakes, leading to duplicate requests. With the check, only + /// the first ConnectionEstablished triggers a handshake initiation. + #[tokio::test] + async fn concurrent_dials_only_one_handshake() { + *DEBUG; + + let mut local_swarm = + create_test_swarm(Keypair::generate_ed25519(), node_info("test", "local")); + let mut remote_swarm = + create_test_swarm(Keypair::generate_ed25519(), node_info("test", "remote")); tokio::spawn(async move { local_swarm.listen().with_memory_addr_external().await; + remote_swarm.listen().with_memory_addr_external().await; - remote_swarm.connect(&mut local_swarm).await; + // Force both peers to dial each other + let local_addr = local_swarm.external_addresses().next().unwrap().clone(); + let remote_addr = remote_swarm.external_addresses().next().unwrap().clone(); - initiate( - &remote_node_info, - remote_swarm.behaviour_mut(), - *local_swarm.local_peer_id(), - ); + local_swarm.dial(remote_addr).unwrap(); + remote_swarm.dial(local_addr).unwrap(); - let mut local_completed = false; - let mut remote_completed = false; - - while !local_completed && !remote_completed { - select!( - SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { - let Some(result) = - handle_event(&local_node_info, local_swarm.behaviour_mut(), e) else { - continue; - }; - let Completed { - peer_id, - their_info, - } = result.expect("handshake to succeed"); - assert_eq!(peer_id, *remote_swarm.local_peer_id()); - assert_eq!(their_info.metadata.unwrap().node_version, "remote"); - local_completed = true; - } - SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { - let Some(result) = - handle_event(&remote_node_info, remote_swarm.behaviour_mut(), e) else { - continue; - }; - let Completed { - peer_id, - their_info, - } = result.expect("handshake to succeed"); - assert_eq!(peer_id, *local_swarm.local_peer_id()); - assert_eq!(their_info.metadata.unwrap().node_version, "local"); - remote_completed = true; - } - else => {} - ) - } + // Drive until both complete - expecting exactly 1 event per peer + let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; + + // Both should have completed successfully + assert_completed(local_event, *remote_swarm.local_peer_id(), "remote"); + assert_completed(remote_event, *local_swarm.local_peer_id(), "local"); + + // Key assertion: If we try to drive again with a timeout, + // there should be NO more events (no duplicate handshakes) + use tokio::time::{timeout, Duration}; + + let result = timeout(Duration::from_millis(100), async { + let ([_local], [_remote]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; + }).await; + + // Should timeout - no more handshake events should occur + assert!(result.is_err(), "Expected no more handshake events, but got some! This means duplicate handshakes occurred."); }) .await - .expect("tokio runtime failed"); + .expect("test completed"); } #[tokio::test] async fn mismatched_networks_handshake_failed() { - *TRACING; - - let local_key = Keypair::generate_ed25519(); - let remote_key = Keypair::generate_ed25519(); + *DEBUG; - let mut local_swarm = Swarm::new_ephemeral_tokio(|_| create_behaviour(local_key)); - let local_node_info = node_info("test1", "local"); - let mut remote_swarm = Swarm::new_ephemeral_tokio(|_| create_behaviour(remote_key)); - let remote_node_info = node_info("test2", "remote"); + let mut local_swarm = + create_test_swarm(Keypair::generate_ed25519(), node_info("test1", "local")); + let mut remote_swarm = + create_test_swarm(Keypair::generate_ed25519(), node_info("test2", "remote")); tokio::spawn(async move { local_swarm.listen().with_memory_addr_external().await; - remote_swarm.connect(&mut local_swarm).await; - initiate( - &remote_node_info, - remote_swarm.behaviour_mut(), - *local_swarm.local_peer_id(), - ); + let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; - let mut local_failed = false; - let mut remote_failed = false; - - while !local_failed && !remote_failed { - select!( - SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { - let Some(result) = - handle_event(&local_node_info, local_swarm.behaviour_mut(), e) else { - continue; - }; - let Failed { - peer_id, - error, - } = result.expect_err("handshake to fail"); - let Error::NetworkMismatch { ours, theirs } = *error else { - panic!("expected network mismatch"); - }; - assert_eq!(peer_id, *remote_swarm.local_peer_id()); - assert_eq!(ours, "test1"); - assert_eq!(theirs, "test2"); - local_failed = true; - } - SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { - let Some(result) = - handle_event(&remote_node_info, remote_swarm.behaviour_mut(), e) else { - continue; - }; - let Failed { - peer_id, - error, - } = result.expect_err("handshake to fail"); - let Error::NetworkMismatch { ours, theirs } = *error else { - panic!("expected network mismatch"); - }; - assert_eq!(peer_id, *local_swarm.local_peer_id()); - assert_eq!(ours, "test2"); - assert_eq!(theirs, "test1"); - remote_failed = true; - } - else => {} - ) - } + assert_network_mismatch(local_event, *remote_swarm.local_peer_id(), "test1", "test2"); + assert_network_mismatch(remote_event, *local_swarm.local_peer_id(), "test2", "test1"); }) .await - .expect("tokio runtime failed"); + .expect("test completed"); } } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index f7583830d..47ee95a41 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -3,6 +3,7 @@ use std::{ num::{NonZeroU8, NonZeroUsize}, pin::Pin, sync::Arc, + time::Duration, }; use futures::StreamExt; @@ -10,7 +11,6 @@ use gossipsub::{IdentTopic, PublishError, TopicHash}; use libp2p::{ Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError, core::{ - ConnectedPoint, muxing::StreamMuxerBox, transport::{Boxed, ListenerId}, }, @@ -227,13 +227,7 @@ impl Network { self.on_discovered_peers(peers); } AnchorBehaviourEvent::Handshake(event) => { - if let Some(result) = handshake::handle_event( - &self.node_info, - &mut self.swarm.behaviour_mut().handshake, - event, - ) { - self.handle_handshake_result(result); - } + self.handle_handshake_result(event); } AnchorBehaviourEvent::PeerManager(peer_manager::Event::Heartbeat(heartbeat)) => { if let Some(actions) = heartbeat.connect_actions { @@ -281,28 +275,21 @@ impl Network { trace!(event = ?behaviour_event, "Unhandled behaviour event"); } }, - SwarmEvent::ConnectionEstablished { - peer_id, - endpoint: ConnectedPoint::Dialer { .. }, - .. - } => { - handshake::initiate( - &self.node_info, - &mut self.swarm.behaviour_mut().handshake, - peer_id - ); + SwarmEvent::NewListenAddr { listener_id, address } => { + self.on_new_listen_addr(listener_id, address); }, SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { debug!(?peer_id, ?error, "Outgoing connection error"); }, - SwarmEvent::IncomingConnectionError { error, .. } => { - debug!(?error, "Incoming connection error"); + SwarmEvent::IncomingConnectionError { error, send_back_addr, .. } => { + debug!(?send_back_addr, ?error, "Incoming connection error"); }, SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { - debug!(?peer_id, ?cause, "Connection closed"); - }, - SwarmEvent::NewListenAddr { listener_id, address } => { - self.on_new_listen_addr(listener_id, address); + if cause.is_some() { + debug!(?peer_id, ?cause, "Connection closed with error"); + } else { + trace!(?peer_id, "Connection closed"); + } }, _ => { trace!(event = ?swarm_message, "Unhandled swarm event"); @@ -597,12 +584,12 @@ impl Network { } } - fn handle_handshake_result(&mut self, result: Result) { - match result { - Ok(handshake::Completed { + fn handle_handshake_result(&mut self, event: handshake::Event) { + match event { + handshake::Event::Completed { peer_id, their_info, - }) => { + } => { // Record successful handshake if let Ok(counter) = crate::metrics::HANDSHAKE_SUCCESSFUL.as_ref() { counter.inc(); @@ -618,7 +605,7 @@ impl Network { debug!(%peer_id, ?their_info, "Handshake completed without metadata"); } } - Err(handshake::Failed { peer_id, error }) => { + handshake::Event::Failed { peer_id, error } => { // Determine failure reason for metrics let failure_reason = match error.as_ref() { handshake::Error::NetworkMismatch { .. } => "network_mismatch", @@ -706,7 +693,22 @@ impl Network { fn dial(&mut self, opts: DialOpts) { let peer_id = opts.get_peer_id(); if let Err(err) = self.swarm.dial(opts) { - debug!(%err, ?peer_id, "Failed to dial peer"); + // Differentiate between expected and unexpected dial failures + // + // PeerCondition::NotDialing causes DialPeerConditionFalse when we try to dial + // a peer we're already connected to or dialing. This is expected and benign, + // so we log at TRACE level to reduce noise. + // + // Other errors (unreachable addresses, transport failures, etc.) are logged + // at DEBUG level since they indicate actual problems. + match &err { + libp2p::swarm::DialError::DialPeerConditionFalse(_) => { + trace!(%err, "Dial skipped due to peer condition"); + } + _ => { + debug!(%err, ?peer_id, "Failed to dial peer"); + } + } } } @@ -741,7 +743,14 @@ fn build_swarm( let swarm_config = libp2p::swarm::Config::with_executor(Executor(executor)) .with_notify_handler_buffer_size(notify_handler_buffer_size) .with_per_connection_event_buffer_size(4) - .with_dial_concurrency_factor(dial_concurrency_factor); + .with_dial_concurrency_factor(dial_concurrency_factor) + // Set a non-zero idle connection timeout to allow time for handshake completion + // + // libp2p needs time to complete the SSV handshake protocol after connection + // establishment. 30 seconds provides sufficient time for this flow while still + // cleaning up truly idle connections. This follows guidance from rust-libp2p + // maintainers to always set a non-zero idle timeout. + .with_idle_connection_timeout(Duration::from_secs(30)); let swarm = SwarmBuilder::with_existing_identity(local_keypair) .with_tokio() diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 63e0e755f..726208444 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -120,7 +120,7 @@ impl ConnectionManager { return false; } - // Don't dial connected peers + // Don't dial already-connected peers if self.connected.contains(peer_id) { return false; } diff --git a/anchor/network/src/peer_manager/discovery.rs b/anchor/network/src/peer_manager/discovery.rs index 3480583e2..395b4956d 100644 --- a/anchor/network/src/peer_manager/discovery.rs +++ b/anchor/network/src/peer_manager/discovery.rs @@ -197,7 +197,21 @@ impl PeerDiscovery { .flatten() .cloned() .collect::>(); - debug!(?peer, ?addresses, "Let's dial!"); + debug!( + ?peer, + ?addresses, + num_addresses = addresses.len(), + "Preparing to dial peer" + ); + + // Use PeerCondition::DisconnectedAndNotDialing to prevent redundant dials + // + // This condition only allows dialing if: + // - We're not already connected to the peer + // - We're not currently dialing the peer + // + // This prevents unnecessary dial attempts to already-connected peers and avoids + // creating duplicate connections from concurrent dials. DialOpts::peer_id(*peer) .condition(PeerCondition::DisconnectedAndNotDialing) .addresses(addresses)