Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 70 additions & 5 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,8 +712,9 @@ impl<E: EthSpec> PeerManager<E> {
}

/// Received a metadata response from a peer.
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) {
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) -> bool {
let mut invalid_meta_data = false;
let mut updated_cgc = false;

if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
if let Some(known_meta_data) = &peer_info.meta_data() {
Expand All @@ -729,12 +730,16 @@ impl<E: EthSpec> PeerManager<E> {
debug!(%peer_id, new_seq_no = meta_data.seq_number(), "Obtained peer's metadata");
}

let known_custody_group_count = peer_info
.meta_data()
.and_then(|meta_data| meta_data.custody_group_count().copied().ok());

let custody_group_count_opt = meta_data.custody_group_count().copied().ok();
peer_info.set_meta_data(meta_data);

if self.network_globals.spec.is_peer_das_scheduled() {
// Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to
// prioritize PeerDAS peers.
// Gracefully ignore metadata/v2 peers.
// We only send metadata v3 requests when PeerDAS is scheduled
if let Some(custody_group_count) = custody_group_count_opt {
match self.compute_peer_custody_groups(peer_id, custody_group_count) {
Ok(custody_groups) => {
Expand All @@ -755,6 +760,8 @@ impl<E: EthSpec> PeerManager<E> {
})
.collect();
peer_info.set_custody_subnets(custody_subnets);

updated_cgc = Some(custody_group_count) != known_custody_group_count;
}
Err(err) => {
debug!(
Expand All @@ -777,6 +784,8 @@ impl<E: EthSpec> PeerManager<E> {
if invalid_meta_data {
self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager)
}

updated_cgc
}

/// Updates the gossipsub scores for all known peers in gossipsub.
Expand Down Expand Up @@ -1487,6 +1496,15 @@ impl<E: EthSpec> PeerManager<E> {
pub fn remove_trusted_peer(&mut self, enr: Enr) {
self.trusted_peers.remove(&enr);
}

#[cfg(test)]
/// Test-only accessor: number of custody subnets currently recorded for `peer_id`
/// in the peer DB, or `None` if the peer is unknown.
fn custody_subnet_count_for_peer(&self, peer_id: &PeerId) -> Option<usize> {
    let peer_db = self.network_globals.peers.read();
    let peer_info = peer_db.peer_info(peer_id)?;
    Some(peer_info.custody_subnets_iter().count())
}
}

enum ConnectingType {
Expand All @@ -1507,8 +1525,9 @@ enum ConnectingType {
#[cfg(test)]
mod tests {
use super::*;
use crate::rpc::MetaDataV3;
use crate::NetworkConfig;
use types::MainnetEthSpec as E;
use types::{ChainSpec, ForkName, MainnetEthSpec as E};

async fn build_peer_manager(target_peer_count: usize) -> PeerManager<E> {
build_peer_manager_with_trusted_peers(vec![], target_peer_count).await
Expand All @@ -1517,6 +1536,15 @@ mod tests {
async fn build_peer_manager_with_trusted_peers(
    trusted_peers: Vec<PeerId>,
    target_peer_count: usize,
) -> PeerManager<E> {
    // Convenience wrapper: uses the default chain spec. Call
    // `build_peer_manager_with_opts` directly to supply a custom spec.
    let default_spec = Arc::new(E::default_spec());
    build_peer_manager_with_opts(trusted_peers, target_peer_count, default_spec).await
}

async fn build_peer_manager_with_opts(
trusted_peers: Vec<PeerId>,
target_peer_count: usize,
spec: Arc<ChainSpec>,
) -> PeerManager<E> {
let config = config::Config {
target_peer_count,
Expand All @@ -1527,7 +1555,6 @@ mod tests {
target_peers: target_peer_count,
..Default::default()
});
let spec = Arc::new(E::default_spec());
let globals = NetworkGlobals::new_test_globals(trusted_peers, network_config, spec);
PeerManager::new(config, Arc::new(globals)).unwrap()
}
Expand Down Expand Up @@ -1878,6 +1905,44 @@ mod tests {
assert!(peers_should_have_removed.is_empty());
}

#[tokio::test]
/// A metadata response carrying a custody group count should populate the peer's
/// custody subnets; only a new/changed CGC is reported as an update.
async fn test_peer_manager_update_custody_subnets() {
    // PeerDAS is enabled from Fulu.
    let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
    let mut peer_manager = build_peer_manager_with_opts(vec![], 1, spec).await;

    let public_key = Keypair::generate_secp256k1().public();
    let peer_id = PeerId::from_public_key(&public_key);
    peer_manager.inject_connect_ingoing(
        &peer_id,
        Multiaddr::empty().with_p2p(peer_id).unwrap(),
        None,
    );

    // Before any metadata arrives, a freshly connected peer has zero custody subnets.
    assert_eq!(peer_manager.custody_subnet_count_for_peer(&peer_id), Some(0));

    // Receiving V3 metadata with a custody group count should set the subnets
    // and report that the CGC changed.
    let expected_cgc = 4;
    let meta_data = MetaData::V3(MetaDataV3 {
        seq_number: 0,
        attnets: Default::default(),
        syncnets: Default::default(),
        custody_group_count: expected_cgc,
    });
    let was_updated = peer_manager.meta_data_response(&peer_id, meta_data.clone());
    assert!(was_updated);
    assert_eq!(
        peer_manager.custody_subnet_count_for_peer(&peer_id),
        Some(expected_cgc as usize)
    );

    // Replaying identical metadata must not report a CGC change, and the
    // subnet count stays the same.
    let was_updated = peer_manager.meta_data_response(&peer_id, meta_data);
    assert!(!was_updated);
    assert_eq!(
        peer_manager.custody_subnet_count_for_peer(&peer_id),
        Some(expected_cgc as usize)
    );
}

#[tokio::test]
/// Test the pruning logic to remove grouped subnet peers
async fn test_peer_manager_prune_grouped_subnet_peers() {
Expand Down
8 changes: 7 additions & 1 deletion beacon_node/lighthouse_network/src/service/api_types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::rpc::methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage};
use crate::rpc::{
methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage},
MetaData,
};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use types::{
Expand Down Expand Up @@ -134,6 +137,8 @@ pub enum AppRequestId {
// `RPCCodedResponse`.
#[derive(Debug, Clone, PartialEq)]
pub enum Response<E: EthSpec> {
/// A Metadata message.
MetaData(Arc<MetaData<E>>, /* updated_cgc */ bool),
/// A Status message.
Status(StatusMessage),
/// A response to a get BLOCKS_BY_RANGE request. A None response signals the end of the batch.
Expand Down Expand Up @@ -185,6 +190,7 @@ impl<E: EthSpec> std::convert::From<Response<E>> for RpcResponse<E> {
Some(d) => RpcResponse::Success(RpcSuccessResponse::DataColumnsByRange(d)),
None => RpcResponse::StreamTermination(ResponseTermination::DataColumnsByRange),
},
Response::MetaData(m, _) => RpcResponse::Success(RpcSuccessResponse::MetaData(m)),
Response::Status(s) => RpcResponse::Success(RpcSuccessResponse::Status(s)),
Response::LightClientBootstrap(b) => {
RpcResponse::Success(RpcSuccessResponse::LightClientBootstrap(b))
Expand Down
14 changes: 11 additions & 3 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,7 @@ impl<E: EthSpec> Network<E> {
return None;
}

// The METADATA and PING RPC responses are handled within the behaviour and not propagated
// The PING RPC responses are handled within the behaviour and not propagated
match event.message {
Err(handler_err) => {
match handler_err {
Expand Down Expand Up @@ -1858,9 +1858,17 @@ impl<E: EthSpec> Network<E> {
None
}
RpcSuccessResponse::MetaData(meta_data) => {
self.peer_manager_mut()
let updated_cgc = self
.peer_manager_mut()
.meta_data_response(&peer_id, meta_data.as_ref().clone());
None
// Send event after calling into peer_manager so the PeerDB is updated.
// Manually build the response here, as we want to propagate the METADATA
// message upwards, even though it's initiated from network.
Some(NetworkEvent::ResponseReceived {
peer_id,
app_request_id: id,
response: Response::MetaData(meta_data, updated_cgc),
})
}
/* Network propagated protocols */
RpcSuccessResponse::Status(msg) => {
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ impl<T: BeaconChainTypes> Router<T> {
response: Response<T::EthSpec>,
) {
match response {
Response::MetaData(_meta_data, updated_cgc) => {
self.on_meta_data_response(peer_id, updated_cgc)
}
Response::Status(status_message) => {
debug!(%peer_id, ?status_message,"Received Status Response");
self.handle_beacon_processor_send_result(
Expand Down Expand Up @@ -553,6 +556,12 @@ impl<T: BeaconChainTypes> Router<T> {
)
}

pub fn on_meta_data_response(&mut self, peer_id: PeerId, updated_cgc: bool) {
if updated_cgc {
self.send_to_sync(SyncMessage::UpdatedPeerCgc(peer_id));
}
}

/// Handle a `BlocksByRange` response from the peer.
/// A `beacon_block` behaves as a stream which is terminated on a `None` response.
pub fn on_blocks_by_range_response(
Expand Down
20 changes: 20 additions & 0 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ pub enum SyncMessage<E: EthSpec> {
head_slot: Option<Slot>,
},

/// Peer manager has received a MetaData of a peer with a new or updated CGC value.
UpdatedPeerCgc(PeerId),

/// A block has been received from the RPC.
RpcBlock {
sync_request_id: SyncRequestId,
Expand Down Expand Up @@ -476,6 +479,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}

fn updated_peer_cgc(&mut self, _peer_id: PeerId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to resume by range request as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — I resume range sync as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I'll run some tests today to confirm this fixes the issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't look like this fixes the issue. I still don't see range requests until a few minutes later until we add a new finalized chain.

Right after startup, waiting for custody peers

Feb 12 04:33:07.181 DEBG Waiting for peers to be available on sampling column subnets, chain: 1, service: range_sync, service: sync, module: network::sync::range_sync::chain:1057

Got peer metadata response after 15s

Feb 12 04:33:22.271 DEBG Obtained peer's metadata, new_seq_no: 6, peer_id: 16Uiu2HAmJyaVGkRGR9ACqompkoge8T2x4KFH4KbzDkh7zz6uN2JX, service: libp2p, module: lighthouse_network::peer_manager:732

No range requests until ~5 mins later

Feb 12 04:38:07.303 DEBG Finalization sync peer joined, peer_id: 16Uiu2HAmJyaVGkRGR9ACqompkoge8T2x4KFH4KbzDkh7zz6uN2JX, service: range_sync, service: sync, module: network::sync::range_sync::range:143
Feb 12 04:38:07.305 DEBG New chain added to sync, id: 2, from: 38, to: 1071, end_root: 0x3be00d7ce6e52f7938fd588d909055f72469d0f09ce545d7b23077f2d6b40e8a, current_target: 38, batches: 0, peers: 1, state: Stopped, sync_type: Finalized, peer_id: 16Uiu2HAmJyaVGkRGR9ACqompk
oge8T2x4KFH4KbzDkh7zz6uN2JX, service: range_sync, service: sync, module: network::sync::range_sync::chain_collection:506
Feb 12 04:38:07.306 DEBG Sync RPC request sent, id: 4/3/RangeSync/39/1, peer: 16Uiu2HAmAAZ5wP6fvpe1b9tWNmgA2Wn8MsrNYVhkw5WohcAoaHKR, epoch: 39, slots: 32, method: BlocksByRange, service: sync, module: network::sync::network_context:788
Feb 12 04:38:07.307 DEBG Sync RPC request sent, id: 5/3/RangeSync/39/1, peer: 16Uiu2HAm9PijSZpm5QUphXRoBtkhUZPkGJ4Rgxk4Bny91oZPYZLG, columns: [74, 30, 39, 19, 63, 41, 52, 47, 58], epoch: 39, slots: 32, method: DataColumnsByRange, service: sync, module: network::sync::network_context:870

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you see this log? `Received updated peer CGC message`

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't recall seeing this. I think I was using the right locally-built image, but it may be worth re-testing to confirm, if you have time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dapplion could you retest this change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've retested this, and it doesn't seem to trigger a retry after obtaining the peer's metadata; the `Received updated peer CGC message` log was not observed.

May 06 07:16:46.854 DEBUG Waiting for peers to be available on custody column subnets chain: 1service: "range_sync"
...
# obtained peers metadata
May 06 07:17:01.389 DEBUG Obtained peer's metadata                      peer_id: 16Uiu2HAkz7SLRbDFNscs6RFDrD37oB5yCR9UaBcVpAiy1G54BuQW, new_seq_no: 6, service: "network"
May 06 07:17:01.393 DEBUG Obtained peer's metadata                      peer_id: 16Uiu2HAmKjMmG6VJG2oi5Gd7x4iszsv4Fm9JZWeEad62kTVF9dke, new_seq_no: 132, service: "network"
...
# no range request until ~5 mins later when a new chain is added
May 06 07:21:46.397 DEBUG New chain added to sync                       peer_id: "16Uiu2HAkz7SLRbDFNscs6RFDrD37oB5yCR9UaBcVpAiy1G54BuQW", sync_type: Finalized, id: 5, start_epoch: 0, target_head_slot: 193, target_head_root: 0xc1802494a0935c8bee449a412f982cc1903ec791565d
9cf9eaab233f2cd2bc90, component: "range_sync"
May 06 07:21:46.397 DEBUG Sync RPC request sent                         method: "DataColumnsByRange", slots: 32, epoch: 1, columns: [8, 69, 99, 90, 9, 84, 24, 71, 12, 121, 50, 68, 127, 66, 36, 26, 41, 107, 47, 52, 108, 98, 70, 100, 54, 35, 21, 60, 49, 120, 10, 72, 44, 93, 67, 0, 122, 19, 43, 62, 97, 115, 59, 95, 48, 11, 101, 116, 34, 20, 25, 18, 3, 124, 110, 57, 46, 83, 77, 105, 81, 106, 1, 45, 104, 22, 29, 53, 6, 32, 13, 119, 4, 78, 63, 86, 92, 7, 114, 73, 16, 17, 109, 28, 75, 102, 80, 40, 79, 51, 125, 38, 85, 89, 42, 39, 126, 113, 87, 96, 37, 88, 64, 112, 14, 118, 76, 117, 58, 82, 27, 94, 74, 23, 30, 111, 15, 2, 123, 33, 55, 5, 65, 91, 31, 56], peer: 16Uiu2HAmKjMmG6VJG2oi5Gd7x4iszsv4Fm9JZWeEad62kTVF9dke, id: 8/6/RangeSync/1/1, chain: 1, service: "range_sync"
May 06 07:21:46.397 DEBUG Sync RPC request sent                         method: "DataColumnsByRange", slots: 32, epoch: 1, columns: [61, 103], peer: 16Uiu2HAkz7SLRbDFNscs6RFDrD37oB5yCR9UaBcVpAiy1G54BuQW, id: 9/6/RangeSync/1/1, chain: 1, service: "range_sync"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Try to make progress on custody requests that are waiting for peers
for (id, result) in self.network.continue_custody_by_root_requests() {
self.on_custody_by_root_result(id, result);
}

// Attempt to resume range sync too
self.range_sync.resume(&mut self.network);
}

/// Handles RPC errors related to requests that were emitted from the sync manager.
fn inject_error(&mut self, peer_id: PeerId, sync_request_id: SyncRequestId, error: RPCError) {
trace!("Sync manager received a failed RPC");
Expand Down Expand Up @@ -750,6 +763,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => {
self.add_peers_force_range_sync(&peers, head_root, head_slot);
}
SyncMessage::UpdatedPeerCgc(peer_id) => {
debug!(
peer_id = ?peer_id,
"Received updated peer CGC message"
);
self.updated_peer_cgc(peer_id);
}
SyncMessage::RpcBlock {
sync_request_id,
peer_id,
Expand Down
Loading