diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 0619908bb6d..9807387a172 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -17,7 +17,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; -use tracing::{debug, error, instrument, trace}; +use tracing::{debug, instrument, trace}; use types::{EthSpec, ForkContext}; pub(crate) use handler::{HandlerErr, HandlerEvent}; @@ -98,6 +98,13 @@ pub struct InboundRequestId { substream_id: SubstreamId, } +// An Active inbound request received via Rpc. +struct ActiveInboundRequest { + pub peer_id: PeerId, + pub request_type: RequestType, + pub peer_disconnected: bool, +} + impl InboundRequestId { /// Creates an _unchecked_ [`InboundRequestId`]. /// @@ -150,7 +157,7 @@ pub struct RPC { /// Rate limiter for our own requests. outbound_request_limiter: SelfRateLimiter, /// Active inbound requests that are awaiting a response. - active_inbound_requests: HashMap)>, + active_inbound_requests: HashMap>, /// Queue of events to be processed. events: Vec>, fork_context: Arc, @@ -199,8 +206,7 @@ impl RPC { } /// Sends an RPC response. - /// - /// The peer must be connected for this to succeed. + /// Returns an `Err` if the request does exist in the active inbound requests list. #[instrument(parent = None, level = "trace", fields(service = "libp2p_rpc"), @@ -209,14 +215,16 @@ impl RPC { )] pub fn send_response( &mut self, - peer_id: PeerId, request_id: InboundRequestId, response: RpcResponse, - ) { - let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id) + ) -> Result<(), RpcResponse> { + let Some(ActiveInboundRequest { + peer_id, + request_type, + peer_disconnected, + }) = self.active_inbound_requests.remove(&request_id) else { - error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent"); - return; + return Err(response); }; // Add the request back to active requests if the response is `Success` and requires stream @@ -224,11 +232,24 @@ impl RPC { if request_type.protocol().terminator().is_some() && matches!(response, RpcResponse::Success(_)) { - self.active_inbound_requests - .insert(request_id, (peer_id, request_type.clone())); + self.active_inbound_requests.insert( + request_id, + ActiveInboundRequest { + peer_id, + request_type: request_type.clone(), + peer_disconnected, + }, + ); + } + + if peer_disconnected { + trace!(%peer_id, ?request_id, %response, + "Discarding response, peer is no longer connected"); + return Ok(()); } self.send_response_inner(peer_id, request_type.protocol(), request_id, response); + Ok(()) } fn send_response_inner( @@ -425,9 +446,10 @@ where self.events.push(error_msg); } - self.active_inbound_requests.retain( - |_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id, - ); + self.active_inbound_requests + .values_mut() + .filter(|request| request.peer_id == peer_id) + .for_each(|request| request.peer_disconnected = true); if let Some(limiter) = self.response_limiter.as_mut() { limiter.peer_disconnected(peer_id); @@ -468,9 +490,17 @@ where .active_inbound_requests .iter() .filter( - |(_inbound_request_id, (request_peer_id, active_request_type))| { + |( + _inbound_request_id, + ActiveInboundRequest { + peer_id: request_peer_id, + request_type: active_request_type, + peer_disconnected, + }, + )| { *request_peer_id == peer_id && active_request_type.protocol() == request_type.protocol() + && !peer_disconnected }, ) .count() @@ -494,19 +524,25 @@ where } // Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests. - self.active_inbound_requests - .insert(request_id, (peer_id, request_type.clone())); + self.active_inbound_requests.insert( + request_id, + ActiveInboundRequest { + peer_id, + request_type: request_type.clone(), + peer_disconnected: false, + }, + ); // If we received a Ping, we queue a Pong response. if let RequestType::Ping(_) = request_type { trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong"); self.send_response( - peer_id, request_id, RpcResponse::Success(RpcSuccessResponse::Pong(Ping { data: self.seq_number, })), - ); + ) + .expect("Request to exist"); } self.events.push(ToSwarm::GenerateEvent(RPCMessage { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index e2c6f244058..0f5745a3a27 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -11,8 +11,7 @@ use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY use crate::rpc::methods::MetadataRequest; use crate::rpc::{ GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPCError, RPCMessage, - RPCReceived, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, - RpcSuccessResponse, RPC, + RPCReceived, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, RPC, }; use crate::types::{ all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash, @@ -39,7 +38,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use tracing::{debug, info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, }; @@ -1146,35 +1145,22 @@ impl Network { name = "libp2p", skip_all )] - pub fn send_response( + pub fn send_response>>( &mut self, peer_id: PeerId, inbound_request_id: InboundRequestId, - response: Response, + response: T, ) { - self.eth2_rpc_mut() - .send_response(peer_id, inbound_request_id, response.into()) - } - - /// Inform the peer that their request produced an error. - #[instrument(parent = None, - level = "trace", - fields(service = "libp2p"), - name = "libp2p", - skip_all - )] - pub fn send_error_response( - &mut self, - peer_id: PeerId, - inbound_request_id: InboundRequestId, - error: RpcErrorResponse, - reason: String, - ) { - self.eth2_rpc_mut().send_response( - peer_id, - inbound_request_id, - RpcResponse::Error(error, reason.into()), - ) + if let Err(response) = self + .eth2_rpc_mut() + .send_response(inbound_request_id, response.into()) + { + if self.network_globals.peers.read().is_connected(&peer_id) { + error!(%peer_id, ?inbound_request_id, %response, + "Request not found in RPC active requests" + ); + } + } } /* Peer management functions */ @@ -1460,19 +1446,6 @@ impl Network { name = "libp2p", skip_all )] - fn send_meta_data_response( - &mut self, - _req: MetadataRequest, - inbound_request_id: InboundRequestId, - peer_id: PeerId, - ) { - let metadata = self.network_globals.local_metadata.read().clone(); - // The encoder is responsible for sending the negotiated version of the metadata - let event = RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata))); - self.eth2_rpc_mut() - .send_response(peer_id, inbound_request_id, event); - } - // RPC Propagation methods /// Queues the response to be sent upwards as long at it was requested outside the Behaviour. #[must_use = "return the response"] @@ -1760,9 +1733,13 @@ impl Network { self.peer_manager_mut().ping_request(&peer_id, ping.data); None } - RequestType::MetaData(req) => { + RequestType::MetaData(_req) => { // send the requested meta-data - self.send_meta_data_response(req, inbound_request_id, peer_id); + let metadata = self.network_globals.local_metadata.read().clone(); + // The encoder is responsible for sending the negotiated version of the metadata + let response = + RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata))); + self.send_response(peer_id, inbound_request_id, response); None } RequestType::Goodbye(reason) => { diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 0a6d5152322..89f71dc3672 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -11,6 +11,7 @@ use futures::channel::mpsc::Sender; use futures::future::OptionFuture; use futures::prelude::*; +use lighthouse_network::rpc::methods::RpcResponse; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::RequestType; use lighthouse_network::service::Network; @@ -627,10 +628,11 @@ impl NetworkService { error, inbound_request_id, reason, - } => { - self.libp2p - .send_error_response(peer_id, inbound_request_id, error, reason); - } + } => self.libp2p.send_response( + peer_id, + inbound_request_id, + RpcResponse::Error(error, reason.into()), + ), NetworkMessage::ValidationResult { propagation_source, message_id,