Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 24 additions & 31 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ where
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
outbound_substreams_delay: DelayQueue<SubstreamId>,

/// Pending outbound substream requests.
requested_streams: VecDeque<(Id, RequestType<E>)>,

/// Sequential ID for waiting substreams. For inbound substreams, this is also the inbound request ID.
current_inbound_substream_id: SubstreamId,

Expand Down Expand Up @@ -233,6 +236,7 @@ where
outbound_substreams: FnvHashMap::default(),
inbound_substreams_delay: DelayQueue::new(),
outbound_substreams_delay: DelayQueue::new(),
requested_streams: Default::default(),
current_inbound_substream_id: SubstreamId(0),
current_outbound_substream_id: SubstreamId(0),
state: HandlerState::Active,
Expand Down Expand Up @@ -330,7 +334,7 @@ where
type ToBehaviour = HandlerEvent<Id, E>;
type InboundProtocol = RPCProtocol<E>;
type OutboundProtocol = OutboundRequestContainer<E>;
type OutboundOpenInfo = (Id, RequestType<E>); // Keep track of the id and the request
type OutboundOpenInfo = ();
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
Expand All @@ -353,13 +357,10 @@ where
!matches!(self.state, HandlerState::Deactivated)
}

#[allow(deprecated)]
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
if let Some(waker) = &self.waker {
if waker.will_wake(cx.waker()) {
self.waker = Some(cx.waker().clone());
Expand Down Expand Up @@ -782,17 +783,17 @@ where
if !self.dial_queue.is_empty() && self.dial_negotiated < self.max_dial_negotiated {
self.dial_negotiated += 1;
let (id, req) = self.dial_queue.remove(0);
self.requested_streams.push_back((id, req.clone()));
self.dial_queue.shrink_to_fit();
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
OutboundRequestContainer {
req: req.clone(),
req,
fork_context: self.fork_context.clone(),
max_rpc_size: self.listen_protocol().upgrade().max_rpc_size,
},
(),
)
.map_info(|()| (id, req)),
),
});
}

Expand All @@ -815,27 +816,20 @@ where
Poll::Pending
}

#[allow(deprecated)]
fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol,
info: _,
}) => self.on_fully_negotiated_inbound(protocol),
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol,
info,
}) => self.on_fully_negotiated_outbound(protocol, info),
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => {
self.on_dial_upgrade_error(info, error)
protocol, ..
}) => self.on_fully_negotiated_outbound(protocol),
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
self.on_dial_upgrade_error(error)
}
_ => {
// NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on
Expand Down Expand Up @@ -944,11 +938,11 @@ where
self.current_inbound_substream_id.0 += 1;
}

fn on_fully_negotiated_outbound(
&mut self,
substream: OutboundFramed<Stream, E>,
(id, request): (Id, RequestType<E>),
) {
fn on_fully_negotiated_outbound(&mut self, substream: OutboundFramed<Stream, E>) {
let (id, request) = self
.requested_streams
.pop_front()
.expect("opened a stream without and Id and RequesType");
self.dial_negotiated -= 1;
// Reset any io-retries counter.
self.outbound_io_error_retries = 0;
Expand Down Expand Up @@ -1002,15 +996,14 @@ where
self.current_outbound_substream_id.0 += 1;
}
}
fn on_dial_upgrade_error(
&mut self,
request_info: (Id, RequestType<E>),
error: StreamUpgradeError<RPCError>,
) {
fn on_dial_upgrade_error(&mut self, error: StreamUpgradeError<RPCError>) {
// This dialing is now considered failed
self.dial_negotiated -= 1;

let (id, req) = request_info;
let (id, req) = self
.requested_streams
.pop_front()
.expect("opened a stream without an Id and RequestType");

// map the error
let error = match error {
Expand Down
Loading