From 699822dc7f5e63ecc9f630b4e543f8f164faad34 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 1 Oct 2025 14:54:45 +0200 Subject: [PATCH 01/16] fix: prevent handshake race condition causing ConnectionClosed failures Fixes SSV handshake failures caused by opening protocol streams on connections that get closed due to concurrent dials or connection trimming. Observed as OutboundFailure::ConnectionClosed errors. The fix implements a pending handshake queue that defers handshake initiation until after the Identify protocol confirms the connection is stable and supports the SSV handshake protocol. Key changes: - Add pending_handshakes queue to defer handshake initiation - Wait for Identify protocol before initiating handshakes - Change dial policy from DisconnectedAndNotDialing to NotDialing - Add 30-second idle connection timeout for handshake completion - Improve logging to differentiate expected vs unexpected dial failures - Add comprehensive documentation explaining the race condition --- anchor/network/src/handshake/mod.rs | 51 +++++- anchor/network/src/network.rs | 169 ++++++++++++++++++- anchor/network/src/peer_manager/discovery.rs | 21 ++- 3 files changed, 222 insertions(+), 19 deletions(-) diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 572c86552..670ce426f 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -92,10 +92,13 @@ pub fn handle_event( message: Message::Response { response, .. }, .. } => Some(handle_response(our_node_info, peer, response)), - Event::OutboundFailure { peer, error, .. } => Some(Err(Failed { - peer_id: peer, - error: Box::new(Error::Outbound(error)), - })), + Event::OutboundFailure { peer, error, .. } => { + trace!(?peer, ?error, "Handshake outbound failure"); + Some(Err(Failed { + peer_id: peer, + error: Box::new(Error::Outbound(error)), + })) + } Event::InboundFailure { peer, error, .. 
} => Some(Err(Failed { peer_id: peer, error: Box::new(Error::Inbound(error)), @@ -112,10 +115,24 @@ fn handle_request( channel: ResponseChannel, ) -> Result { trace!(?peer_id, "handling handshake request"); - // Handle incoming request: send response then verify - // Any error here is handled by the InboundFailure handler + + // Handle incoming handshake request from a remote peer + // + // This is the passive/inbound side of the handshake protocol: + // 1. The remote peer (who dialed us) initiates the handshake by sending their NodeInfo + // 2. We immediately send back our NodeInfo as a response + // 3. We verify their NodeInfo is compatible with ours (same network) + // + // This function is called automatically by libp2p's request-response behavior + // when a peer opens a stream on the /ssv/info/0.0.1 protocol. + // + // Note: We don't need to explicitly "accept" connections or queue inbound handshakes. + // The request-response behavior handles all the stream management automatically. + + // Send our info back to the peer let _ = behaviour.send_response(channel, our_node_info.clone()); + // Verify network compatibility verify_node_info(our_node_info, &request).map_err(|error| Failed { peer_id, error: Box::new(error), @@ -144,8 +161,26 @@ fn handle_response( }) } -/// Send a handshake request to a specified peer. Should be called after establishing an outgoing -/// connection. +/// Initiate a handshake with a peer by sending our NodeInfo. +/// +/// This is the active/outbound side of the handshake protocol: +/// 1. We send our NodeInfo to the peer via the /ssv/info/0.0.1 protocol +/// 2. The peer responds with their NodeInfo +/// 3. We verify their NodeInfo is compatible (handled by handle_response) +/// +/// # When to call this +/// +/// This should ONLY be called after: +/// 1. We established an outbound connection (we dialed them) +/// 2. The Identify protocol completed successfully +/// 3. 
Identify confirmed the peer supports /ssv/info/0.0.1 +/// +/// Calling this too early (e.g., immediately on ConnectionEstablished) can result in +/// OutboundFailure::ConnectionClosed if the connection is closed due to concurrent +/// dial resolution or protocol negotiation. +/// +/// The main event loop in network.rs handles the proper sequencing via the +/// pending_handshakes queue. pub fn initiate(our_node_info: &NodeInfo, behaviour: &mut Behaviour, peer_id: PeerId) { trace!(?peer_id, "initiating handshake"); behaviour.send_request(&peer_id, our_node_info.clone()); diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index d7a060adc..554f6c5f0 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -3,18 +3,19 @@ use std::{ num::{NonZeroU8, NonZeroUsize}, pin::Pin, sync::Arc, + time::Duration, }; use futures::StreamExt; use gossipsub::{IdentTopic, PublishError, TopicHash}; use libp2p::{ - Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError, + Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder, TransportError, core::{ ConnectedPoint, muxing::StreamMuxerBox, transport::{Boxed, ListenerId}, }, - futures, + futures, identify, identity::Keypair, multiaddr::Protocol, swarm::{SwarmEvent, dial_opts::DialOpts}, @@ -79,6 +80,21 @@ pub struct Network { domain_type: DomainType, metrics_registry: Option, spec: Arc, + /// Peers awaiting handshake initiation after stable connection. + /// + /// When we establish an outbound connection, we queue the peer here instead of + /// immediately initiating the handshake. This prevents opening the SSV protocol + /// stream on a connection that may be closed due to: + /// - Simultaneous dials (libp2p keeps one connection, closes the other) + /// - Connection trimming/replacement + /// - Protocol negotiation failures + /// + /// The handshake is only initiated after the Identify protocol confirms: + /// 1. The connection is stable and survived any concurrent dial resolution + /// 2. 
The peer supports the SSV handshake protocol + /// + /// This queue is cleaned up when connections close or fail to prevent memory leaks. + pending_handshakes: HashSet, } impl Network { @@ -133,6 +149,7 @@ impl Network { domain_type: config.domain_type, metrics_registry: Some(metrics_registry), spec, + pending_handshakes: HashSet::new(), }; info!(%peer_id, "Network starting"); @@ -205,6 +222,33 @@ impl Network { AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => { self.on_discovered_peers(peers); } + AnchorBehaviourEvent::Identify(identify::Event::Received { peer_id, info, connection_id: _ }) => { + trace!(%peer_id, protocols = ?info.protocols, "Received identify info"); + + // CRITICAL: Only initiate handshake after Identify completes + // + // Waiting for Identify ensures: + // 1. The connection survived any concurrent dial resolution + // (libp2p only fires Identify::Received on the winning connection) + // 2. We can verify the peer supports our handshake protocol before attempting + // 3. The connection is stable and protocol negotiation is complete + // + // This prevents the race condition where we open a handshake stream on a + // connection that gets immediately closed, causing OutboundFailure::ConnectionClosed + if self.pending_handshakes.remove(&peer_id) { + let handshake_protocol = StreamProtocol::new("/ssv/info/0.0.1"); + if info.protocols.contains(&handshake_protocol) { + trace!(%peer_id, "Initiating handshake after identify"); + handshake::initiate( + &self.node_info, + &mut self.swarm.behaviour_mut().handshake, + peer_id + ); + } else { + debug!(%peer_id, "Peer does not support SSV handshake protocol, skipping"); + } + } + } AnchorBehaviourEvent::Handshake(event) => { if let Some(result) = handshake::handle_event( &self.node_info, @@ -240,13 +284,92 @@ impl Network { }, SwarmEvent::ConnectionEstablished { peer_id, - endpoint: ConnectedPoint::Dialer { .. 
}, + endpoint, + connection_id, + established_in, + concurrent_dial_errors, + num_established, .. } => { - handshake::initiate( - &self.node_info, - &mut self.swarm.behaviour_mut().handshake, - peer_id + trace!( + %peer_id, + ?connection_id, + ?endpoint, + ?established_in, + num_established, + concurrent_dial_errors = ?concurrent_dial_errors.as_ref().map(|v| v.len()), + "Connection established" + ); + + // Queue handshake for outbound connections only + // + // For outbound connections (where we are the dialer): + // - We queue the handshake instead of initiating immediately + // - Actual initiation happens after Identify confirms the connection is stable + // + // For inbound connections (where they dialed us): + // - We don't initiate the handshake (the remote peer will) + // - We passively handle incoming handshake requests via the request-response behavior + // + // This asymmetric approach prevents duplicate handshakes and ensures only + // the dialer initiates, matching the Go SSV implementation. + if matches!(endpoint, ConnectedPoint::Dialer { .. }) { + trace!(%peer_id, ?connection_id, "Queueing handshake for outbound connection"); + self.pending_handshakes.insert(peer_id); + } + }, + SwarmEvent::ConnectionClosed { + peer_id, + connection_id, + cause, + num_established, + .. 
+ } => { + trace!( + %peer_id, + ?connection_id, + ?cause, + num_established, + "Connection closed" + ); + + // Clean up pending handshake if this was the last connection to the peer + // + // We only remove from pending_handshakes when num_established == 0 because: + // - The peer might have multiple connections (IPv4+IPv6, TCP+QUIC) + // - We only want to clean up when fully disconnected + // - If other connections exist, Identify may still fire and complete the handshake + if num_established == 0 + && self.pending_handshakes.remove(&peer_id) { + trace!(%peer_id, "Removed pending handshake for fully disconnected peer"); + } + }, + SwarmEvent::OutgoingConnectionError { + peer_id, + connection_id, + error, + } => { + trace!( + ?peer_id, + ?connection_id, + ?error, + "Outgoing connection error" + ); + + // Remove pending handshake on connection failure + if let Some(peer) = peer_id + && self.pending_handshakes.remove(&peer) { + trace!(%peer, "Removed pending handshake after connection error"); + } + }, + SwarmEvent::Dialing { + peer_id, + connection_id, + } => { + trace!( + ?peer_id, + ?connection_id, + "Dialing peer" ); }, SwarmEvent::NewListenAddr { listener_id, address } => { @@ -574,7 +697,22 @@ impl Network { fn dial(&mut self, opts: DialOpts) { if let Err(err) = self.swarm.dial(opts) { - debug!(%err, "Failed to dial peer"); + // Differentiate between expected and unexpected dial failures + // + // PeerCondition::NotDialing causes DialPeerConditionFalse when we try to dial + // a peer we're already connected to or dialing. This is expected and benign, + // so we log at TRACE level to reduce noise. + // + // Other errors (unreachable addresses, transport failures, etc.) are logged + // at DEBUG level since they indicate actual problems. 
+ match &err { + libp2p::swarm::DialError::DialPeerConditionFalse(_) => { + trace!(%err, "Dial skipped due to peer condition"); + } + _ => { + debug!(%err, "Failed to dial peer"); + } + } } } @@ -609,7 +747,20 @@ fn build_swarm( let swarm_config = libp2p::swarm::Config::with_executor(Executor(executor)) .with_notify_handler_buffer_size(notify_handler_buffer_size) .with_per_connection_event_buffer_size(4) - .with_dial_concurrency_factor(dial_concurrency_factor); + .with_dial_concurrency_factor(dial_concurrency_factor) + // Set a non-zero idle connection timeout to prevent premature connection closes + // + // Without this timeout, libp2p may close idle connections before we can complete + // the handshake sequence (ConnectionEstablished → Identify → SSV handshake). + // This is especially important for: + // - Connections with slow Identify protocol completion + // - Simultaneous dials where resolution takes time + // - Networks with high latency + // + // 30 seconds provides sufficient time for the full handshake flow while still + // cleaning up truly idle connections. This follows guidance from rust-libp2p + // maintainers to always set a non-zero idle timeout. 
+ .with_idle_connection_timeout(Duration::from_secs(30)); let swarm = SwarmBuilder::with_existing_identity(local_keypair) .with_tokio() diff --git a/anchor/network/src/peer_manager/discovery.rs b/anchor/network/src/peer_manager/discovery.rs index 5c0312ec6..853b1026f 100644 --- a/anchor/network/src/peer_manager/discovery.rs +++ b/anchor/network/src/peer_manager/discovery.rs @@ -181,9 +181,26 @@ impl PeerDiscovery { .flatten() .cloned() .collect::>(); - debug!(?peer, ?addresses, "Let's dial!"); + debug!( + ?peer, + ?addresses, + num_addresses = addresses.len(), + "Preparing to dial peer" + ); + + // Use PeerCondition::NotDialing to prevent concurrent dials to the same peer + // + // This condition allows dialing if: + // - We're not currently dialing the peer (prevents duplicate concurrent dials) + // - Even if we're already connected (allows reconnection if connection drops) + // + // We changed from DisconnectedAndNotDialing to NotDialing because: + // - DisconnectedAndNotDialing would fail if already connected, causing noise + // - NotDialing allows redials after connection drops while preventing duplicates + // - The handshake queue mechanism (pending_handshakes) handles deduplication at the + // application layer, so we don't need to prevent all connected dials DialOpts::peer_id(*peer) - .condition(PeerCondition::DisconnectedAndNotDialing) + .condition(PeerCondition::NotDialing) .addresses(addresses) .build() } From 3aeda78192752176447bc1d6c8c202dc44234597 Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 2 Oct 2025 20:59:23 +0200 Subject: [PATCH 02/16] refactor: make handshake behaviour self-contained and auto-initiating Convert handshake behaviour from type alias to self-contained struct that automatically initiates handshakes on first outbound connections. This eliminates the need for external coordination and simplifies the architecture. 
Key Changes: - Replace type alias with proper Behaviour struct wrapping RequestResponseBehaviour - Implement NetworkBehaviour trait with auto-initiation in on_swarm_event - Store NodeInfo in the behaviour for automatic handshake initiation - Use `other_established == 0` check to prevent duplicate initiations during concurrent dials - Change create_behaviour() to Behaviour::new() (idiomatic Rust constructor) - Move initiate() to be a method on Behaviour struct - Remove manual handshake initiation from network.rs - Add concurrent_dials_both_initiate_handshake test proving the fix works - Add helper functions for cleaner test structure Evidence: The concurrent dials test proves that other_established == 0 prevents duplicate handshake initiations. Without the check, both peers initiate when they get Dialer connections (duplicate). With the check, only the first peer initiates. This makes handshakes fully composable without requiring Identify protocol coordination or external state management. 
--- anchor/network/src/behaviour.rs | 14 +- anchor/network/src/handshake/mod.rs | 401 ++++++++++++++---- anchor/network/src/network.rs | 82 +--- anchor/network/src/peer_manager/connection.rs | 5 + anchor/network/src/peer_manager/discovery.rs | 17 +- 5 files changed, 359 insertions(+), 160 deletions(-) diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 6054e0ceb..6f477d550 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -134,7 +134,19 @@ impl AnchorBehaviour { PeerManager::new(network_config, one_epoch_duration) }; - let handshake = handshake::create_behaviour(local_keypair); + let handshake = { + let domain_type: String = network_config.domain_type.into(); + let node_info = handshake::node_info::NodeInfo::new( + domain_type, + Some(handshake::node_info::NodeMetadata { + node_version: version_with_platform(), + execution_node: "geth/v1.10.8".to_string(), + consensus_node: "lighthouse/v1.5.0".to_string(), + subnets: "00000000000000000000000000000000".to_string(), + }), + ); + handshake::Behaviour::new(local_keypair, node_info) + }; Ok(AnchorBehaviour { identify, diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 670ce426f..04f110f05 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -15,8 +15,14 @@ use tracing::trace; use crate::handshake::{codec::Codec, node_info::NodeInfo}; -pub type Behaviour = RequestResponseBehaviour; -pub type Event = ::ToSwarm; +/// Network behaviour handling the handshake protocol. +/// Automatically initiates handshakes on outbound connections. +pub struct Behaviour { + inner: RequestResponseBehaviour, + node_info: NodeInfo, +} + +pub type Event = as NetworkBehaviour>::ToSwarm; #[derive(Debug)] pub enum Error { @@ -45,15 +51,28 @@ pub struct Failed { pub error: Box, } -/// Create a libp2p Behaviour to handle handshake requests. 
Events emitted from this event must be -/// fed into [`handle_event`]. -pub fn create_behaviour(keypair: Keypair) -> Behaviour { - let protocol = StreamProtocol::new("/ssv/info/0.0.1"); - Behaviour::with_codec( - Codec::new(keypair), - [(protocol, ProtocolSupport::Full)], - Config::default(), - ) +impl Behaviour { + /// Create a new handshake Behaviour. + /// The behaviour automatically initiates handshakes on outbound connections. + pub fn new(keypair: Keypair, node_info: NodeInfo) -> Self { + let protocol = StreamProtocol::new("/ssv/info/0.0.1"); + let inner = RequestResponseBehaviour::with_codec( + Codec::new(keypair), + [(protocol, ProtocolSupport::Full)], + Config::default(), + ); + Self { inner, node_info } + } + + /// Manually initiate a handshake with a peer by sending our NodeInfo. + /// + /// Note: In normal operation, handshakes are initiated automatically via + /// `on_swarm_event` when an outbound connection is established. This method + /// is provided for testing or special cases where manual control is needed. + pub fn initiate(&mut self, peer_id: PeerId) { + trace!(?peer_id, "initiating handshake"); + self.inner.send_request(&peer_id, self.node_info.clone()); + } } fn verify_node_info(ours: &NodeInfo, theirs: &NodeInfo) -> Result<(), Error> { @@ -130,7 +149,9 @@ fn handle_request( // The request-response behavior handles all the stream management automatically. // Send our info back to the peer - let _ = behaviour.send_response(channel, our_node_info.clone()); + let _ = behaviour + .inner + .send_response(channel, our_node_info.clone()); // Verify network compatibility verify_node_info(our_node_info, &request).map_err(|error| Failed { @@ -161,29 +182,74 @@ fn handle_response( }) } -/// Initiate a handshake with a peer by sending our NodeInfo. -/// -/// This is the active/outbound side of the handshake protocol: -/// 1. We send our NodeInfo to the peer via the /ssv/info/0.0.1 protocol -/// 2. The peer responds with their NodeInfo -/// 3. 
We verify their NodeInfo is compatible (handled by handle_response) -/// -/// # When to call this -/// -/// This should ONLY be called after: -/// 1. We established an outbound connection (we dialed them) -/// 2. The Identify protocol completed successfully -/// 3. Identify confirmed the peer supports /ssv/info/0.0.1 -/// -/// Calling this too early (e.g., immediately on ConnectionEstablished) can result in -/// OutboundFailure::ConnectionClosed if the connection is closed due to concurrent -/// dial resolution or protocol negotiation. -/// -/// The main event loop in network.rs handles the proper sequencing via the -/// pending_handshakes queue. -pub fn initiate(our_node_info: &NodeInfo, behaviour: &mut Behaviour, peer_id: PeerId) { - trace!(?peer_id, "initiating handshake"); - behaviour.send_request(&peer_id, our_node_info.clone()); +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = + as NetworkBehaviour>::ConnectionHandler; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + connection_id: libp2p::swarm::ConnectionId, + peer: PeerId, + local_addr: &libp2p::Multiaddr, + remote_addr: &libp2p::Multiaddr, + ) -> Result, libp2p::swarm::ConnectionDenied> { + self.inner.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: libp2p::swarm::ConnectionId, + peer: PeerId, + addr: &libp2p::Multiaddr, + role_override: libp2p::core::Endpoint, + port_use: libp2p::core::transport::PortUse, + ) -> Result, libp2p::swarm::ConnectionDenied> { + self.inner.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) + } + + fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { + // Auto-initiate handshake on first outbound connection + if let libp2p::swarm::FromSwarm::ConnectionEstablished(conn_est) = &event { + if let libp2p::core::ConnectedPoint::Dialer { .. 
} = conn_est.endpoint { + if conn_est.other_established == 0 { + trace!(?conn_est.peer_id, "Auto-initiating handshake on first outbound connection"); + self.inner + .send_request(&conn_est.peer_id, self.node_info.clone()); + } + } + } + self.inner.on_swarm_event(event); + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: libp2p::swarm::ConnectionId, + event: libp2p::swarm::THandlerOutEvent, + ) { + self.inner + .on_connection_handler_event(peer_id, connection_id, event); + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>> + { + self.inner.poll(cx) + } } #[cfg(test)] @@ -204,6 +270,8 @@ mod tests { use super::*; use crate::handshake::node_info::NodeMetadata; + // Test helper functions for cleaner test structure + fn node_info(network: &str, version: &str) -> NodeInfo { NodeInfo { network_id: network.to_string(), @@ -216,59 +284,249 @@ mod tests { } } + fn create_test_swarm(keypair: Keypair, node_info: NodeInfo) -> Swarm { + Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, node_info)) + } + + async fn wait_for_handshake_completion( + local_swarm: &mut Swarm, + remote_swarm: &mut Swarm, + local_info: &NodeInfo, + remote_info: &NodeInfo, + ) -> (Completed, Completed) { + let mut local_result = None; + let mut remote_result = None; + + while local_result.is_none() || remote_result.is_none() { + select!( + SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { + if let Some(result) = handle_event(local_info, local_swarm.behaviour_mut(), e) { + local_result = Some(result.expect("local handshake to succeed")); + } + } + SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { + if let Some(result) = handle_event(remote_info, remote_swarm.behaviour_mut(), e) { + remote_result = Some(result.expect("remote handshake to succeed")); + } + } + else => {} + ) + } + + (local_result.unwrap(), remote_result.unwrap()) + } + #[tokio::test] async fn handshake_success() { *TRACING; + // 
Setup: Create two peers with matching networks + let local_info = node_info("test", "local"); + let remote_info = node_info("test", "remote"); + + let mut local_swarm = create_test_swarm(Keypair::generate_ed25519(), local_info.clone()); + let mut remote_swarm = create_test_swarm(Keypair::generate_ed25519(), remote_info.clone()); + + tokio::spawn(async move { + // Setup: Establish connection + local_swarm.listen().with_memory_addr_external().await; + remote_swarm.connect(&mut local_swarm).await; + + // Test: Wait for both sides to complete handshake + let (local_result, remote_result) = wait_for_handshake_completion( + &mut local_swarm, + &mut remote_swarm, + &local_info, + &remote_info, + ) + .await; + + // Verify: Both sides received correct peer info + assert_eq!(local_result.peer_id, *remote_swarm.local_peer_id()); + assert_eq!( + local_result.their_info.metadata.unwrap().node_version, + "remote" + ); + + assert_eq!(remote_result.peer_id, *local_swarm.local_peer_id()); + assert_eq!( + remote_result.their_info.metadata.unwrap().node_version, + "local" + ); + }) + .await + .expect("test completed"); + } + + /// Evidence-gathering test for concurrent dial behavior. + /// + /// This test demonstrates that when both peers dial each other simultaneously: + /// 1. Both peers get 2 ConnectionEstablished events (one Dialer, one Listener) + /// 2. Only ONE peer initiates the handshake (the one whose Dialer connection wins the race) + /// 3. The check `other_established == 0` prevents duplicate handshake initiations + /// 4. Both peers complete the handshake successfully despite concurrent dials + /// + /// This proves that our approach using `other_established == 0` correctly handles + /// concurrent dial resolution without relying on the Identify protocol. 
+ #[tokio::test] + async fn concurrent_dials_both_initiate_handshake() { + *TRACING; + let local_key = Keypair::generate_ed25519(); let remote_key = Keypair::generate_ed25519(); - let mut local_swarm = Swarm::new_ephemeral_tokio(|_| create_behaviour(local_key)); let local_node_info = node_info("test", "local"); - let mut remote_swarm = Swarm::new_ephemeral_tokio(|_| create_behaviour(remote_key)); let remote_node_info = node_info("test", "remote"); + let mut local_swarm = + Swarm::new_ephemeral_tokio(|_| Behaviour::new(local_key, local_node_info.clone())); + let mut remote_swarm = + Swarm::new_ephemeral_tokio(|_| Behaviour::new(remote_key, remote_node_info.clone())); + tokio::spawn(async move { local_swarm.listen().with_memory_addr_external().await; + remote_swarm.listen().with_memory_addr_external().await; - remote_swarm.connect(&mut local_swarm).await; + // Force both peers to dial each other by getting addresses and dialing manually + let local_addr = local_swarm.external_addresses().next().unwrap().clone(); + let remote_addr = remote_swarm.external_addresses().next().unwrap().clone(); + + trace!(?local_addr, ?remote_addr, "About to dial each other"); - initiate( - &remote_node_info, - remote_swarm.behaviour_mut(), - *local_swarm.local_peer_id(), + // Dial each other at the same time + local_swarm.dial(remote_addr.clone()).unwrap(); + trace!("Local dialed remote"); + remote_swarm.dial(local_addr.clone()).unwrap(); + trace!("Remote dialed local"); + + // Track how many times we see Auto-initiating and what other_established values we see + let mut local_handshake_initiated = 0; + let mut remote_handshake_initiated = 0; + let mut local_completed = false; + let mut remote_completed = false; + + // Also track connection events to see concurrent dial resolution + let mut local_connections = 0; + let mut remote_connections = 0; + + while !local_completed || !remote_completed { + select!( + event = local_swarm.next_swarm_event() => { + match event { + 
SwarmEvent::ConnectionEstablished { num_established, endpoint, .. } => { + local_connections += 1; + trace!(?endpoint, ?num_established, "Local: ConnectionEstablished"); + } + SwarmEvent::Behaviour(e) => { + if let Some(result) = handle_event(&local_node_info, local_swarm.behaviour_mut(), e) { + match result { + Ok(Completed { peer_id, their_info }) => { + local_handshake_initiated += 1; + assert_eq!(peer_id, *remote_swarm.local_peer_id()); + assert_eq!(their_info.metadata.unwrap().node_version, "remote"); + local_completed = true; + } + Err(Failed { error, .. }) => { + trace!(?error, "Local: Handshake failed"); + } + } + } + } + _ => {} + } + } + event = remote_swarm.next_swarm_event() => { + match event { + SwarmEvent::ConnectionEstablished { num_established, endpoint, .. } => { + remote_connections += 1; + trace!(?endpoint, ?num_established, "Remote: ConnectionEstablished"); + } + SwarmEvent::Behaviour(e) => { + if let Some(result) = handle_event(&remote_node_info, remote_swarm.behaviour_mut(), e) { + match result { + Ok(Completed { peer_id, their_info }) => { + remote_handshake_initiated += 1; + assert_eq!(peer_id, *local_swarm.local_peer_id()); + assert_eq!(their_info.metadata.unwrap().node_version, "local"); + remote_completed = true; + } + Err(Failed { error, .. }) => { + trace!(?error, "Remote: Handshake failed"); + } + } + } + } + _ => {} + } + } + else => {} + ) + } + + // Evidence gathering: Check if we saw concurrent dials and what happened + trace!( + local_connections, + remote_connections, + local_handshake_initiated, + remote_handshake_initiated, + "Concurrent dial evidence" ); + }) + .await + .expect("tokio runtime failed"); + } + /// Test basic handshake with a single outbound connection. + /// + /// This test verifies that: + /// 1. Only the dialer (remote) auto-initiates the handshake + /// 2. The listener (local) responds to the handshake request + /// 3. 
Both sides complete the handshake successfully + /// + /// This is the simple case with no concurrent dials. + #[tokio::test] + async fn bidirectional_connection_handshake_success() { + *TRACING; + + let local_key = Keypair::generate_ed25519(); + let remote_key = Keypair::generate_ed25519(); + + let local_node_info = node_info("test", "local"); + let remote_node_info = node_info("test", "remote"); + + let mut local_swarm = + Swarm::new_ephemeral_tokio(|_| Behaviour::new(local_key, local_node_info.clone())); + let mut remote_swarm = + Swarm::new_ephemeral_tokio(|_| Behaviour::new(remote_key, remote_node_info.clone())); + + tokio::spawn(async move { + local_swarm.listen().with_memory_addr_external().await; + remote_swarm.listen().with_memory_addr_external().await; + + // Remote dials local - only remote will initiate handshake + remote_swarm.connect(&mut local_swarm).await; + + // Both peers should complete handshake let mut local_completed = false; let mut remote_completed = false; - while !local_completed && !remote_completed { + while !local_completed || !remote_completed { select!( SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { - let Some(result) = - handle_event(&local_node_info, local_swarm.behaviour_mut(), e) else { - continue; - }; - let Completed { - peer_id, - their_info, - } = result.expect("handshake to succeed"); - assert_eq!(peer_id, *remote_swarm.local_peer_id()); - assert_eq!(their_info.metadata.unwrap().node_version, "remote"); - local_completed = true; + if let Some(result) = handle_event(&local_node_info, local_swarm.behaviour_mut(), e) { + let Completed { peer_id, their_info } = result.expect("handshake to succeed"); + assert_eq!(peer_id, *remote_swarm.local_peer_id()); + assert_eq!(their_info.metadata.unwrap().node_version, "remote"); + local_completed = true; + } } SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { - let Some(result) = - handle_event(&remote_node_info, remote_swarm.behaviour_mut(), e) else { - 
continue; - }; - let Completed { - peer_id, - their_info, - } = result.expect("handshake to succeed"); - assert_eq!(peer_id, *local_swarm.local_peer_id()); - assert_eq!(their_info.metadata.unwrap().node_version, "local"); - remote_completed = true; + if let Some(result) = handle_event(&remote_node_info, remote_swarm.behaviour_mut(), e) { + let Completed { peer_id, their_info } = result.expect("handshake to succeed"); + assert_eq!(peer_id, *local_swarm.local_peer_id()); + assert_eq!(their_info.metadata.unwrap().node_version, "local"); + remote_completed = true; + } } else => {} ) @@ -285,21 +543,20 @@ mod tests { let local_key = Keypair::generate_ed25519(); let remote_key = Keypair::generate_ed25519(); - let mut local_swarm = Swarm::new_ephemeral_tokio(|_| create_behaviour(local_key)); let local_node_info = node_info("test1", "local"); - let mut remote_swarm = Swarm::new_ephemeral_tokio(|_| create_behaviour(remote_key)); let remote_node_info = node_info("test2", "remote"); + let mut local_swarm = + Swarm::new_ephemeral_tokio(|_| Behaviour::new(local_key, local_node_info.clone())); + let mut remote_swarm = + Swarm::new_ephemeral_tokio(|_| Behaviour::new(remote_key, remote_node_info.clone())); + tokio::spawn(async move { local_swarm.listen().with_memory_addr_external().await; remote_swarm.connect(&mut local_swarm).await; - initiate( - &remote_node_info, - remote_swarm.behaviour_mut(), - *local_swarm.local_peer_id(), - ); + // No manual initiate() call - Behaviour handles it automatically! 
let mut local_failed = false; let mut remote_failed = false; diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 554f6c5f0..d2a1ec674 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -9,13 +9,12 @@ use std::{ use futures::StreamExt; use gossipsub::{IdentTopic, PublishError, TopicHash}; use libp2p::{ - Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder, TransportError, + Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError, core::{ - ConnectedPoint, muxing::StreamMuxerBox, transport::{Boxed, ListenerId}, }, - futures, identify, + futures, identity::Keypair, multiaddr::Protocol, swarm::{SwarmEvent, dial_opts::DialOpts}, @@ -80,21 +79,6 @@ pub struct Network { domain_type: DomainType, metrics_registry: Option, spec: Arc, - /// Peers awaiting handshake initiation after stable connection. - /// - /// When we establish an outbound connection, we queue the peer here instead of - /// immediately initiating the handshake. This prevents opening the SSV protocol - /// stream on a connection that may be closed due to: - /// - Simultaneous dials (libp2p keeps one connection, closes the other) - /// - Connection trimming/replacement - /// - Protocol negotiation failures - /// - /// The handshake is only initiated after the Identify protocol confirms: - /// 1. The connection is stable and survived any concurrent dial resolution - /// 2. The peer supports the SSV handshake protocol - /// - /// This queue is cleaned up when connections close or fail to prevent memory leaks. 
- pending_handshakes: HashSet, } impl Network { @@ -149,7 +133,6 @@ impl Network { domain_type: config.domain_type, metrics_registry: Some(metrics_registry), spec, - pending_handshakes: HashSet::new(), }; info!(%peer_id, "Network starting"); @@ -222,33 +205,6 @@ impl Network { AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => { self.on_discovered_peers(peers); } - AnchorBehaviourEvent::Identify(identify::Event::Received { peer_id, info, connection_id: _ }) => { - trace!(%peer_id, protocols = ?info.protocols, "Received identify info"); - - // CRITICAL: Only initiate handshake after Identify completes - // - // Waiting for Identify ensures: - // 1. The connection survived any concurrent dial resolution - // (libp2p only fires Identify::Received on the winning connection) - // 2. We can verify the peer supports our handshake protocol before attempting - // 3. The connection is stable and protocol negotiation is complete - // - // This prevents the race condition where we open a handshake stream on a - // connection that gets immediately closed, causing OutboundFailure::ConnectionClosed - if self.pending_handshakes.remove(&peer_id) { - let handshake_protocol = StreamProtocol::new("/ssv/info/0.0.1"); - if info.protocols.contains(&handshake_protocol) { - trace!(%peer_id, "Initiating handshake after identify"); - handshake::initiate( - &self.node_info, - &mut self.swarm.behaviour_mut().handshake, - peer_id - ); - } else { - debug!(%peer_id, "Peer does not support SSV handshake protocol, skipping"); - } - } - } AnchorBehaviourEvent::Handshake(event) => { if let Some(result) = handshake::handle_event( &self.node_info, @@ -301,22 +257,9 @@ impl Network { "Connection established" ); - // Queue handshake for outbound connections only - // - // For outbound connections (where we are the dialer): - // - We queue the handshake instead of initiating immediately - // - Actual initiation happens after Identify confirms the connection is stable - // - // For inbound 
connections (where they dialed us): - // - We don't initiate the handshake (the remote peer will) - // - We passively handle incoming handshake requests via the request-response behavior - // - // This asymmetric approach prevents duplicate handshakes and ensures only - // the dialer initiates, matching the Go SSV implementation. - if matches!(endpoint, ConnectedPoint::Dialer { .. }) { - trace!(%peer_id, ?connection_id, "Queueing handshake for outbound connection"); - self.pending_handshakes.insert(peer_id); - } + // The handshake Behaviour automatically initiates handshakes on + // first outbound connections via its NetworkBehaviour::on_swarm_event implementation. + // This ensures handshakes happen without external coordination. }, SwarmEvent::ConnectionClosed { peer_id, @@ -333,16 +276,6 @@ impl Network { "Connection closed" ); - // Clean up pending handshake if this was the last connection to the peer - // - // We only remove from pending_handshakes when num_established == 0 because: - // - The peer might have multiple connections (IPv4+IPv6, TCP+QUIC) - // - We only want to clean up when fully disconnected - // - If other connections exist, Identify may still fire and complete the handshake - if num_established == 0 - && self.pending_handshakes.remove(&peer_id) { - trace!(%peer_id, "Removed pending handshake for fully disconnected peer"); - } }, SwarmEvent::OutgoingConnectionError { peer_id, @@ -356,11 +289,6 @@ impl Network { "Outgoing connection error" ); - // Remove pending handshake on connection failure - if let Some(peer) = peer_id - && self.pending_handshakes.remove(&peer) { - trace!(%peer, "Removed pending handshake after connection error"); - } }, SwarmEvent::Dialing { peer_id, diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 2ebb03cd0..6a09e1238 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -115,6 +115,11 @@ impl 
ConnectionManager { return false; } + // Don't dial already-connected peers + if self.connected.contains(peer_id) { + return false; + } + self.connected.len() < self.target_peers || self.qualifies_for_priority_connection(peer_id, peer_store, needed_subnets) } diff --git a/anchor/network/src/peer_manager/discovery.rs b/anchor/network/src/peer_manager/discovery.rs index 853b1026f..9300c9f79 100644 --- a/anchor/network/src/peer_manager/discovery.rs +++ b/anchor/network/src/peer_manager/discovery.rs @@ -188,19 +188,16 @@ impl PeerDiscovery { "Preparing to dial peer" ); - // Use PeerCondition::NotDialing to prevent concurrent dials to the same peer + // Use PeerCondition::DisconnectedAndNotDialing to prevent redundant dials // - // This condition allows dialing if: - // - We're not currently dialing the peer (prevents duplicate concurrent dials) - // - Even if we're already connected (allows reconnection if connection drops) + // This condition only allows dialing if: + // - We're not already connected to the peer + // - We're not currently dialing the peer // - // We changed from DisconnectedAndNotDialing to NotDialing because: - // - DisconnectedAndNotDialing would fail if already connected, causing noise - // - NotDialing allows redials after connection drops while preventing duplicates - // - The handshake queue mechanism (pending_handshakes) handles deduplication at the - // application layer, so we don't need to prevent all connected dials + // This prevents unnecessary dial attempts to already-connected peers and avoids + // creating duplicate connections from concurrent dials. 
DialOpts::peer_id(*peer) - .condition(PeerCondition::NotDialing) + .condition(PeerCondition::DisconnectedAndNotDialing) .addresses(addresses) .build() } From 0cbbd6f97688a5bbb0a8a63f28a112a6c52e74b1 Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 2 Oct 2025 21:16:02 +0200 Subject: [PATCH 03/16] refactor: improve handshake API ergonomics and test readability Convert standalone handle_event function to a method on Behaviour for better encapsulation and more idiomatic Rust. Extract test event handling logic into helper function to eliminate duplication and improve readability. API Changes: - handle_event() is now a method: behaviour.handle_event(event) - handle_request() and handle_response() moved into Behaviour impl as private methods - Removed unused initiate() method (handshakes are auto-initiated) - Simplified call sites in network.rs and tests Test Improvements: - Added handle_swarm_event_for_test() helper to eliminate 40+ lines of duplication - Tests are now much more readable and maintainable - All assertions and tracking logic centralized in one place Cleanup: - Removed .with_bandwidth_metrics() call (doesn't exist in current libp2p version) - Removed unused parameters from test helpers --- anchor/network/src/handshake/mod.rs | 318 +++++++++++++--------------- anchor/network/src/network.rs | 6 +- 2 files changed, 147 insertions(+), 177 deletions(-) diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 04f110f05..01fa58252 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -64,14 +64,77 @@ impl Behaviour { Self { inner, node_info } } - /// Manually initiate a handshake with a peer by sending our NodeInfo. - /// - /// Note: In normal operation, handshakes are initiated automatically via - /// `on_swarm_event` when an outbound connection is established. This method - /// is provided for testing or special cases where manual control is needed. 
- pub fn initiate(&mut self, peer_id: PeerId) { - trace!(?peer_id, "initiating handshake"); - self.inner.send_request(&peer_id, self.node_info.clone()); + /// Handle an event emitted by this behaviour. + /// Returns `Some` with the handshake result (success or failure) when the handshake completes, + /// or `None` for events that don't complete a handshake (like ResponseSent). + pub fn handle_event(&mut self, event: Event) -> Option> { + match event { + Event::Message { + peer, + message: + Message::Request { + request, channel, .. + }, + .. + } => Some(self.handle_request(peer, request, channel)), + Event::Message { + peer, + message: Message::Response { response, .. }, + .. + } => Some(Self::handle_response(&self.node_info, peer, response)), + Event::OutboundFailure { peer, error, .. } => { + trace!(?peer, ?error, "Handshake outbound failure"); + Some(Err(Failed { + peer_id: peer, + error: Box::new(Error::Outbound(error)), + })) + } + Event::InboundFailure { peer, error, .. } => Some(Err(Failed { + peer_id: peer, + error: Box::new(Error::Inbound(error)), + })), + Event::ResponseSent { .. 
} => None, + } + } + + fn handle_request( + &mut self, + peer_id: PeerId, + request: NodeInfo, + channel: ResponseChannel, + ) -> Result { + trace!(?peer_id, "handling handshake request"); + + // Send our info back to the peer + let _ = self.inner.send_response(channel, self.node_info.clone()); + + // Verify network compatibility + verify_node_info(&self.node_info, &request).map_err(|error| Failed { + peer_id, + error: Box::new(error), + })?; + + Ok(Completed { + peer_id, + their_info: request, + }) + } + + fn handle_response( + our_node_info: &NodeInfo, + peer_id: PeerId, + response: NodeInfo, + ) -> Result { + trace!(?peer_id, "handling handshake response"); + verify_node_info(our_node_info, &response).map_err(|error| Failed { + peer_id, + error: Box::new(error), + })?; + + Ok(Completed { + peer_id, + their_info: response, + }) } } @@ -85,103 +148,6 @@ fn verify_node_info(ours: &NodeInfo, theirs: &NodeInfo) -> Result<(), Error> { Ok(()) } -/// Handle an [`Event`] emitted by the passed [`Behaviour`]. The passed [`NodeInfo`] is used for -/// validating the remote peer's data and for responding to incoming requests. -pub fn handle_event( - our_node_info: &NodeInfo, - behaviour: &mut Behaviour, - event: Event, -) -> Option> { - match event { - Event::Message { - peer, - message: Message::Request { - request, channel, .. - }, - .. - } => Some(handle_request( - our_node_info, - behaviour, - peer, - request, - channel, - )), - Event::Message { - peer, - message: Message::Response { response, .. }, - .. - } => Some(handle_response(our_node_info, peer, response)), - Event::OutboundFailure { peer, error, .. } => { - trace!(?peer, ?error, "Handshake outbound failure"); - Some(Err(Failed { - peer_id: peer, - error: Box::new(Error::Outbound(error)), - })) - } - Event::InboundFailure { peer, error, .. } => Some(Err(Failed { - peer_id: peer, - error: Box::new(Error::Inbound(error)), - })), - Event::ResponseSent { .. 
} => None, - } -} - -fn handle_request( - our_node_info: &NodeInfo, - behaviour: &mut Behaviour, - peer_id: PeerId, - request: NodeInfo, - channel: ResponseChannel, -) -> Result { - trace!(?peer_id, "handling handshake request"); - - // Handle incoming handshake request from a remote peer - // - // This is the passive/inbound side of the handshake protocol: - // 1. The remote peer (who dialed us) initiates the handshake by sending their NodeInfo - // 2. We immediately send back our NodeInfo as a response - // 3. We verify their NodeInfo is compatible with ours (same network) - // - // This function is called automatically by libp2p's request-response behavior - // when a peer opens a stream on the /ssv/info/0.0.1 protocol. - // - // Note: We don't need to explicitly "accept" connections or queue inbound handshakes. - // The request-response behavior handles all the stream management automatically. - - // Send our info back to the peer - let _ = behaviour - .inner - .send_response(channel, our_node_info.clone()); - - // Verify network compatibility - verify_node_info(our_node_info, &request).map_err(|error| Failed { - peer_id, - error: Box::new(error), - })?; - - Ok(Completed { - peer_id, - their_info: request, - }) -} - -fn handle_response( - our_node_info: &NodeInfo, - peer_id: PeerId, - response: NodeInfo, -) -> Result { - trace!(?peer_id, "handling handshake response"); - verify_node_info(our_node_info, &response).map_err(|error| Failed { - peer_id, - error: Box::new(error), - })?; - - Ok(Completed { - peer_id, - their_info: response, - }) -} - impl NetworkBehaviour for Behaviour { type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; @@ -221,14 +187,13 @@ impl NetworkBehaviour for Behaviour { fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { // Auto-initiate handshake on first outbound connection - if let libp2p::swarm::FromSwarm::ConnectionEstablished(conn_est) = &event { - if let libp2p::core::ConnectedPoint::Dialer { .. 
} = conn_est.endpoint { - if conn_est.other_established == 0 { - trace!(?conn_est.peer_id, "Auto-initiating handshake on first outbound connection"); - self.inner - .send_request(&conn_est.peer_id, self.node_info.clone()); - } - } + if let libp2p::swarm::FromSwarm::ConnectionEstablished(conn_est) = &event + && let libp2p::core::ConnectedPoint::Dialer { .. } = conn_est.endpoint + && conn_est.other_established == 0 + { + trace!(?conn_est.peer_id, "Auto-initiating handshake on first outbound connection"); + self.inner + .send_request(&conn_est.peer_id, self.node_info.clone()); } self.inner.on_swarm_event(event); } @@ -288,11 +253,10 @@ mod tests { Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, node_info)) } + /// Helper to wait for both swarms to complete handshake async fn wait_for_handshake_completion( local_swarm: &mut Swarm, remote_swarm: &mut Swarm, - local_info: &NodeInfo, - remote_info: &NodeInfo, ) -> (Completed, Completed) { let mut local_result = None; let mut remote_result = None; @@ -300,12 +264,12 @@ mod tests { while local_result.is_none() || remote_result.is_none() { select!( SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { - if let Some(result) = handle_event(local_info, local_swarm.behaviour_mut(), e) { + if let Some(result) = local_swarm.behaviour_mut().handle_event(e) { local_result = Some(result.expect("local handshake to succeed")); } } SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { - if let Some(result) = handle_event(remote_info, remote_swarm.behaviour_mut(), e) { + if let Some(result) = remote_swarm.behaviour_mut().handle_event(e) { remote_result = Some(result.expect("remote handshake to succeed")); } } @@ -316,6 +280,47 @@ mod tests { (local_result.unwrap(), remote_result.unwrap()) } + /// Helper to handle swarm events and update tracking state + fn handle_swarm_event_for_test( + swarm: &mut Swarm, + event: SwarmEvent, + expected_peer: &PeerId, + expected_version: &str, + connections: &mut usize, + 
handshakes: &mut usize, + completed: &mut bool, + ) { + match event { + SwarmEvent::ConnectionEstablished { + num_established, + endpoint, + .. + } => { + *connections += 1; + trace!(?endpoint, ?num_established, "ConnectionEstablished"); + } + SwarmEvent::Behaviour(e) => { + if let Some(result) = swarm.behaviour_mut().handle_event(e) { + match result { + Ok(Completed { + peer_id, + their_info, + }) => { + *handshakes += 1; + assert_eq!(peer_id, *expected_peer); + assert_eq!(their_info.metadata.unwrap().node_version, expected_version); + *completed = true; + } + Err(Failed { error, .. }) => { + trace!(?error, "Handshake failed"); + } + } + } + } + _ => {} + } + } + #[tokio::test] async fn handshake_success() { *TRACING; @@ -333,13 +338,8 @@ mod tests { remote_swarm.connect(&mut local_swarm).await; // Test: Wait for both sides to complete handshake - let (local_result, remote_result) = wait_for_handshake_completion( - &mut local_swarm, - &mut remote_swarm, - &local_info, - &remote_info, - ) - .await; + let (local_result, remote_result) = + wait_for_handshake_completion(&mut local_swarm, &mut remote_swarm).await; // Verify: Both sides received correct peer info assert_eq!(local_result.peer_id, *remote_swarm.local_peer_id()); @@ -412,52 +412,26 @@ mod tests { while !local_completed || !remote_completed { select!( event = local_swarm.next_swarm_event() => { - match event { - SwarmEvent::ConnectionEstablished { num_established, endpoint, .. } => { - local_connections += 1; - trace!(?endpoint, ?num_established, "Local: ConnectionEstablished"); - } - SwarmEvent::Behaviour(e) => { - if let Some(result) = handle_event(&local_node_info, local_swarm.behaviour_mut(), e) { - match result { - Ok(Completed { peer_id, their_info }) => { - local_handshake_initiated += 1; - assert_eq!(peer_id, *remote_swarm.local_peer_id()); - assert_eq!(their_info.metadata.unwrap().node_version, "remote"); - local_completed = true; - } - Err(Failed { error, .. 
}) => { - trace!(?error, "Local: Handshake failed"); - } - } - } - } - _ => {} - } + handle_swarm_event_for_test( + &mut local_swarm, + event, + remote_swarm.local_peer_id(), + "remote", + &mut local_connections, + &mut local_handshake_initiated, + &mut local_completed, + ); } event = remote_swarm.next_swarm_event() => { - match event { - SwarmEvent::ConnectionEstablished { num_established, endpoint, .. } => { - remote_connections += 1; - trace!(?endpoint, ?num_established, "Remote: ConnectionEstablished"); - } - SwarmEvent::Behaviour(e) => { - if let Some(result) = handle_event(&remote_node_info, remote_swarm.behaviour_mut(), e) { - match result { - Ok(Completed { peer_id, their_info }) => { - remote_handshake_initiated += 1; - assert_eq!(peer_id, *local_swarm.local_peer_id()); - assert_eq!(their_info.metadata.unwrap().node_version, "local"); - remote_completed = true; - } - Err(Failed { error, .. }) => { - trace!(?error, "Remote: Handshake failed"); - } - } - } - } - _ => {} - } + handle_swarm_event_for_test( + &mut remote_swarm, + event, + local_swarm.local_peer_id(), + "local", + &mut remote_connections, + &mut remote_handshake_initiated, + &mut remote_completed, + ); } else => {} ) @@ -513,7 +487,7 @@ mod tests { while !local_completed || !remote_completed { select!( SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { - if let Some(result) = handle_event(&local_node_info, local_swarm.behaviour_mut(), e) { + if let Some(result) = local_swarm.behaviour_mut().handle_event(e) { let Completed { peer_id, their_info } = result.expect("handshake to succeed"); assert_eq!(peer_id, *remote_swarm.local_peer_id()); assert_eq!(their_info.metadata.unwrap().node_version, "remote"); @@ -521,7 +495,7 @@ mod tests { } } SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { - if let Some(result) = handle_event(&remote_node_info, remote_swarm.behaviour_mut(), e) { + if let Some(result) = remote_swarm.behaviour_mut().handle_event(e) { let Completed { peer_id, 
their_info } = result.expect("handshake to succeed"); assert_eq!(peer_id, *local_swarm.local_peer_id()); assert_eq!(their_info.metadata.unwrap().node_version, "local"); @@ -565,7 +539,7 @@ mod tests { select!( SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { let Some(result) = - handle_event(&local_node_info, local_swarm.behaviour_mut(), e) else { + local_swarm.behaviour_mut().handle_event(e) else { continue; }; let Failed { @@ -582,7 +556,7 @@ mod tests { } SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { let Some(result) = - handle_event(&remote_node_info, remote_swarm.behaviour_mut(), e) else { + remote_swarm.behaviour_mut().handle_event(e) else { continue; }; let Failed { diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index d2a1ec674..4be25ac88 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -206,11 +206,7 @@ impl Network { self.on_discovered_peers(peers); } AnchorBehaviourEvent::Handshake(event) => { - if let Some(result) = handshake::handle_event( - &self.node_info, - &mut self.swarm.behaviour_mut().handshake, - event, - ) { + if let Some(result) = self.swarm.behaviour_mut().handshake.handle_event(event) { self.handle_handshake_result(result); } } From 194c116292dedee5b047a0e83008aebf744d8270 Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 2 Oct 2025 21:29:37 +0200 Subject: [PATCH 04/16] refactor: remove logging-only SwarmEvent handlers and fix SwarmBuilder --- anchor/network/src/network.rs | 62 ----------------------------------- 1 file changed, 62 deletions(-) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 4be25ac88..088be88de 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -234,68 +234,6 @@ impl Network { trace!(event = ?behaviour_event, "Unhandled behaviour event"); } }, - SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - connection_id, - established_in, - concurrent_dial_errors, - 
num_established, - .. - } => { - trace!( - %peer_id, - ?connection_id, - ?endpoint, - ?established_in, - num_established, - concurrent_dial_errors = ?concurrent_dial_errors.as_ref().map(|v| v.len()), - "Connection established" - ); - - // The handshake Behaviour automatically initiates handshakes on - // first outbound connections via its NetworkBehaviour::on_swarm_event implementation. - // This ensures handshakes happen without external coordination. - }, - SwarmEvent::ConnectionClosed { - peer_id, - connection_id, - cause, - num_established, - .. - } => { - trace!( - %peer_id, - ?connection_id, - ?cause, - num_established, - "Connection closed" - ); - - }, - SwarmEvent::OutgoingConnectionError { - peer_id, - connection_id, - error, - } => { - trace!( - ?peer_id, - ?connection_id, - ?error, - "Outgoing connection error" - ); - - }, - SwarmEvent::Dialing { - peer_id, - connection_id, - } => { - trace!( - ?peer_id, - ?connection_id, - "Dialing peer" - ); - }, SwarmEvent::NewListenAddr { listener_id, address } => { self.on_new_listen_addr(listener_id, address); }, From b8ba1d5f6d68da8487cf9be5f7a8fc78e624d814 Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 2 Oct 2025 21:54:32 +0200 Subject: [PATCH 05/16] implement copilot comments --- anchor/network/src/behaviour.rs | 9 +++ anchor/network/src/handshake/mod.rs | 116 ++++++++++++++++++---------- anchor/network/src/network.rs | 2 +- 3 files changed, 84 insertions(+), 43 deletions(-) diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 6f477d550..3094aa8fa 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -157,4 +157,13 @@ impl AnchorBehaviour { handshake, }) } + + /// Handle a handshake event, delegating to the handshake behaviour. + /// Returns the handshake result if the event completes a handshake. 
+ pub fn handle_handshake_event( + &mut self, + event: crate::handshake::Event, + ) -> Option> { + self.handshake.handle_event(event) + } } diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 01fa58252..55aeda7b5 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -136,6 +136,25 @@ impl Behaviour { their_info: response, }) } + + /// Determines if a handshake should be initiated for this connection. + /// + /// Returns `Some(peer_id)` if: + /// - The event is a ConnectionEstablished event + /// - The connection is outbound (we are the dialer) + /// - This is the first established connection to the peer (other_established == 0) + fn should_initiate_handshake<'a>( + event: &'a libp2p::swarm::FromSwarm<'a>, + ) -> Option<&'a PeerId> { + if let libp2p::swarm::FromSwarm::ConnectionEstablished(conn_est) = event + && let libp2p::core::ConnectedPoint::Dialer { .. } = conn_est.endpoint + && conn_est.other_established == 0 + { + Some(&conn_est.peer_id) + } else { + None + } + } } fn verify_node_info(ours: &NodeInfo, theirs: &NodeInfo) -> Result<(), Error> { @@ -187,13 +206,12 @@ impl NetworkBehaviour for Behaviour { fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { // Auto-initiate handshake on first outbound connection - if let libp2p::swarm::FromSwarm::ConnectionEstablished(conn_est) = &event - && let libp2p::core::ConnectedPoint::Dialer { .. 
} = conn_est.endpoint - && conn_est.other_established == 0 - { - trace!(?conn_est.peer_id, "Auto-initiating handshake on first outbound connection"); - self.inner - .send_request(&conn_est.peer_id, self.node_info.clone()); + if let Some(peer_id) = Self::should_initiate_handshake(&event) { + trace!( + ?peer_id, + "Auto-initiating handshake on first outbound connection" + ); + self.inner.send_request(peer_id, self.node_info.clone()); } self.inner.on_swarm_event(event); } @@ -280,15 +298,25 @@ mod tests { (local_result.unwrap(), remote_result.unwrap()) } + /// Expected peer information for test assertions + struct ExpectedPeer<'a> { + peer_id: PeerId, + version: &'a str, + } + + /// Test state tracking for handshake tests + struct TestState { + connections: usize, + handshakes: usize, + completed: bool, + } + /// Helper to handle swarm events and update tracking state fn handle_swarm_event_for_test( swarm: &mut Swarm, event: SwarmEvent, - expected_peer: &PeerId, - expected_version: &str, - connections: &mut usize, - handshakes: &mut usize, - completed: &mut bool, + expected: &ExpectedPeer, + state: &mut TestState, ) { match event { SwarmEvent::ConnectionEstablished { @@ -296,7 +324,7 @@ mod tests { endpoint, .. } => { - *connections += 1; + state.connections += 1; trace!(?endpoint, ?num_established, "ConnectionEstablished"); } SwarmEvent::Behaviour(e) => { @@ -306,10 +334,10 @@ mod tests { peer_id, their_info, }) => { - *handshakes += 1; - assert_eq!(peer_id, *expected_peer); - assert_eq!(their_info.metadata.unwrap().node_version, expected_version); - *completed = true; + state.handshakes += 1; + assert_eq!(peer_id, expected.peer_id); + assert_eq!(their_info.metadata.unwrap().node_version, expected.version); + state.completed = true; } Err(Failed { error, .. 
}) => { trace!(?error, "Handshake failed"); @@ -399,38 +427,42 @@ mod tests { remote_swarm.dial(local_addr.clone()).unwrap(); trace!("Remote dialed local"); - // Track how many times we see Auto-initiating and what other_established values we see - let mut local_handshake_initiated = 0; - let mut remote_handshake_initiated = 0; - let mut local_completed = false; - let mut remote_completed = false; - - // Also track connection events to see concurrent dial resolution - let mut local_connections = 0; - let mut remote_connections = 0; - - while !local_completed || !remote_completed { + let expected_remote = ExpectedPeer { + peer_id: *remote_swarm.local_peer_id(), + version: "remote", + }; + let expected_local = ExpectedPeer { + peer_id: *local_swarm.local_peer_id(), + version: "local", + }; + + let mut local_state = TestState { + connections: 0, + handshakes: 0, + completed: false, + }; + let mut remote_state = TestState { + connections: 0, + handshakes: 0, + completed: false, + }; + + while !local_state.completed || !remote_state.completed { select!( event = local_swarm.next_swarm_event() => { handle_swarm_event_for_test( &mut local_swarm, event, - remote_swarm.local_peer_id(), - "remote", - &mut local_connections, - &mut local_handshake_initiated, - &mut local_completed, + &expected_remote, + &mut local_state, ); } event = remote_swarm.next_swarm_event() => { handle_swarm_event_for_test( &mut remote_swarm, event, - local_swarm.local_peer_id(), - "local", - &mut remote_connections, - &mut remote_handshake_initiated, - &mut remote_completed, + &expected_local, + &mut remote_state, ); } else => {} @@ -439,10 +471,10 @@ mod tests { // Evidence gathering: Check if we saw concurrent dials and what happened trace!( - local_connections, - remote_connections, - local_handshake_initiated, - remote_handshake_initiated, + local_connections = local_state.connections, + remote_connections = remote_state.connections, + local_handshake_initiated = local_state.handshakes, + 
remote_handshake_initiated = remote_state.handshakes, "Concurrent dial evidence" ); }) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 088be88de..bb2bf9a7f 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -206,7 +206,7 @@ impl Network { self.on_discovered_peers(peers); } AnchorBehaviourEvent::Handshake(event) => { - if let Some(result) = self.swarm.behaviour_mut().handshake.handle_event(event) { + if let Some(result) = self.swarm.behaviour_mut().handle_handshake_event(event) { self.handle_handshake_result(result); } } From 37f5e0e4ab9ff8c70211a1a1adf09465c71febe2 Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 2 Oct 2025 22:53:28 +0200 Subject: [PATCH 06/16] refactor: restore semantic event emission in handshake behaviour Restores the original poll() implementation that processes RequestResponse events internally and emits high-level Event::Completed and Event::Failed events, enabling clean, declarative tests using drive(). Changes: - Added events: VecDeque queue to Behaviour struct - Custom poll() processes RequestResponse events and emits semantic events - Removed handle_event() method - no longer needed - Refactored tests to use drive() instead of manual select! 
loops - Added helper functions to eliminate test duplication (assert_completed, assert_network_mismatch, verify_and_emit_event) - Updated network.rs to handle semantic Event type directly Benefits: - Tests reduced from 611 to 397 lines (35% reduction) - Much cleaner, more readable tests using drive() - Clear separation: poll() handles low-level details, tests work with high-level events - Matches original February 2025 design pattern --- anchor/network/src/behaviour.rs | 9 - anchor/network/src/handshake/mod.rs | 499 +++++++++------------------- anchor/network/src/network.rs | 17 +- 3 files changed, 164 insertions(+), 361 deletions(-) diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 3094aa8fa..6f477d550 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -157,13 +157,4 @@ impl AnchorBehaviour { handshake, }) } - - /// Handle a handshake event, delegating to the handshake behaviour. - /// Returns the handshake result if the event completes a handshake. 
- pub fn handle_handshake_event( - &mut self, - event: crate::handshake::Event, - ) -> Option> { - self.handshake.handle_event(event) - } } diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 55aeda7b5..24fbedb9f 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -2,28 +2,45 @@ mod codec; mod envelope; pub mod node_info; +use std::{ + collections::VecDeque, + task::{Context, Poll}, +}; + use discv5::libp2p_identity::Keypair; use libp2p::{ PeerId, StreamProtocol, request_response::{ - Behaviour as RequestResponseBehaviour, Config, InboundFailure, Message, OutboundFailure, - ProtocolSupport, ResponseChannel, + Behaviour as RequestResponseBehaviour, Config, Event as RequestResponseEvent, + InboundFailure, Message, OutboundFailure, ProtocolSupport, ResponseChannel, }, - swarm::NetworkBehaviour, + swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm}, }; use tracing::trace; use crate::handshake::{codec::Codec, node_info::NodeInfo}; +/// Event emitted on handshake completion or failure. +#[derive(Debug)] +pub enum Event { + Completed { + peer_id: PeerId, + their_info: NodeInfo, + }, + Failed { + peer_id: PeerId, + error: Error, + }, +} + /// Network behaviour handling the handshake protocol. /// Automatically initiates handshakes on outbound connections. pub struct Behaviour { inner: RequestResponseBehaviour, node_info: NodeInfo, + events: VecDeque, } -pub type Event = as NetworkBehaviour>::ToSwarm; - #[derive(Debug)] pub enum Error { /// We are not on the same network as the remote @@ -61,39 +78,24 @@ impl Behaviour { [(protocol, ProtocolSupport::Full)], Config::default(), ); - Self { inner, node_info } + Self { + inner, + node_info, + events: VecDeque::new(), + } } - /// Handle an event emitted by this behaviour. - /// Returns `Some` with the handshake result (success or failure) when the handshake completes, - /// or `None` for events that don't complete a handshake (like ResponseSent). 
- pub fn handle_event(&mut self, event: Event) -> Option> { - match event { - Event::Message { - peer, - message: - Message::Request { - request, channel, .. - }, - .. - } => Some(self.handle_request(peer, request, channel)), - Event::Message { - peer, - message: Message::Response { response, .. }, - .. - } => Some(Self::handle_response(&self.node_info, peer, response)), - Event::OutboundFailure { peer, error, .. } => { - trace!(?peer, ?error, "Handshake outbound failure"); - Some(Err(Failed { - peer_id: peer, - error: Box::new(Error::Outbound(error)), - })) + fn verify_and_emit_event(&mut self, peer_id: PeerId, their_info: NodeInfo) { + match verify_node_info(&self.node_info, &their_info) { + Ok(()) => { + self.events.push_back(Event::Completed { + peer_id, + their_info, + }); + } + Err(error) => { + self.events.push_back(Event::Failed { peer_id, error }); } - Event::InboundFailure { peer, error, .. } => Some(Err(Failed { - peer_id: peer, - error: Box::new(Error::Inbound(error)), - })), - Event::ResponseSent { .. 
} => None, } } @@ -102,39 +104,21 @@ impl Behaviour { peer_id: PeerId, request: NodeInfo, channel: ResponseChannel, - ) -> Result { + ) { trace!(?peer_id, "handling handshake request"); // Send our info back to the peer let _ = self.inner.send_response(channel, self.node_info.clone()); - // Verify network compatibility - verify_node_info(&self.node_info, &request).map_err(|error| Failed { - peer_id, - error: Box::new(error), - })?; - - Ok(Completed { - peer_id, - their_info: request, - }) + // Verify network compatibility and emit event + self.verify_and_emit_event(peer_id, request); } - fn handle_response( - our_node_info: &NodeInfo, - peer_id: PeerId, - response: NodeInfo, - ) -> Result { + fn handle_response(&mut self, peer_id: PeerId, response: NodeInfo) { trace!(?peer_id, "handling handshake response"); - verify_node_info(our_node_info, &response).map_err(|error| Failed { - peer_id, - error: Box::new(error), - })?; - - Ok(Completed { - peer_id, - their_info: response, - }) + + // Verify network compatibility and emit event + self.verify_and_emit_event(peer_id, response); } /// Determines if a handshake should be initiated for this connection. @@ -228,10 +212,60 @@ impl NetworkBehaviour for Behaviour { fn poll( &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll>> - { - self.inner.poll(cx) + cx: &mut Context<'_>, + ) -> Poll>> { + // Process events from inner request-response behaviour + while let Poll::Ready(event) = self.inner.poll(cx) { + match event { + ToSwarm::GenerateEvent(req_resp_event) => match req_resp_event { + RequestResponseEvent::Message { + peer, + message: + Message::Request { + request, channel, .. + }, + .. + } => { + trace!("Received handshake request"); + self.handle_request(peer, request, channel); + } + RequestResponseEvent::Message { + peer, + message: Message::Response { response, .. }, + .. 
+ } => { + trace!(?response, "Received handshake response"); + self.handle_response(peer, response); + } + RequestResponseEvent::OutboundFailure { peer, error, .. } => { + self.events.push_back(Event::Failed { + peer_id: peer, + error: Error::Outbound(error), + }); + } + RequestResponseEvent::InboundFailure { peer, error, .. } => { + self.events.push_back(Event::Failed { + peer_id: peer, + error: Error::Inbound(error), + }); + } + RequestResponseEvent::ResponseSent { .. } => {} + }, + other => { + // Bubble up all other ToSwarm events + return Poll::Ready( + other.map_out(|_| unreachable!("We already handled GenerateEvent")), + ); + } + } + } + + // Emit queued events + if let Some(event) = self.events.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + Poll::Pending } } @@ -246,15 +280,12 @@ mod tests { use std::sync::LazyLock; use discv5::libp2p_identity::Keypair; - use libp2p::swarm::{Swarm, SwarmEvent}; - use libp2p_swarm_test::SwarmExt; - use tokio::select; + use libp2p::swarm::Swarm; + use libp2p_swarm_test::{SwarmExt, drive}; use super::*; use crate::handshake::node_info::NodeMetadata; - // Test helper functions for cleaner test structure - fn node_info(network: &str, version: &str) -> NodeInfo { NodeInfo { network_id: network.to_string(), @@ -271,81 +302,37 @@ mod tests { Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, node_info)) } - /// Helper to wait for both swarms to complete handshake - async fn wait_for_handshake_completion( - local_swarm: &mut Swarm, - remote_swarm: &mut Swarm, - ) -> (Completed, Completed) { - let mut local_result = None; - let mut remote_result = None; - - while local_result.is_none() || remote_result.is_none() { - select!( - SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { - if let Some(result) = local_swarm.behaviour_mut().handle_event(e) { - local_result = Some(result.expect("local handshake to succeed")); - } - } - SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { - if 
let Some(result) = remote_swarm.behaviour_mut().handle_event(e) { - remote_result = Some(result.expect("remote handshake to succeed")); - } - } - else => {} - ) + fn assert_completed(event: Event, expected_peer: PeerId, expected_version: &str) { + match event { + Event::Completed { + peer_id, + their_info, + } => { + assert_eq!(peer_id, expected_peer); + assert_eq!(their_info.metadata.unwrap().node_version, expected_version); + } + Event::Failed { error, .. } => panic!("Expected Completed, got Failed: {:?}", error), } - - (local_result.unwrap(), remote_result.unwrap()) - } - - /// Expected peer information for test assertions - struct ExpectedPeer<'a> { - peer_id: PeerId, - version: &'a str, } - /// Test state tracking for handshake tests - struct TestState { - connections: usize, - handshakes: usize, - completed: bool, - } - - /// Helper to handle swarm events and update tracking state - fn handle_swarm_event_for_test( - swarm: &mut Swarm, - event: SwarmEvent, - expected: &ExpectedPeer, - state: &mut TestState, + fn assert_network_mismatch( + event: Event, + expected_peer: PeerId, + expected_ours: &str, + expected_theirs: &str, ) { match event { - SwarmEvent::ConnectionEstablished { - num_established, - endpoint, - .. - } => { - state.connections += 1; - trace!(?endpoint, ?num_established, "ConnectionEstablished"); - } - SwarmEvent::Behaviour(e) => { - if let Some(result) = swarm.behaviour_mut().handle_event(e) { - match result { - Ok(Completed { - peer_id, - their_info, - }) => { - state.handshakes += 1; - assert_eq!(peer_id, expected.peer_id); - assert_eq!(their_info.metadata.unwrap().node_version, expected.version); - state.completed = true; - } - Err(Failed { error, .. 
}) => { - trace!(?error, "Handshake failed"); - } + Event::Failed { peer_id, error } => { + assert_eq!(peer_id, expected_peer); + match error { + Error::NetworkMismatch { ours, theirs } => { + assert_eq!(ours, expected_ours); + assert_eq!(theirs, expected_theirs); } + _ => panic!("Expected NetworkMismatch, got {:?}", error), } } - _ => {} + Event::Completed { .. } => panic!("Expected Failed, got Completed"), } } @@ -353,261 +340,75 @@ mod tests { async fn handshake_success() { *TRACING; - // Setup: Create two peers with matching networks - let local_info = node_info("test", "local"); - let remote_info = node_info("test", "remote"); - - let mut local_swarm = create_test_swarm(Keypair::generate_ed25519(), local_info.clone()); - let mut remote_swarm = create_test_swarm(Keypair::generate_ed25519(), remote_info.clone()); + let mut local_swarm = + create_test_swarm(Keypair::generate_ed25519(), node_info("test", "local")); + let mut remote_swarm = + create_test_swarm(Keypair::generate_ed25519(), node_info("test", "remote")); tokio::spawn(async move { - // Setup: Establish connection local_swarm.listen().with_memory_addr_external().await; remote_swarm.connect(&mut local_swarm).await; - // Test: Wait for both sides to complete handshake - let (local_result, remote_result) = - wait_for_handshake_completion(&mut local_swarm, &mut remote_swarm).await; - - // Verify: Both sides received correct peer info - assert_eq!(local_result.peer_id, *remote_swarm.local_peer_id()); - assert_eq!( - local_result.their_info.metadata.unwrap().node_version, - "remote" - ); + let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; - assert_eq!(remote_result.peer_id, *local_swarm.local_peer_id()); - assert_eq!( - remote_result.their_info.metadata.unwrap().node_version, - "local" - ); + assert_completed(local_event, *remote_swarm.local_peer_id(), "remote"); + assert_completed(remote_event, *local_swarm.local_peer_id(), "local"); }) .await 
.expect("test completed"); } - /// Evidence-gathering test for concurrent dial behavior. - /// - /// This test demonstrates that when both peers dial each other simultaneously: - /// 1. Both peers get 2 ConnectionEstablished events (one Dialer, one Listener) - /// 2. Only ONE peer initiates the handshake (the one whose Dialer connection wins the race) - /// 3. The check `other_established == 0` prevents duplicate handshake initiations - /// 4. Both peers complete the handshake successfully despite concurrent dials - /// - /// This proves that our approach using `other_established == 0` correctly handles - /// concurrent dial resolution without relying on the Identify protocol. #[tokio::test] - async fn concurrent_dials_both_initiate_handshake() { + async fn concurrent_dials_handshake_success() { *TRACING; - let local_key = Keypair::generate_ed25519(); - let remote_key = Keypair::generate_ed25519(); - - let local_node_info = node_info("test", "local"); - let remote_node_info = node_info("test", "remote"); - let mut local_swarm = - Swarm::new_ephemeral_tokio(|_| Behaviour::new(local_key, local_node_info.clone())); + create_test_swarm(Keypair::generate_ed25519(), node_info("test", "local")); let mut remote_swarm = - Swarm::new_ephemeral_tokio(|_| Behaviour::new(remote_key, remote_node_info.clone())); + create_test_swarm(Keypair::generate_ed25519(), node_info("test", "remote")); tokio::spawn(async move { local_swarm.listen().with_memory_addr_external().await; remote_swarm.listen().with_memory_addr_external().await; - // Force both peers to dial each other by getting addresses and dialing manually + // Force both peers to dial each other let local_addr = local_swarm.external_addresses().next().unwrap().clone(); let remote_addr = remote_swarm.external_addresses().next().unwrap().clone(); - trace!(?local_addr, ?remote_addr, "About to dial each other"); - - // Dial each other at the same time - local_swarm.dial(remote_addr.clone()).unwrap(); - trace!("Local dialed 
remote"); - remote_swarm.dial(local_addr.clone()).unwrap(); - trace!("Remote dialed local"); - - let expected_remote = ExpectedPeer { - peer_id: *remote_swarm.local_peer_id(), - version: "remote", - }; - let expected_local = ExpectedPeer { - peer_id: *local_swarm.local_peer_id(), - version: "local", - }; - - let mut local_state = TestState { - connections: 0, - handshakes: 0, - completed: false, - }; - let mut remote_state = TestState { - connections: 0, - handshakes: 0, - completed: false, - }; - - while !local_state.completed || !remote_state.completed { - select!( - event = local_swarm.next_swarm_event() => { - handle_swarm_event_for_test( - &mut local_swarm, - event, - &expected_remote, - &mut local_state, - ); - } - event = remote_swarm.next_swarm_event() => { - handle_swarm_event_for_test( - &mut remote_swarm, - event, - &expected_local, - &mut remote_state, - ); - } - else => {} - ) - } - - // Evidence gathering: Check if we saw concurrent dials and what happened - trace!( - local_connections = local_state.connections, - remote_connections = remote_state.connections, - local_handshake_initiated = local_state.handshakes, - remote_handshake_initiated = remote_state.handshakes, - "Concurrent dial evidence" - ); - }) - .await - .expect("tokio runtime failed"); - } - - /// Test basic handshake with a single outbound connection. - /// - /// This test verifies that: - /// 1. Only the dialer (remote) auto-initiates the handshake - /// 2. The listener (local) responds to the handshake request - /// 3. Both sides complete the handshake successfully - /// - /// This is the simple case with no concurrent dials. 
- #[tokio::test] - async fn bidirectional_connection_handshake_success() { - *TRACING; + local_swarm.dial(remote_addr).unwrap(); + remote_swarm.dial(local_addr).unwrap(); - let local_key = Keypair::generate_ed25519(); - let remote_key = Keypair::generate_ed25519(); + let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; - let local_node_info = node_info("test", "local"); - let remote_node_info = node_info("test", "remote"); - - let mut local_swarm = - Swarm::new_ephemeral_tokio(|_| Behaviour::new(local_key, local_node_info.clone())); - let mut remote_swarm = - Swarm::new_ephemeral_tokio(|_| Behaviour::new(remote_key, remote_node_info.clone())); - - tokio::spawn(async move { - local_swarm.listen().with_memory_addr_external().await; - remote_swarm.listen().with_memory_addr_external().await; - - // Remote dials local - only remote will initiate handshake - remote_swarm.connect(&mut local_swarm).await; - - // Both peers should complete handshake - let mut local_completed = false; - let mut remote_completed = false; - - while !local_completed || !remote_completed { - select!( - SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { - if let Some(result) = local_swarm.behaviour_mut().handle_event(e) { - let Completed { peer_id, their_info } = result.expect("handshake to succeed"); - assert_eq!(peer_id, *remote_swarm.local_peer_id()); - assert_eq!(their_info.metadata.unwrap().node_version, "remote"); - local_completed = true; - } - } - SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { - if let Some(result) = remote_swarm.behaviour_mut().handle_event(e) { - let Completed { peer_id, their_info } = result.expect("handshake to succeed"); - assert_eq!(peer_id, *local_swarm.local_peer_id()); - assert_eq!(their_info.metadata.unwrap().node_version, "local"); - remote_completed = true; - } - } - else => {} - ) - } + assert_completed(local_event, *remote_swarm.local_peer_id(), "remote"); + 
assert_completed(remote_event, *local_swarm.local_peer_id(), "local"); }) .await - .expect("tokio runtime failed"); + .expect("test completed"); } #[tokio::test] async fn mismatched_networks_handshake_failed() { *TRACING; - let local_key = Keypair::generate_ed25519(); - let remote_key = Keypair::generate_ed25519(); - - let local_node_info = node_info("test1", "local"); - let remote_node_info = node_info("test2", "remote"); - let mut local_swarm = - Swarm::new_ephemeral_tokio(|_| Behaviour::new(local_key, local_node_info.clone())); + create_test_swarm(Keypair::generate_ed25519(), node_info("test1", "local")); let mut remote_swarm = - Swarm::new_ephemeral_tokio(|_| Behaviour::new(remote_key, remote_node_info.clone())); + create_test_swarm(Keypair::generate_ed25519(), node_info("test2", "remote")); tokio::spawn(async move { local_swarm.listen().with_memory_addr_external().await; - remote_swarm.connect(&mut local_swarm).await; - // No manual initiate() call - Behaviour handles it automatically! 
- - let mut local_failed = false; - let mut remote_failed = false; - - while !local_failed && !remote_failed { - select!( - SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { - let Some(result) = - local_swarm.behaviour_mut().handle_event(e) else { - continue; - }; - let Failed { - peer_id, - error, - } = result.expect_err("handshake to fail"); - let Error::NetworkMismatch { ours, theirs } = *error else { - panic!("expected network mismatch"); - }; - assert_eq!(peer_id, *remote_swarm.local_peer_id()); - assert_eq!(ours, "test1"); - assert_eq!(theirs, "test2"); - local_failed = true; - } - SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { - let Some(result) = - remote_swarm.behaviour_mut().handle_event(e) else { - continue; - }; - let Failed { - peer_id, - error, - } = result.expect_err("handshake to fail"); - let Error::NetworkMismatch { ours, theirs } = *error else { - panic!("expected network mismatch"); - }; - assert_eq!(peer_id, *local_swarm.local_peer_id()); - assert_eq!(ours, "test2"); - assert_eq!(theirs, "test1"); - remote_failed = true; - } - else => {} - ) - } + let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; + + assert_network_mismatch(local_event, *remote_swarm.local_peer_id(), "test1", "test2"); + assert_network_mismatch(remote_event, *local_swarm.local_peer_id(), "test2", "test1"); }) .await - .expect("tokio runtime failed"); + .expect("test completed"); } } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index bb2bf9a7f..341ee9adc 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -206,8 +206,20 @@ impl Network { self.on_discovered_peers(peers); } AnchorBehaviourEvent::Handshake(event) => { - if let Some(result) = self.swarm.behaviour_mut().handle_handshake_event(event) { - self.handle_handshake_result(result); + use crate::handshake::Event as HandshakeEvent; + match event { + 
+ HandshakeEvent::Completed { peer_id, their_info } => { + self.handle_handshake_result(Ok(crate::handshake::Completed { + peer_id, + their_info, + })); + } + HandshakeEvent::Failed { peer_id, error } => { + self.handle_handshake_result(Err(crate::handshake::Failed { + peer_id, + error: Box::new(error), + })); + } } } AnchorBehaviourEvent::PeerManager(peer_manager::Event::Heartbeat(heartbeat)) => { @@ -628,7 +640,6 @@ fn build_swarm( .with_tokio() .with_other_transport(|_key| transport) .expect("infallible") // This operation can't fail because the error type is Infallible. - .with_bandwidth_metrics(metrics_registry) .with_behaviour(|_| behaviour) .expect("infallible") // Again, this can't fail. .with_swarm_config(|_| swarm_config) From ac6f471438d1fe0158d6f5db942ff6880346d19b Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 2 Oct 2025 23:07:19 +0200 Subject: [PATCH 07/16] with_bandwidth_metrics(metrics_registry) doesn't exist --- anchor/network/src/network.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 341ee9adc..cf4c3afa2 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -117,13 +117,7 @@ impl Network { ); let mut network = Network { - swarm: build_swarm( - executor.clone(), - local_keypair, - transport, - behaviour, - &mut metrics_registry, - )?, + swarm: build_swarm(executor.clone(), local_keypair, transport, behaviour)?, subnet_event_receiver, message_rx, peer_id, @@ -603,7 +597,6 @@ fn build_swarm( local_keypair: Keypair, transport: Boxed<(PeerId, StreamMuxerBox)>, behaviour: AnchorBehaviour, - metrics_registry: &mut Registry, ) -> Result, Box> { struct Executor(task_executor::TaskExecutor); impl libp2p::swarm::Executor for Executor { From 2d60d3eabe9dc2803e4ee3b60531ae24eb542f54 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 3 Oct 2025 01:23:39 +0200 Subject: [PATCH 08/16] test: fix concurrent_dials test to actually verify no
duplicate handshakes The previous test only checked that each peer received one Completed event, but didn't verify that only ONE handshake was initiated. This meant the test would pass even without the other_established == 0 check. The new test: 1. Drives both swarms until they complete handshakes 2. Then attempts to drive again with a timeout 3. Asserts that NO more handshake events occur (timeout) Without the other_established == 0 check, both peers initiate handshakes on both ConnectionEstablished events, causing the test to fail as expected. With the check, only one handshake is initiated and the test passes. --- anchor/network/src/handshake/mod.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 24fbedb9f..d7157ab97 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -359,8 +359,13 @@ mod tests { .expect("test completed"); } + /// Test that verifies only ONE handshake happens during concurrent dials. + /// + /// Without the `other_established == 0` check, this test would see BOTH peers + /// initiate handshakes, leading to duplicate requests. With the check, only + /// the first ConnectionEstablished triggers a handshake initiation. 
#[tokio::test] - async fn concurrent_dials_handshake_success() { + async fn concurrent_dials_only_one_handshake() { *TRACING; let mut local_swarm = @@ -379,11 +384,25 @@ mod tests { local_swarm.dial(remote_addr).unwrap(); remote_swarm.dial(local_addr).unwrap(); + // Drive until both complete - expecting exactly 1 event per peer let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = drive(&mut local_swarm, &mut remote_swarm).await; + // Both should have completed successfully assert_completed(local_event, *remote_swarm.local_peer_id(), "remote"); assert_completed(remote_event, *local_swarm.local_peer_id(), "local"); + + // Key assertion: If we try to drive again with a timeout, + // there should be NO more events (no duplicate handshakes) + use tokio::time::{timeout, Duration}; + + let result = timeout(Duration::from_millis(100), async { + let ([_local], [_remote]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; + }).await; + + // Should timeout - no more handshake events should occur + assert!(result.is_err(), "Expected no more handshake events, but got some! 
This means duplicate handshakes occurred."); }) .await .expect("test completed"); From 565660475756ccbe75bc59f9881b3bf2449d3929 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 15 Oct 2025 19:39:26 +0200 Subject: [PATCH 09/16] fix: remove unused debug import in handshake module --- anchor/network/src/handshake/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 7002071dd..d7157ab97 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -16,7 +16,7 @@ use libp2p::{ }, swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm}, }; -use tracing::{debug, trace}; +use tracing::trace; use crate::handshake::{codec::Codec, node_info::NodeInfo}; From b6e8c82751b38271ffbad6d7691278af80c12278 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 15 Oct 2025 19:45:16 +0200 Subject: [PATCH 10/16] feat: add bandwidth metrics to SwarmBuilder Re-added .with_bandwidth_metrics(metrics_registry) to the SwarmBuilder chain to track network bandwidth statistics in Prometheus. This was inadvertently removed during the merge but is important for monitoring network performance. The metrics_registry is now passed to build_swarm() and used to register bandwidth metrics for both inbound and outbound traffic. 
--- anchor/network/src/network.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 9960631c6..51c5d58c4 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -138,7 +138,13 @@ impl Network { ); let mut network = Network { - swarm: build_swarm(executor.clone(), local_keypair, transport, behaviour)?, + swarm: build_swarm( + executor.clone(), + local_keypair, + transport, + behaviour, + &mut metrics_registry, + )?, subnet_event_receiver, message_rx, peer_id, @@ -719,6 +725,7 @@ fn build_swarm( local_keypair: Keypair, transport: Boxed<(PeerId, StreamMuxerBox)>, behaviour: AnchorBehaviour, + metrics_registry: &mut Registry, ) -> Result, Box> { struct Executor(task_executor::TaskExecutor); impl libp2p::swarm::Executor for Executor { @@ -755,6 +762,7 @@ fn build_swarm( .with_tokio() .with_other_transport(|_key| transport) .expect("infallible") // This operation can't fail because the error type is Infallible. + .with_bandwidth_metrics(metrics_registry) .with_behaviour(|_| behaviour) .expect("infallible") // Again, this can't fail. .with_swarm_config(|_| swarm_config) From 1b82a0ccc28b3ff892d0410d01758363e8c7c4e3 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 15 Oct 2025 20:56:04 +0200 Subject: [PATCH 11/16] refactor: keep unreachable!() in map_out with better documentation The unreachable!() in map_out is the correct and idiomatic approach here. When we match on the 'other' arm, we're handling ToSwarm events that are NOT GenerateEvent (like Dial, NotifyHandler, CloseConnection, etc.). The map_out closure is only called when transforming GenerateEvent variants, which we've already exhaustively handled in the first match arm. Therefore, the closure in map_out should never execute - if it does, it indicates a logic bug in our code. This is the standard pattern used throughout rust-libp2p for wrapping inner behaviours. 
The improved comment explains WHY it's unreachable, addressing the copilot's concern about potential panics while maintaining the correctness invariant. --- anchor/network/src/handshake/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index d7157ab97..78e8c560e 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -252,9 +252,12 @@ impl NetworkBehaviour for Behaviour { RequestResponseEvent::ResponseSent { .. } => {} }, other => { - // Bubble up all other ToSwarm events + // Bubble up all other ToSwarm events (Dial, NotifyHandler, CloseConnection, + // etc.) These events don't contain GenerateEvent, so + // map_out's closure is never called. This is safe because + // we've exhaustively handled all GenerateEvent variants above. return Poll::Ready( - other.map_out(|_| unreachable!("We already handled GenerateEvent")), + other.map_out(|_| unreachable!("GenerateEvent already handled")), ); } } From df10f13dd2e78cb68147ec847a423cbc407dafda Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 16 Oct 2025 14:49:33 +0200 Subject: [PATCH 12/16] change log level to debug --- anchor/network/src/handshake/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 78e8c560e..97e998b7c 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -275,8 +275,8 @@ impl NetworkBehaviour for Behaviour { #[cfg(test)] mod tests { // Init tracing - static TRACING: LazyLock<()> = LazyLock::new(|| { - let env_filter = tracing_subscriber::EnvFilter::new("trace"); + static DEBUG: LazyLock<()> = LazyLock::new(|| { + let env_filter = tracing_subscriber::EnvFilter::new("debug"); tracing_subscriber::fmt().with_env_filter(env_filter).init(); }); @@ -341,7 +341,7 @@ mod tests { #[tokio::test] async fn handshake_success() { 
- *TRACING; + *DEBUG; let mut local_swarm = create_test_swarm(Keypair::generate_ed25519(), node_info("test", "local")); @@ -369,7 +369,7 @@ mod tests { /// the first ConnectionEstablished triggers a handshake initiation. #[tokio::test] async fn concurrent_dials_only_one_handshake() { - *TRACING; + *DEBUG; let mut local_swarm = create_test_swarm(Keypair::generate_ed25519(), node_info("test", "local")); @@ -413,7 +413,7 @@ mod tests { #[tokio::test] async fn mismatched_networks_handshake_failed() { - *TRACING; + *DEBUG; let mut local_swarm = create_test_swarm(Keypair::generate_ed25519(), node_info("test1", "local")); From 15edef74b84769d55e5387fb2ddef9f28d876c54 Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 16 Oct 2025 18:27:25 +0200 Subject: [PATCH 13/16] docs: update idle_connection_timeout comment to remove outdated Identify reference --- anchor/network/src/network.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 51c5d58c4..e20b410eb 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -744,16 +744,10 @@ fn build_swarm( .with_notify_handler_buffer_size(notify_handler_buffer_size) .with_per_connection_event_buffer_size(4) .with_dial_concurrency_factor(dial_concurrency_factor) - // Set a non-zero idle connection timeout to prevent premature connection closes + // Set a non-zero idle connection timeout to allow time for handshake completion // - // Without this timeout, libp2p may close idle connections before we can complete - // the handshake sequence (ConnectionEstablished → Identify → SSV handshake). 
- // This is especially important for: - // - Connections with slow Identify protocol completion - // - Simultaneous dials where resolution takes time - // - Networks with high latency - // - // 30 seconds provides sufficient time for the full handshake flow while still + // libp2p needs time to complete the SSV handshake protocol after connection + // establishment. 30 seconds provides sufficient time for this flow while still // cleaning up truly idle connections. This follows guidance from rust-libp2p // maintainers to always set a non-zero idle timeout. .with_idle_connection_timeout(Duration::from_secs(30)); From 5b1e2408e13a6ab04e83e902ff84fe1b1b84112c Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 3 Nov 2025 14:21:08 +0100 Subject: [PATCH 14/16] refactor: simplify handshake event handling Remove redundant Completed and Failed structs that duplicated Event variant data. Pass Event directly to handlers instead of wrapping in Result. This eliminates unnecessary type conversions and follows idiomatic libp2p patterns for NetworkBehaviour events. --- anchor/network/src/handshake/mod.rs | 28 ++++++++-------------------- anchor/network/src/network.rs | 26 ++++++-------------------- 2 files changed, 14 insertions(+), 40 deletions(-) diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 97e998b7c..099256bcc 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -29,7 +29,7 @@ pub enum Event { }, Failed { peer_id: PeerId, - error: Error, + error: Box, }, } @@ -53,21 +53,6 @@ pub enum Error { Outbound(OutboundFailure), } -/// We successfully completed a handshake. -#[derive(Debug)] -pub struct Completed { - pub peer_id: PeerId, - pub their_info: NodeInfo, -} - -/// The handshake either failed because of shaking with an incompatible peer or because of some -/// network failure. 
-#[derive(Debug)] -pub struct Failed { - pub peer_id: PeerId, - pub error: Box, -} - impl Behaviour { /// Create a new handshake Behaviour. /// The behaviour automatically initiates handshakes on outbound connections. @@ -94,7 +79,10 @@ impl Behaviour { }); } Err(error) => { - self.events.push_back(Event::Failed { peer_id, error }); + self.events.push_back(Event::Failed { + peer_id, + error: Box::new(error), + }); } } } @@ -240,13 +228,13 @@ impl NetworkBehaviour for Behaviour { RequestResponseEvent::OutboundFailure { peer, error, .. } => { self.events.push_back(Event::Failed { peer_id: peer, - error: Error::Outbound(error), + error: Box::new(Error::Outbound(error)), }); } RequestResponseEvent::InboundFailure { peer, error, .. } => { self.events.push_back(Event::Failed { peer_id: peer, - error: Error::Inbound(error), + error: Box::new(Error::Inbound(error)), }); } RequestResponseEvent::ResponseSent { .. } => {} @@ -327,7 +315,7 @@ mod tests { match event { Event::Failed { peer_id, error } => { assert_eq!(peer_id, expected_peer); - match error { + match *error { Error::NetworkMismatch { ours, theirs } => { assert_eq!(ours, expected_ours); assert_eq!(theirs, expected_theirs); diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 0febd72d4..b844b8f31 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -227,21 +227,7 @@ impl Network { self.on_discovered_peers(peers); } AnchorBehaviourEvent::Handshake(event) => { - use crate::handshake::Event as HandshakeEvent; - match event { - HandshakeEvent::Completed { peer_id, their_info } => { - self.handle_handshake_result(Ok(crate::handshake::Completed { - peer_id, - their_info, - })); - } - HandshakeEvent::Failed { peer_id, error } => { - self.handle_handshake_result(Err(crate::handshake::Failed { - peer_id, - error: Box::new(error), - })); - } - } + self.handle_handshake_result(event); } AnchorBehaviourEvent::PeerManager(peer_manager::Event::Heartbeat(heartbeat)) => { if 
let Some(actions) = heartbeat.connect_actions { @@ -585,12 +571,12 @@ impl Network { } } - fn handle_handshake_result(&mut self, result: Result) { - match result { - Ok(handshake::Completed { + fn handle_handshake_result(&mut self, event: handshake::Event) { + match event { + handshake::Event::Completed { peer_id, their_info, - }) => { + } => { // Record successful handshake if let Ok(counter) = crate::metrics::HANDSHAKE_SUCCESSFUL.as_ref() { counter.inc(); @@ -606,7 +592,7 @@ impl Network { debug!(%peer_id, ?their_info, "Handshake completed without metadata"); } } - Err(handshake::Failed { peer_id, error }) => { + handshake::Event::Failed { peer_id, error } => { // Determine failure reason for metrics let failure_reason = match error.as_ref() { handshake::Error::NetworkMismatch { .. } => "network_mismatch", From fca0502cb203599fbc678a248b904f3046114891 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 3 Nov 2025 14:40:15 +0100 Subject: [PATCH 15/16] fix: restore connection error logging for network diagnostics Re-add structured logging for connection errors that was removed in a previous refactoring. These errors are distinct from routine connection events: - OutgoingConnectionError: Failed to establish outgoing connection - IncomingConnectionError: Failed to accept incoming connection - ConnectionClosed with cause: Established connection failed at runtime Log errors at debug level and clean closures at trace level following libp2p's own logging patterns. --- anchor/network/src/network.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index b844b8f31..47ee95a41 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -278,6 +278,19 @@ impl Network { SwarmEvent::NewListenAddr { listener_id, address } => { self.on_new_listen_addr(listener_id, address); }, + SwarmEvent::OutgoingConnectionError { peer_id, error, .. 
} => { + debug!(?peer_id, ?error, "Outgoing connection error"); + }, + SwarmEvent::IncomingConnectionError { error, send_back_addr, .. } => { + debug!(?send_back_addr, ?error, "Incoming connection error"); + }, + SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { + if cause.is_some() { + debug!(?peer_id, ?cause, "Connection closed with error"); + } else { + trace!(?peer_id, "Connection closed"); + } + }, _ => { trace!(event = ?swarm_message, "Unhandled swarm event"); }, From 21f80ce8018743e65ccc628f741237163dadd6f8 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 3 Nov 2025 14:57:58 +0100 Subject: [PATCH 16/16] fix: log handshake response send failures Replace silent failure handling with trace-level logging when handshake response cannot be sent. This occurs when the response channel is already closed (peer disconnected), which is not an error condition but useful for debugging handshake flows. Uses trace level as this is an expected race condition that doesn't require operator attention. --- anchor/network/src/handshake/mod.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 099256bcc..77242ffb9 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -96,7 +96,16 @@ impl Behaviour { trace!(?peer_id, "handling handshake request"); // Send our info back to the peer - let _ = self.inner.send_response(channel, self.node_info.clone()); + if self + .inner + .send_response(channel, self.node_info.clone()) + .is_err() + { + trace!( + ?peer_id, + "Failed to send handshake response (channel closed)" + ); + } // Verify network compatibility and emit event self.verify_and_emit_event(peer_id, request);