From 361ebf4c7929793d8d2cd81570aeb390a935ad29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Fri, 27 Jun 2025 22:50:11 +0100 Subject: [PATCH 1/4] Error from RPC `send_response` when request doesn't exist on the active inbound requests. And handle the error from the main service to check if it may be a data race or a critical bug --- beacon_node/lighthouse_network/src/rpc/mod.rs | 14 ++-- .../lighthouse_network/src/service/mod.rs | 66 +++++++------------ beacon_node/network/src/service.rs | 10 +-- 3 files changed, 37 insertions(+), 53 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 0619908bb6d..6f84b02fe2b 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -17,7 +17,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; -use tracing::{debug, error, instrument, trace}; +use tracing::{debug, instrument, trace}; use types::{EthSpec, ForkContext}; pub(crate) use handler::{HandlerErr, HandlerEvent}; @@ -199,8 +199,7 @@ impl RPC { } /// Sends an RPC response. - /// - /// The peer must be connected for this to succeed. + /// Returns an `Err` if the request does exist in the active inbound requests list. #[instrument(parent = None, level = "trace", fields(service = "libp2p_rpc"), @@ -212,11 +211,10 @@ impl RPC { peer_id: PeerId, request_id: InboundRequestId, response: RpcResponse, - ) { + ) -> Result<(), RpcResponse> { let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id) else { - error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent"); - return; + return Err(response); }; // Add the request back to active requests if the response is `Success` and requires stream @@ -229,6 +227,7 @@ impl RPC { } self.send_response_inner(peer_id, request_type.protocol(), request_id, response); + Ok(()) } fn send_response_inner( @@ -506,7 +505,8 @@ where RpcResponse::Success(RpcSuccessResponse::Pong(Ping { data: self.seq_number, })), - ); + ) + .expect("Request to exist"); } self.events.push(ToSwarm::GenerateEvent(RPCMessage { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index e2c6f244058..79470918984 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -11,8 +11,7 @@ use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY use crate::rpc::methods::MetadataRequest; use crate::rpc::{ GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPCError, RPCMessage, - RPCReceived, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, - RpcSuccessResponse, RPC, + RPCReceived, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, RPC, }; use crate::types::{ all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash, @@ -39,7 +38,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use tracing::{debug, info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, }; @@ -1146,35 +1145,27 @@ impl Network { name = "libp2p", skip_all )] - pub fn send_response( + pub fn send_response>>( &mut self, peer_id: PeerId, inbound_request_id: InboundRequestId, - response: Response, + response: T, ) { - self.eth2_rpc_mut() - .send_response(peer_id, inbound_request_id, response.into()) - } + let response = response.into(); + if !self.network_globals.peers.read().is_connected(&peer_id) { + trace!(%peer_id, ?inbound_request_id, %response, "Discarding response, peer is not connected"); + } - /// Inform the peer that their request produced an error. - #[instrument(parent = None, - level = "trace", - fields(service = "libp2p"), - name = "libp2p", - skip_all - )] - pub fn send_error_response( - &mut self, - peer_id: PeerId, - inbound_request_id: InboundRequestId, - error: RpcErrorResponse, - reason: String, - ) { - self.eth2_rpc_mut().send_response( - peer_id, - inbound_request_id, - RpcResponse::Error(error, reason.into()), - ) + if let Err(response) = + self.eth2_rpc_mut() + .send_response(peer_id, inbound_request_id, response) + { + if self.network_globals.peers.read().is_connected(&peer_id) { + error!(%peer_id, ?inbound_request_id, %response, + "Request not found in RPC active requests while peer is still connected" + ); + } + } } /* Peer management functions */ @@ -1460,19 +1451,6 @@ impl Network { name = "libp2p", skip_all )] - fn send_meta_data_response( - &mut self, - _req: MetadataRequest, - inbound_request_id: InboundRequestId, - peer_id: PeerId, - ) { - let metadata = self.network_globals.local_metadata.read().clone(); - // The encoder is responsible for sending the negotiated version of the metadata - let event = RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata))); - self.eth2_rpc_mut() - .send_response(peer_id, inbound_request_id, event); - } - // RPC Propagation methods /// Queues the response to be sent upwards as long at it was requested outside the Behaviour. #[must_use = "return the response"] @@ -1760,9 +1738,13 @@ impl Network { self.peer_manager_mut().ping_request(&peer_id, ping.data); None } - RequestType::MetaData(req) => { + RequestType::MetaData(_req) => { // send the requested meta-data - self.send_meta_data_response(req, inbound_request_id, peer_id); + let metadata = self.network_globals.local_metadata.read().clone(); + // The encoder is responsible for sending the negotiated version of the metadata + let response = + RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata))); + self.send_response(peer_id, inbound_request_id, response); None } RequestType::Goodbye(reason) => { diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 0a6d5152322..89f71dc3672 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -11,6 +11,7 @@ use futures::channel::mpsc::Sender; use futures::future::OptionFuture; use futures::prelude::*; +use lighthouse_network::rpc::methods::RpcResponse; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::RequestType; use lighthouse_network::service::Network; @@ -627,10 +628,11 @@ impl NetworkService { error, inbound_request_id, reason, - } => { - self.libp2p - .send_error_response(peer_id, inbound_request_id, error, reason); - } + } => self.libp2p.send_response( + peer_id, + inbound_request_id, + RpcResponse::Error(error, reason.into()), + ), NetworkMessage::ValidationResult { propagation_source, message_id, From 9482db0e74d5020b6bff48411bafa4c33a4cff80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 7 Jul 2025 16:15:21 +0100 Subject: [PATCH 2/4] do not remove on disconnect, rather mark the request --- beacon_node/lighthouse_network/src/rpc/mod.rs | 59 +++++++++++++++---- .../lighthouse_network/src/service/mod.rs | 11 +--- 2 files changed, 50 insertions(+), 20 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 6f84b02fe2b..f4b41c8114f 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -98,6 +98,13 @@ pub struct InboundRequestId { substream_id: SubstreamId, } +// An Active inbound request received via Rpc. +struct ActiveInboundRequest { + pub peer_id: PeerId, + pub request_type: RequestType, + pub peer_disconnected: bool, +} + impl InboundRequestId { /// Creates an _unchecked_ [`InboundRequestId`]. /// @@ -150,7 +157,7 @@ pub struct RPC { /// Rate limiter for our own requests. outbound_request_limiter: SelfRateLimiter, /// Active inbound requests that are awaiting a response. - active_inbound_requests: HashMap)>, + active_inbound_requests: HashMap>, /// Queue of events to be processed. events: Vec>, fork_context: Arc, @@ -208,22 +215,37 @@ impl RPC { )] pub fn send_response( &mut self, - peer_id: PeerId, request_id: InboundRequestId, response: RpcResponse, ) -> Result<(), RpcResponse> { - let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id) + let Some(ActiveInboundRequest { + peer_id, + request_type, + peer_disconnected, + }) = self.active_inbound_requests.remove(&request_id) else { return Err(response); }; + if peer_disconnected { + trace!(%peer_id, ?request_id, %response, + "Discarding response, peer is no longer connected"); + return Ok(()); + } + // Add the request back to active requests if the response is `Success` and requires stream // termination. if request_type.protocol().terminator().is_some() && matches!(response, RpcResponse::Success(_)) { - self.active_inbound_requests - .insert(request_id, (peer_id, request_type.clone())); + self.active_inbound_requests.insert( + request_id, + ActiveInboundRequest { + peer_id, + request_type: request_type.clone(), + peer_disconnected: false, + }, + ); } self.send_response_inner(peer_id, request_type.protocol(), request_id, response); @@ -424,9 +446,10 @@ where self.events.push(error_msg); } - self.active_inbound_requests.retain( - |_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id, - ); + self.active_inbound_requests + .values_mut() + .filter(|request| request.peer_id == peer_id) + .for_each(|request| request.peer_disconnected = true); if let Some(limiter) = self.response_limiter.as_mut() { limiter.peer_disconnected(peer_id); @@ -467,7 +490,14 @@ where .active_inbound_requests .iter() .filter( - |(_inbound_request_id, (request_peer_id, active_request_type))| { + |( + _inbound_request_id, + ActiveInboundRequest { + peer_id: request_peer_id, + request_type: active_request_type, + .. + }, + )| { *request_peer_id == peer_id && active_request_type.protocol() == request_type.protocol() }, @@ -493,14 +523,19 @@ where } // Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests. - self.active_inbound_requests - .insert(request_id, (peer_id, request_type.clone())); + self.active_inbound_requests.insert( + request_id, + ActiveInboundRequest { + peer_id, + request_type: request_type.clone(), + peer_disconnected: false, + }, + ); // If we received a Ping, we queue a Pong response. if let RequestType::Ping(_) = request_type { trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong"); self.send_response( - peer_id, request_id, RpcResponse::Success(RpcSuccessResponse::Pong(Ping { data: self.seq_number, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 79470918984..59ca5478b14 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1151,14 +1151,9 @@ impl Network { inbound_request_id: InboundRequestId, response: T, ) { - let response = response.into(); - if !self.network_globals.peers.read().is_connected(&peer_id) { - trace!(%peer_id, ?inbound_request_id, %response, "Discarding response, peer is not connected"); - } - - if let Err(response) = - self.eth2_rpc_mut() - .send_response(peer_id, inbound_request_id, response) + if let Err(response) = self + .eth2_rpc_mut() + .send_response(inbound_request_id, response.into()) { if self.network_globals.peers.read().is_connected(&peer_id) { error!(%peer_id, ?inbound_request_id, %response, From 29686e878db47181d9764459077333caa0e4ff95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 8 Jul 2025 00:44:45 +0100 Subject: [PATCH 3/4] address Akihito review --- beacon_node/lighthouse_network/src/rpc/mod.rs | 14 +++++++------- beacon_node/lighthouse_network/src/service/mod.rs | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index f4b41c8114f..46e1889bafe 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -227,12 +227,6 @@ impl RPC { return Err(response); }; - if peer_disconnected { - trace!(%peer_id, ?request_id, %response, - "Discarding response, peer is no longer connected"); - return Ok(()); - } - // Add the request back to active requests if the response is `Success` and requires stream // termination. if request_type.protocol().terminator().is_some() @@ -243,11 +237,17 @@ impl RPC { ActiveInboundRequest { peer_id, request_type: request_type.clone(), - peer_disconnected: false, + peer_disconnected, }, ); } + if peer_disconnected { + trace!(%peer_id, ?request_id, %response, + "Discarding response, peer is no longer connected"); + return Ok(()); + } + self.send_response_inner(peer_id, request_type.protocol(), request_id, response); Ok(()) } diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 59ca5478b14..0f5745a3a27 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1157,7 +1157,7 @@ impl Network { { if self.network_globals.peers.read().is_connected(&peer_id) { error!(%peer_id, ?inbound_request_id, %response, - "Request not found in RPC active requests while peer is still connected" + "Request not found in RPC active requests" ); } } From 093b6598e918cdce9693ea89d89768109a19c2a5 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Wed, 9 Jul 2025 07:30:59 +0900 Subject: [PATCH 4/4] Respect peer_disconnected when counting active requests --- beacon_node/lighthouse_network/src/rpc/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 46e1889bafe..9807387a172 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -495,11 +495,12 @@ where ActiveInboundRequest { peer_id: request_peer_id, request_type: active_request_type, - .. + peer_disconnected, }, )| { *request_peer_id == peer_id && active_request_type.protocol() == request_type.protocol() + && !peer_disconnected }, ) .count()