diff --git a/Cargo.lock b/Cargo.lock index 0fa4f900a..644090c9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1939,37 +1939,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "discv5" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e6b70634e26c909d1edbb3142b3eaf3b89da0e52f284f00ca7c80d9901ad9e" -dependencies = [ - "aes 0.8.4", - "aes-gcm", - "alloy-rlp", - "arrayvec", - "ctr 0.9.2", - "delay_map 0.4.0", - "enr 0.12.1", - "fnv", - "futures", - "hashlink 0.9.1", - "hex", - "hkdf", - "lazy_static", - "lru", - "more-asserts", - "parking_lot 0.12.3", - "rand", - "smallvec", - "socket2 0.5.8", - "tokio", - "tracing", - "uint 0.10.0", - "zeroize", -] - [[package]] name = "discv5" version = "0.9.0" @@ -5295,7 +5264,7 @@ version = "0.1.0" dependencies = [ "async-channel", "dirs 5.0.1", - "discv5 0.8.0", + "discv5 0.9.0", "futures", "libp2p", "lighthouse_network 0.2.0 (git+https://github.com/sigp/lighthouse?branch=unstable)", diff --git a/Cargo.toml b/Cargo.toml index aa1e61d69..6faa7e66e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ derive_more = { version = "1.0.0", features = ["full"] } async-channel = "1.9" axum = "0.7.7" clap = { version = "4.5.15", features = ["derive", "wrap_help"] } -discv5 = "0.8.0" +discv5 = "0.9.0" dirs = "5.0.1" either = "1.13.0" futures = "0.3.30" diff --git a/anchor/client/src/cli.rs b/anchor/client/src/cli.rs index 053c86fd0..944f438ae 100644 --- a/anchor/client/src/cli.rs +++ b/anchor/client/src/cli.rs @@ -329,6 +329,14 @@ pub struct Anchor { help_heading = FLAG_HEADER )] help: Option, + #[clap( + long, + global = true, + value_delimiter = ',', + help = "One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network", + display_order = 0 + )] + pub boot_nodes_enr: Vec, } pub fn get_color_style() -> Styles { diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index 9cb8e1ead..8f8a12a17 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -131,6 +131,25 @@ pub fn from_cli(cli_args: &Anchor) -> Result { */ config.network.listen_addresses = parse_listening_addresses(cli_args)?; + for addr in cli_args.boot_nodes_enr.clone() { + match addr.parse() { + Ok(enr) => config.network.boot_nodes_enr.push(enr), + Err(_) => { + // parsing as ENR failed, try as Multiaddr + // let multi: Multiaddr = addr + // .parse() + // .map_err(|_| format!("Not valid as ENR nor Multiaddr: {}", addr))?; + // if !multi.iter().any(|proto| matches!(proto, Protocol::Udp(_))) { + // slog::error!(log, "Missing UDP in Multiaddr {}", multi.to_string()); + // } + // if !multi.iter().any(|proto| matches!(proto, Protocol::P2p(_))) { + // slog::error!(log, "Missing P2P in Multiaddr {}", multi.to_string()); + // } + // multiaddrs.push(multi); + } + } + } + config.beacon_nodes_tls_certs = cli_args.beacon_nodes_tls_certs.clone(); config.execution_nodes_tls_certs = cli_args.execution_nodes_tls_certs.clone(); diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index f52459a5d..baa2dd177 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -1,3 +1,4 @@ +use crate::discovery::Discovery; use libp2p::swarm::NetworkBehaviour; use libp2p::{gossipsub, identify, ping}; @@ -9,4 +10,6 @@ pub struct AnchorBehaviour { pub ping: ping::Behaviour, /// The routing pub-sub mechanism for Anchor. pub gossipsub: gossipsub::Behaviour, + /// Discv5 Discovery protocol. + pub discovery: Discovery, } diff --git a/anchor/network/src/config.rs b/anchor/network/src/config.rs index 921c6630a..3587da760 100644 --- a/anchor/network/src/config.rs +++ b/anchor/network/src/config.rs @@ -55,6 +55,9 @@ pub struct Config { /// Disables peer scoring altogether. pub disable_peer_scoring: bool, + /// Disables the discovery protocol from starting. + pub disable_discovery: bool, + /// Disables quic support. pub disable_quic_support: bool, @@ -94,6 +97,7 @@ impl Default for Config { boot_nodes_enr: vec![], boot_nodes_multiaddr: vec![], disable_peer_scoring: false, + disable_discovery: false, disable_quic_support: false, topics: vec![], } diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs new file mode 100644 index 000000000..74640fb23 --- /dev/null +++ b/anchor/network/src/discovery.rs @@ -0,0 +1,468 @@ +use std::collections::HashMap; +use std::future::Future; +use std::net::Ipv4Addr; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +use discv5::enr::{CombinedKey, NodeId}; +use discv5::libp2p_identity::{Keypair, PeerId}; +use discv5::multiaddr::Multiaddr; +use discv5::{Discv5, Enr}; +use futures::stream::FuturesUnordered; +use futures::FutureExt; +use futures::{StreamExt, TryFutureExt}; +use libp2p::core::transport::PortUse; +use libp2p::core::Endpoint; +use libp2p::swarm::dummy::ConnectionHandler; +use libp2p::swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, +}; +use lighthouse_network::discovery::enr_ext::{QUIC6_ENR_KEY, QUIC_ENR_KEY}; +use lighthouse_network::discovery::DiscoveredPeers; +use lighthouse_network::{CombinedKeyExt, Subnet}; +use tokio::sync::mpsc; +use tracing::{debug, error, warn}; + +use lighthouse_network::EnrExt; + +use crate::Config; + +/// The number of closest peers to search for when doing a regular peer search. +/// +/// We could reduce this constant to speed up queries however at the cost of security. It will +/// make it easier to peers to eclipse this node. Kademlia suggests a value of 16. +pub const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16; + +#[derive(Debug, Clone, PartialEq)] +struct SubnetQuery { + subnet: Subnet, + min_ttl: Option, + retries: usize, +} + +#[derive(Debug, Clone, PartialEq)] +enum QueryType { + /// We are searching for subnet peers. + Subnet(Vec), + /// We are searching for more peers without ENR or time constraints. + FindPeers, +} + +/// The result of a query. +struct QueryResult { + query_type: QueryType, + result: Result, discv5::QueryError>, +} + +// Awaiting the event stream future +enum EventStream { + /// Awaiting an event stream to be generated. This is required due to the poll nature of + /// `Discovery` + Awaiting( + Pin, discv5::Error>> + Send>>, + ), + /// The future has completed. + Present(mpsc::Receiver), + // The future has failed or discv5 has been disabled. There are no events from discv5. + InActive, +} + +pub struct Discovery { + /// The handle for the underlying discv5 Server. + /// + /// This is behind a Reference counter to allow for futures to be spawned and polled with a + /// static lifetime. + discv5: Discv5, + + /// Indicates if we are actively searching for peers. We only allow a single FindPeers query at + /// a time, regardless of the query concurrency. + find_peer_active: bool, + + /// Active discovery queries. + active_queries: FuturesUnordered + Send>>>, + + /// The discv5 event stream. + event_stream: EventStream, + + /// Indicates if the discovery service has been started. When the service is disabled, this is + /// always false. + pub started: bool, +} + +impl Discovery { + pub async fn new(local_keypair: Keypair, network_config: &Config) -> Result { + let _enr_dir = match network_config.network_dir.to_str() { + Some(path) => String::from(path), + None => String::from(""), + }; + + // TODO handle local enr + + let discv5_listen_config = + discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); + + // discv5 configuration + let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config).build(); + + // convert the keypair into an ENR key + let enr_key: CombinedKey = CombinedKey::from_libp2p(local_keypair)?; + + let enr = build_enr(&enr_key, network_config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, discv5_config) + .map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?; + + // Add bootnodes to routing table + for bootnode_enr in network_config.boot_nodes_enr.clone() { + // TODO if bootnode_enr.node_id() == local_node_id { + // // If we are a boot node, ignore adding it to the routing table + // continue; + // } + debug!( + node_id = %bootnode_enr.node_id(), + peer_id = %bootnode_enr.peer_id(), + ip = ?bootnode_enr.ip4(), + udp = ?bootnode_enr.udp4(), + tcp = ?bootnode_enr.tcp4(), + quic = ?bootnode_enr.quic4(), + "Adding node to routing table", + ); + + let repr = bootnode_enr.to_string(); + if let Err(e) = discv5.add_enr(bootnode_enr) { + error!( + addr = repr, + error = e.to_string(), + "Could not add peer to the local routing table" + ) + }; + } + + // Start the discv5 service and obtain an event stream + let event_stream = if !network_config.disable_discovery { + discv5.start().map_err(|e| e.to_string()).await?; + debug!("Discovery service started"); + EventStream::Awaiting(Box::pin(discv5.event_stream())) + } else { + EventStream::InActive + }; + + if !network_config.boot_nodes_multiaddr.is_empty() { + // TODO info!(log, "Contacting Multiaddr boot-nodes for their ENR"); + } + + // get futures for requesting the Enrs associated to these multiaddr and wait for their + // completion + let mut fut_coll = network_config + .boot_nodes_multiaddr + .iter() + .map(|addr| addr.to_string()) + // request the ENR for this multiaddr and keep the original for logging + .map(|addr| { + futures::future::join( + discv5.request_enr(addr.clone()), + futures::future::ready(addr), + ) + }) + .collect::>(); + + while let Some((result, original_addr)) = fut_coll.next().await { + match result { + Ok(enr) => { + debug!( + node_id = %enr.node_id(), + peer_id = %enr.peer_id(), + ip = ?enr.ip4(), + udp = ?enr.udp4(), + tcp = ?enr.tcp4(), + quic = ?enr.quic4(), + "Adding node to routing table" + ); + let _ = discv5.add_enr(enr).map_err(|e| { + error!( + addr = original_addr.to_string(), + error = e.to_string(), + "Could not add peer to the local routing table" + ) + }); + } + Err(e) => { + error!( + multiaddr = original_addr.to_string(), + error = e.to_string(), + "Error getting mapping to ENR" + ) + } + } + } + + // TODO let update_ports = UpdatePorts { + // tcp4: config.enr_tcp4_port.is_none(), + // tcp6: config.enr_tcp6_port.is_none(), + // quic4: config.enr_quic4_port.is_none(), + // quic6: config.enr_quic6_port.is_none(), + // }; + + Ok(Self { + // cached_enrs: LruCache::new(ENR_CACHE_CAPACITY), + // network_globals, + find_peer_active: false, + // queued_queries: VecDeque::with_capacity(10), + active_queries: FuturesUnordered::new(), + discv5, + event_stream, + started: !network_config.disable_discovery, + // update_ports, + // log, + // enr_dir, + // spec: Arc::new(spec.clone()), + }) + } + + /// This adds a new `FindPeers` query to the queue if one doesn't already exist. + /// The `target_peers` parameter informs discovery to end the query once the target is found. + /// The maximum this can be is 16. + pub fn discover_peers(&mut self, target_peers: usize) { + // If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one. + if !self.started || self.find_peer_active { + return; + } + // Immediately start a FindNode query + let target_peers = std::cmp::min(FIND_NODE_QUERY_CLOSEST_PEERS, target_peers); + // TODO debug!(self.log, "Starting a peer discovery request"; "target_peers" => target_peers ); + self.find_peer_active = true; + self.start_query(QueryType::FindPeers, target_peers, |_| true); + } + + /// Search for a specified number of new peers using the underlying discovery mechanism. + /// + /// This can optionally search for peers for a given predicate. Regardless of the predicate + /// given, this will only search for peers on the same enr_fork_id as specified in the local + /// ENR. + fn start_query( + &mut self, + query: QueryType, + target_peers: usize, + _additional_predicate: impl Fn(&Enr) -> bool + Send + 'static, + ) { + // let enr_fork_id = match self.local_enr().eth2() { + // Ok(v) => v, + // Err(e) => { + // // TODO crit!(self.log, "Local ENR has no fork id"; "error" => e); + // return; + // } + // }; + + // predicate for finding ssv nodes with a valid tcp port + let ssv_node_predicate = move |enr: &Enr| { + if let Some(Ok(is_ssv)) = enr.get_decodable("ssv") { + is_ssv && enr.tcp4().is_some() || enr.tcp6().is_some() + } else { + false + } + }; + + // General predicate + let predicate: Box bool + Send> = + //Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); + Box::new(move |enr: &Enr| ssv_node_predicate(enr)); + + // Build the future + let query_future = self + .discv5 + // Generate a random target node id. + .find_node_predicate(NodeId::random(), predicate, target_peers) + .map(|v| QueryResult { + query_type: query, + result: v, + }); + + // Add the future to active queries, to be executed. + self.active_queries.push(Box::pin(query_future)); + } + + /// Process the completed QueryResult returned from discv5. + fn process_completed_queries( + &mut self, + query: QueryResult, + ) -> Option>> { + match query.query_type { + QueryType::FindPeers => { + self.find_peer_active = false; + match query.result { + Ok(r) if r.is_empty() => { + debug!("Discovery query yielded no results."); + } + Ok(r) => { + debug!(peers_found = r.len(), "Discovery query completed"); + let results = r + .into_iter() + .map(|enr| { + // cache the found ENR's + //self.cached_enrs.put(enr.peer_id(), enr.clone()); + (enr, None) + }) + .collect(); + return Some(results); + } + Err(e) => { + warn!(error = %e, "Discovery query failed"); + } + } + } + _ => { + // TODO handle subnet queries + } + } + None + } + + /// Drives the queries returning any results from completed queries. + fn poll_queries(&mut self, cx: &mut Context) -> Option>> { + while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) { + let result = self.process_completed_queries(query_result); + if result.is_some() { + return result; + } + } + None + } +} + +impl NetworkBehaviour for Discovery { + // Discovery is not a real NetworkBehaviour... + type ConnectionHandler = ConnectionHandler; + type ToSwarm = DiscoveredPeers; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(ConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &Multiaddr, + _role_override: Endpoint, + _port_use: PortUse, + ) -> Result, ConnectionDenied> { + Ok(ConnectionHandler) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(c) => { + debug!("Connection established: {:?}", c); + } + _ => { + // TODO handle other events + } + } + } + + fn on_connection_handler_event( + &mut self, + _peer_id: PeerId, + _connection_id: ConnectionId, + _event: THandlerOutEvent, + ) { + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + if !self.started { + return Poll::Pending; + } + + // Process the query queue + //self.process_queue(); + + // Drive the queries and return any results from completed queries + if let Some(peers) = self.poll_queries(cx) { + // return the result to the peer manager + return Poll::Ready(ToSwarm::GenerateEvent(DiscoveredPeers { peers })); + } + Poll::Pending + } +} + +/// Builds a anchor ENR given a `network::Config`. +pub fn build_enr(enr_key: &CombinedKey, config: &Config) -> Result { + let mut builder = discv5::enr::Enr::builder(); + let (maybe_ipv4_address, maybe_ipv6_address) = &config.enr_address; + + if let Some(ip) = maybe_ipv4_address { + builder.ip4(*ip); + } + + if let Some(ip) = maybe_ipv6_address { + builder.ip6(*ip); + } + + if let Some(udp4_port) = config.enr_udp4_port { + builder.udp4(udp4_port.get()); + } + + if let Some(udp6_port) = config.enr_udp6_port { + builder.udp6(udp6_port.get()); + } + + // Add QUIC fields to the ENR. + // Since QUIC is used as an alternative transport for the libp2p protocols, + // the related fields should only be added when both QUIC and libp2p are enabled + if !config.disable_quic_support { + // If we are listening on ipv4, add the quic ipv4 port. + if let Some(quic4_port) = config.enr_quic4_port.or_else(|| { + config + .listen_addresses + .v4() + .and_then(|v4_addr| v4_addr.quic_port.try_into().ok()) + }) { + builder.add_value(QUIC_ENR_KEY, &quic4_port.get()); + } + + // If we are listening on ipv6, add the quic ipv6 port. + if let Some(quic6_port) = config.enr_quic6_port.or_else(|| { + config + .listen_addresses + .v6() + .and_then(|v6_addr| v6_addr.quic_port.try_into().ok()) + }) { + builder.add_value(QUIC6_ENR_KEY, &quic6_port.get()); + } + } + + // If the ENR port is not set, and we are listening over that ip version, use the listening port instead. + let tcp4_port = config.enr_tcp4_port.or_else(|| { + config + .listen_addresses + .v4() + .and_then(|v4_addr| v4_addr.tcp_port.try_into().ok()) + }); + if let Some(tcp4_port) = tcp4_port { + builder.tcp4(tcp4_port.get()); + } + + let tcp6_port = config.enr_tcp6_port.or_else(|| { + config + .listen_addresses + .v6() + .and_then(|v6_addr| v6_addr.tcp_port.try_into().ok()) + }); + if let Some(tcp6_port) = tcp6_port { + builder.tcp6(tcp6_port.get()); + } + + builder + .build(enr_key) + .map_err(|e| format!("Could not build Local ENR: {:?}", e)) +} diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 161c09600..777af44e7 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -2,6 +2,7 @@ mod behaviour; mod config; +mod discovery; mod keypair_utils; mod network; mod transport; @@ -10,3 +11,5 @@ mod types; pub use config::Config; pub use lighthouse_network::{ListenAddr, ListenAddress}; pub use network::Network; + +pub type Enr = discv5::enr::Enr; diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index dc0325a96..803438f76 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -1,8 +1,7 @@ -use crate::behaviour::AnchorBehaviour; -use crate::behaviour::AnchorBehaviourEvent; -use crate::keypair_utils::load_private_key; -use crate::transport::build_transport; -use crate::Config; +use std::num::{NonZeroU8, NonZeroUsize}; +use std::pin::Pin; +use std::time::Duration; + use futures::StreamExt; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; @@ -11,13 +10,20 @@ use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; use libp2p::swarm::SwarmEvent; use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder}; +use lighthouse_network::discovery::DiscoveredPeers; use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256}; -use std::num::{NonZeroU8, NonZeroUsize}; -use std::pin::Pin; -use std::time::Duration; use task_executor::TaskExecutor; use tracing::{info, log}; +use crate::behaviour::AnchorBehaviour; +use crate::behaviour::AnchorBehaviourEvent; +use crate::discovery::{Discovery, FIND_NODE_QUERY_CLOSEST_PEERS}; +use crate::keypair_utils::load_private_key; +use crate::transport::build_transport; +use crate::Config; + +use lighthouse_network::EnrExt; + pub struct Network { swarm: Swarm, peer_id: PeerId, @@ -29,7 +35,7 @@ impl Network { pub async fn try_new(config: &Config, executor: TaskExecutor) -> Result { let local_keypair: Keypair = load_private_key(&config.network_dir); let transport = build_transport(local_keypair.clone(), !config.disable_quic_support); - let behaviour = build_anchor_behaviour(local_keypair.clone()); + let behaviour = build_anchor_behaviour(local_keypair.clone(), config).await; let peer_id = local_keypair.public().to_peer_id(); let mut network = Network { @@ -85,6 +91,22 @@ impl Network { AnchorBehaviourEvent::Gossipsub(_ge) => { // TODO handle gossipsub events }, + // Inform the peer manager about discovered peers. + // + // The peer manager will subsequently decide which peers need to be dialed and then dial + // them. + AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => { + //self.peer_manager_mut().peers_discovered(peers); + log::debug!("Discovered peers: {:?}", peers); + for (enr, _) in peers { + for tcp in enr.multiaddr_tcp() { + log::trace!("Dialing peer: {:?}", tcp); + if let Err(e) = self.swarm.dial(tcp.clone()) { + log::error!("Error dialing peer {}: {}", tcp, e); + } + } + } + } // TODO handle other behaviour events _ => { log::debug!("Unhandled behaviour event: {:?}", behaviour_event); @@ -102,7 +124,10 @@ impl Network { } } -fn build_anchor_behaviour(local_keypair: Keypair) -> AnchorBehaviour { +async fn build_anchor_behaviour( + local_keypair: Keypair, + network_config: &Config, +) -> AnchorBehaviour { // TODO setup discv5 let identify = { let local_public_key = local_keypair.public(); @@ -140,12 +165,24 @@ fn build_anchor_behaviour(local_keypair: Keypair) -> AnchorBehaviour { .unwrap(); let gossipsub = - gossipsub::Behaviour::new(MessageAuthenticity::Signed(local_keypair), config).unwrap(); + gossipsub::Behaviour::new(MessageAuthenticity::Signed(local_keypair.clone()), config) + .unwrap(); + + let discovery = { + // Build and start the discovery sub-behaviour + let mut discovery = Discovery::new(local_keypair.clone(), network_config) + .await + .unwrap(); + // start searching for peers + discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS); + discovery + }; AnchorBehaviour { identify, ping: ping::Behaviour::default(), gossipsub, + discovery, } } @@ -204,9 +241,10 @@ fn build_swarm( #[cfg(test)] mod test { + use task_executor::TaskExecutor; + use crate::network::Network; use crate::Config; - use task_executor::TaskExecutor; #[tokio::test] async fn create_network() {