Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod cli;
pub mod config;

use anchor_validator_store::sync_committee_service::SyncCommitteeService;
use anchor_validator_store::metadata_service::MetadataService;
use anchor_validator_store::AnchorValidatorStore;
use beacon_node_fallback::{
start_fallback_updater_service, ApiTopic, BeaconNodeFallback, CandidateBeaconNode,
Expand Down Expand Up @@ -49,6 +49,7 @@ use validator_services::block_service::BlockServiceBuilder;
use validator_services::duties_service;
use validator_services::duties_service::DutiesServiceBuilder;
use validator_services::preparation_service::PreparationServiceBuilder;
use validator_services::sync_committee_service::SyncCommitteeService;
use zeroize::Zeroizing;

/// The filename within the `validators` directory that contains the slashing protection DB.
Expand Down Expand Up @@ -492,6 +493,15 @@ impl Client {
executor.clone(),
);

let metadata_service = MetadataService::new(
duties_service.clone(),
validator_store.clone(),
slot_clock.clone(),
beacon_nodes.clone(),
executor.clone(),
spec.clone(),
);

// We use `SLOTS_PER_EPOCH` as the capacity of the block notification channel, because
// we don't expect notifications to be delayed by more than a single slot, let alone a
// whole epoch!
Expand All @@ -512,6 +522,10 @@ impl Client {
.start_update_service(&spec)
.map_err(|e| format!("Unable to start sync committee service: {}", e))?;

metadata_service
.start_update_service()
.map_err(|e| format!("Unable to start metadata service: {}", e))?;

preparation_service
.start_update_service(&spec)
.map_err(|e| format!("Unable to start preparation service: {}", e))?;
Expand Down
36 changes: 31 additions & 5 deletions anchor/signature_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::{mpsc, oneshot};
use tokio::time::sleep;
use tracing::error;
use tracing::{debug, error, trace, warn};
use types::{Hash256, PublicKeyBytes, SecretKey, Signature, Slot};

const COLLECTOR_NAME: &str = "signature_collector";
Expand Down Expand Up @@ -125,9 +125,11 @@ impl SignatureCollectorManager {
let manager = self.clone();
self.processor.urgent_consensus.send_blocking(
move || {
trace!(root = ?validator_signing_data.root, "Signing...");
let partial_signature = validator_signing_data
.share
.sign(validator_signing_data.root);
trace!(root = ?validator_signing_data.root, "Signed");

let message = PartialSignatureMessage {
partial_signature,
Expand Down Expand Up @@ -172,6 +174,12 @@ impl SignatureCollectorManager {
// Enter the signature we just signed for this validator.
signatures.push(message.clone());

debug!(
have = signatures.len(),
need = validators.len(),
"Checking if we have all signatures to send"
);

// If we collected the correct amount of signatures...
if signatures.len() == validators.len() {
let signatures = entry.remove().signatures;
Expand Down Expand Up @@ -254,13 +262,18 @@ impl SignatureCollectorManager {
move |drop_on_finish| {
let sender =
manager.get_or_spawn(message.signing_root, message.validator_index, slot);
let _ = sender.send(CollectorMessage {
if let Err(err) = sender.send(CollectorMessage {
kind: CollectorMessageKind::PartialSignature {
operator_id: message.signer,
signature: Box::new(message.partial_signature),
},
_drop_on_finish: drop_on_finish,
});
}) {
error!(
?err,
"failed to send partial signature to collector instance"
);
}
},
COLLECTOR_MESSAGE_NAME,
)?;
Expand Down Expand Up @@ -289,6 +302,11 @@ impl SignatureCollectorManager {
.processor
.permitless
.send_async(Box::pin(signature_collector(rx)), COLLECTOR_NAME);
debug!(
?signing_root,
?validator_index,
"Spawned signature collector"
);
tx
}
}
Expand Down Expand Up @@ -362,6 +380,7 @@ struct CollectorMessage {
_drop_on_finish: DropOnFinish,
}

#[derive(Debug)]
enum CollectorMessageKind {
/// A new task is waiting for the result of this collector instance.
RegisterNotifier {
Expand Down Expand Up @@ -416,14 +435,17 @@ async fn signature_collector(mut rx: mpsc::UnboundedReceiver<CollectorMessage>)
let mut threshold = None;

while let Some(message) = rx.recv().await {
debug!(msg=?message.kind, "Signature collector received message");
match message.kind {
CollectorMessageKind::RegisterNotifier {
notify,
threshold: new_threshold,
} => {
if let Some(full_signature) = &full_signature {
// We already got a reconstructed signature, send it immediately.
let _ = notify.send(full_signature.clone());
if let Err(err) = notify.send(full_signature.clone()) {
warn!(?err, "Failed to send recovered signature");
}
} else {
// Register the notifier and threshold.
notifiers.push(notify);
Expand Down Expand Up @@ -479,8 +501,12 @@ async fn signature_collector(mut rx: mpsc::UnboundedReceiver<CollectorMessage>)
}
};

debug!(?signature, "Successfully recovered signature");

for notifier in mem::take(&mut notifiers) {
let _ = notifier.send(Arc::clone(&signature));
if let Err(err) = notifier.send(Arc::clone(&signature)) {
warn!(?err, "Failed to send recovered signature");
}
}
full_signature = Some(signature);
}
Expand Down
Loading