11use dashmap:: DashMap ;
2+ use openssl:: hash:: MessageDigest ;
3+ use openssl:: pkey:: { PKey , Private } ;
4+ use openssl:: rsa:: Rsa ;
5+ use openssl:: sign:: Signer ;
6+
27use processor:: { DropOnFinish , Senders , WorkItem } ;
38use qbft:: {
49 Completed , ConfigBuilder , ConfigBuilderError , DefaultLeaderFunction , InstanceHeight , Message ,
510 WrappedQbftMessage ,
611} ;
712use slot_clock:: SlotClock ;
8- use ssv_types:: consensus:: { BeaconVote , QbftData , ValidatorConsensusData } ;
13+ use ssv_types:: consensus:: { BeaconVote , QbftData , UnsignedSSVMessage , ValidatorConsensusData } ;
14+ use std:: error:: Error ;
915
16+ use ssv_types:: message:: SignedSSVMessage ;
1017use ssv_types:: OperatorId as QbftOperatorId ;
1118use ssv_types:: { Cluster , ClusterId , OperatorId } ;
19+ use ssz:: Encode ;
1220use std:: fmt:: Debug ;
1321use std:: hash:: Hash ;
1422use std:: sync:: Arc ;
@@ -27,6 +35,7 @@ mod tests;
2735const QBFT_INSTANCE_NAME : & str = "qbft_instance" ;
2836const QBFT_MESSAGE_NAME : & str = "qbft_message" ;
2937const QBFT_CLEANER_NAME : & str = "qbft_cleaner" ;
38+ const QBFT_SIGNER_NAME : & str = "qbft_signer" ;
3039
3140/// Number of slots to keep before the current slot
3241const QBFT_RETAIN_SLOTS : u64 = 1 ;
@@ -94,9 +103,10 @@ pub struct QbftManager<T: SlotClock + 'static> {
94103 validator_consensus_data_instances : Map < ValidatorInstanceId , ValidatorConsensusData > ,
95104 // All of the QBFT instances that are voting on beacon data
96105 beacon_vote_instances : Map < CommitteeInstanceId , BeaconVote > ,
97- // Takes messages from qbft instances and sends them to be signed
98- // TODO!(). This will be the network channel for passing signatures from processor -> network
99- qbft_out : mpsc:: Sender < Message > ,
106+ // Private key used for signing messages
107+ pkey : Arc < PKey < Private > > ,
108+ // Channel to pass signed messages along to the network
109+ network_tx : mpsc:: UnboundedSender < SignedSSVMessage > ,
100110}
101111
102112impl < T : SlotClock > QbftManager < T > {
@@ -105,15 +115,19 @@ impl<T: SlotClock> QbftManager<T> {
105115 processor : Senders ,
106116 operator_id : OperatorId ,
107117 slot_clock : T ,
108- qbft_out : mpsc:: Sender < Message > ,
118+ key : Rsa < Private > ,
119+ network_tx : mpsc:: UnboundedSender < SignedSSVMessage > ,
109120 ) -> Result < Arc < Self > , QbftError > {
121+ let pkey = Arc :: new ( PKey :: from_rsa ( key) . expect ( "Failed to create PKey from RSA" ) ) ;
122+
110123 let manager = Arc :: new ( QbftManager {
111124 processor,
112125 operator_id,
113126 slot_clock,
114127 validator_consensus_data_instances : DashMap :: new ( ) ,
115128 beacon_vote_instances : DashMap :: new ( ) ,
116- qbft_out,
129+ pkey,
130+ network_tx,
117131 } ) ;
118132
119133 // Start a long running task that will clean up old instances
@@ -147,7 +161,7 @@ impl<T: SlotClock> QbftManager<T> {
147161
148162 // Get or spawn a new qbft instance. This will return the sender that we can use to send
149163 // new messages to the specific instance
150- let sender = D :: get_or_spawn_instance ( self , id, self . qbft_out . clone ( ) ) ;
164+ let sender = D :: get_or_spawn_instance ( self , id) ;
151165 self . processor . urgent_consensus . send_immediate (
152166 move |drop_on_finish : DropOnFinish | {
153167 // A message to initialize this instance
@@ -173,7 +187,7 @@ impl<T: SlotClock> QbftManager<T> {
173187 id : D :: Id ,
174188 data : WrappedQbftMessage ,
175189 ) -> Result < ( ) , QbftError > {
176- let sender = D :: get_or_spawn_instance ( self , id, self . qbft_out . clone ( ) ) ;
190+ let sender = D :: get_or_spawn_instance ( self , id) ;
177191 self . processor . urgent_consensus . send_immediate (
178192 move |drop_on_finish : DropOnFinish | {
179193 let _ = sender. send ( QbftMessage {
@@ -214,7 +228,6 @@ pub trait QbftDecidable<T: SlotClock + 'static>: QbftData<Hash = Hash256> + Send
214228 fn get_or_spawn_instance (
215229 manager : & QbftManager < T > ,
216230 id : Self :: Id ,
217- qbft_out : mpsc:: Sender < Message > ,
218231 ) -> UnboundedSender < QbftMessage < Self > > {
219232 let map = Self :: get_map ( manager) ;
220233 let ret = match map. entry ( id) {
@@ -224,10 +237,15 @@ pub trait QbftDecidable<T: SlotClock + 'static>: QbftData<Hash = Hash256> + Send
224237 // with the reeiver
225238 let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
226239 let tx = entry. insert ( tx) ;
227- let _ = manager
228- . processor
229- . permitless
230- . send_async ( Box :: pin ( qbft_instance ( rx, qbft_out) ) , QBFT_INSTANCE_NAME ) ;
240+ let _ = manager. processor . permitless . send_async (
241+ Box :: pin ( qbft_instance (
242+ rx,
243+ manager. network_tx . clone ( ) ,
244+ manager. pkey . clone ( ) ,
245+ manager. processor . clone ( ) ,
246+ ) ) ,
247+ QBFT_INSTANCE_NAME ,
248+ ) ;
231249 tx. clone ( )
232250 }
233251 } ;
@@ -282,7 +300,9 @@ enum QbftInstance<D: QbftData<Hash = Hash256>, S: FnMut(Message)> {
282300
283301async fn qbft_instance < D : QbftData < Hash = Hash256 > > (
284302 mut rx : UnboundedReceiver < QbftMessage < D > > ,
285- tx : mpsc:: Sender < Message > ,
303+ network_tx : mpsc:: UnboundedSender < SignedSSVMessage > ,
304+ pkey : Arc < PKey < Private > > ,
305+ processor : Senders ,
286306) {
287307 // Signal a new instance that is uninitialized
288308 let mut instance = QbftInstance :: Uninitialized {
@@ -325,18 +345,24 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
325345 QbftInstance :: Uninitialized { message_buffer } => {
326346 // Create a new instance and receive any buffered messages
327347 let mut instance = Box :: new ( Qbft :: new ( config, initial, |message| {
328- match tx. try_send ( message) {
329- Ok ( ( ) ) => ( ) ,
330- Err ( TrySendError :: Full ( msg) ) => {
331- // Queue is full - drop message under constrained bandwidth
332- warn ! ( ?msg, "Dropping QBFT message due to full queue" ) ;
333- }
334- Err ( TrySendError :: Closed ( _) ) => {
335- // Channel closed - critical failure or shutdown
336- error ! ( "QBFT message channel closed - initiating shutdown" ) ;
337- // todo!() need some sort of shutdown
338- }
339- }
348+ let ( id, unsigned) = message. desugar ( ) ;
349+ let serialized = unsigned. as_ssz_bytes ( ) ;
350+ let pkey = pkey. clone ( ) ;
351+ let network_tx = network_tx. clone ( ) ;
352+
353+ processor
354+ . urgent_consensus
355+ . send_blocking (
356+ move || {
357+ if let Err ( e) = sign_and_send_message (
358+ pkey, id, unsigned, serialized, network_tx,
359+ ) {
360+ error ! ( "Signing failed: {}" , e) ;
361+ }
362+ } ,
363+ QBFT_SIGNER_NAME ,
364+ )
365+ . unwrap_or_else ( |e| warn ! ( "Failed to send to processor: {}" , e) ) ;
340366 } ) ) ;
341367 for message in message_buffer {
342368 instance. receive ( message) ;
@@ -417,6 +443,33 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
417443 }
418444}
419445
446+ // Sign a message and send it to the network via the network_tx
447+ fn sign_and_send_message (
448+ pkey : Arc < PKey < Private > > ,
449+ id : OperatorId ,
450+ unsigned : UnsignedSSVMessage ,
451+ serialized : Vec < u8 > ,
452+ network_tx : UnboundedSender < SignedSSVMessage > ,
453+ ) -> Result < ( ) , Box < dyn Error > > {
454+ // Create the signature
455+ let mut signer = Signer :: new ( MessageDigest :: sha256 ( ) , & pkey) ?;
456+ signer. update ( & serialized) ?;
457+ let sig = signer. sign_to_vec ( ) ?;
458+
459+ // Build the signed ssv message, then serialize it and send to the network
460+ let signed = SignedSSVMessage :: new (
461+ vec ! [ sig] ,
462+ vec ! [ * id] ,
463+ unsigned. ssv_message ,
464+ unsigned. full_data ,
465+ ) ?;
466+ network_tx
467+ . send ( signed)
468+ . map_err ( |e| format ! ( "Failed to send signed ssv message to network: {}" , e) ) ?;
469+
470+ Ok ( ( ) )
471+ }
472+
420473#[ derive( Debug , Clone ) ]
421474pub enum QbftError {
422475 QueueClosedError ,
0 commit comments