Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion node/bft/events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub use transmission_request::TransmissionRequest;
mod transmission_response;
pub use transmission_response::TransmissionResponse;

mod unconfirmed_transaction;
pub use unconfirmed_transaction::UnconfirmedTransaction;

mod validators_request;
pub use validators_request::ValidatorsRequest;

Expand Down Expand Up @@ -108,6 +111,7 @@ pub enum Event<N: Network> {
ValidatorsRequest(ValidatorsRequest),
ValidatorsResponse(ValidatorsResponse<N>),
WorkerPing(WorkerPing<N>),
UnconfirmedTransaction(UnconfirmedTransaction<N>),
}

impl<N: Network> From<DisconnectReason> for Event<N> {
Expand Down Expand Up @@ -140,6 +144,7 @@ impl<N: Network> Event<N> {
Self::ValidatorsRequest(event) => event.name(),
Self::ValidatorsResponse(event) => event.name(),
Self::WorkerPing(event) => event.name(),
Self::UnconfirmedTransaction(event) => event.name(),
}
}

Expand All @@ -163,6 +168,7 @@ impl<N: Network> Event<N> {
Self::ValidatorsRequest(..) => 13,
Self::ValidatorsResponse(..) => 14,
Self::WorkerPing(..) => 15,
Self::UnconfirmedTransaction(..) => 16,
}
}
}
Expand All @@ -188,6 +194,7 @@ impl<N: Network> ToBytes for Event<N> {
Self::ValidatorsRequest(event) => event.write_le(writer),
Self::ValidatorsResponse(event) => event.write_le(writer),
Self::WorkerPing(event) => event.write_le(writer),
Self::UnconfirmedTransaction(event) => event.write_le(writer),
}
}
}
Expand Down Expand Up @@ -215,7 +222,8 @@ impl<N: Network> FromBytes for Event<N> {
13 => Self::ValidatorsRequest(ValidatorsRequest::read_le(&mut reader)?),
14 => Self::ValidatorsResponse(ValidatorsResponse::read_le(&mut reader)?),
15 => Self::WorkerPing(WorkerPing::read_le(&mut reader)?),
16.. => return Err(error(format!("Unknown event ID {id}"))),
16 => Self::UnconfirmedTransaction(UnconfirmedTransaction::read_le(&mut reader)?),
17.. => return Err(error(format!("Unknown event ID {id}"))),
};

// Ensure that there are no "dangling" bytes.
Expand Down
100 changes: 100 additions & 0 deletions node/bft/events/src/unconfirmed_transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;

use snarkvm::prelude::Transaction;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UnconfirmedTransaction<N: Network> {
pub transaction: Data<Transaction<N>>,
}

impl<N: Network> UnconfirmedTransaction<N> {
/// Initializes a new transmission response event.
pub fn new(transaction: Data<Transaction<N>>) -> Self {
Self { transaction }
}
}

impl<N: Network> From<Data<Transaction<N>>> for UnconfirmedTransaction<N> {
/// Initializes a new transmission response event.
fn from(transaction: Data<Transaction<N>>) -> Self {
Self::new(transaction)
}
}

impl<N: Network> EventTrait for UnconfirmedTransaction<N> {
/// Returns the event name.
#[inline]
fn name(&self) -> Cow<'static, str> {
"UnconfirmedTransaction".into()
}
}

impl<N: Network> ToBytes for UnconfirmedTransaction<N> {
fn write_le<W: Write>(&self, mut writer: W) -> IoResult<()> {
self.transaction.write_le(&mut writer)?;
Ok(())
}
}

impl<N: Network> FromBytes for UnconfirmedTransaction<N> {
fn read_le<R: Read>(mut reader: R) -> IoResult<Self> {
let transaction = Data::read_le(&mut reader)?;

Ok(Self { transaction })
}
}

#[cfg(test)]
pub mod prop_tests {
use crate::UnconfirmedTransaction;
use snarkvm::{
console::prelude::{FromBytes, ToBytes},
ledger::narwhal::Data,
prelude::Transaction,
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use proptest::{
collection,
prelude::{BoxedStrategy, Strategy, any},
prop_oneof,
};
use test_strategy::proptest;

type CurrentNetwork = snarkvm::prelude::MainnetV0;

pub fn any_transaction() -> BoxedStrategy<Data<Transaction<CurrentNetwork>>> {
prop_oneof![(collection::vec(any::<u8>(), 512..=512)).prop_map(|bytes| (Data::Buffer(Bytes::from(bytes)))),]
.boxed()
}

pub fn any_unconfirmed_transaction() -> BoxedStrategy<UnconfirmedTransaction<CurrentNetwork>> {
any_transaction().prop_map(UnconfirmedTransaction::new).boxed()
}

#[proptest]
fn serialize_deserialize(
#[strategy(any_unconfirmed_transaction())] original: UnconfirmedTransaction<CurrentNetwork>,
) {
let mut buf = BytesMut::default().writer();
UnconfirmedTransaction::write_le(&original, &mut buf).unwrap();

let deserialized = UnconfirmedTransaction::read_le(buf.into_inner().reader()).unwrap();
assert_eq!(original, deserialized);
}
}
45 changes: 42 additions & 3 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ use snarkvm::{
console::prelude::*,
ledger::{
committee::Committee,
narwhal::{BatchHeader, Data},
narwhal::{BatchHeader, Data, Transmission, TransmissionID},
},
prelude::{Address, Field},
prelude::{Address, Field, Transaction},
};

use colored::Colorize;
Expand Down Expand Up @@ -741,6 +741,35 @@ impl<N: Network> Gateway<N> {
}
Ok(true)
}
Event::UnconfirmedTransaction(event) => {
// Calculate the transmission checksum.
let checksum = event.transaction.to_checksum::<N>()?;
// Perform the deferred non-blocking deserialization of the transaction.
let transaction = match event.transaction.deserialize().await {
Ok(transaction) => transaction,
Err(error) => bail!("[UnconfirmedTransaction] {error}"),
};
// Construct the transmission ID.
let transmission_id = TransmissionID::Transaction(transaction.id(), checksum);
// Construct the transmission.
let transmission = Transmission::Transaction(Data::Object(transaction));

// Determine the worker ID.
let Ok(worker_id) = assign_to_worker(transmission_id, self.num_workers()) else {
warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", transmission_id);
return Ok(true);
};
// Send the unconfirmed transmission to the worker.
if let Some(sender) = self.get_worker_sender(worker_id) {
// Send the unconfirmed transmission to the worker.
if let Err(error) =
sender.tx_unconfirmed_transmission.send((peer_ip, transmission_id, transmission)).await
{
warn!("{CONTEXT} Unable to send unconfirmed transmission to worker {worker_id} - {error}");
}
}
Ok(true)
}
Event::ValidatorsRequest(_) => {
let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
connected_peers.shuffle(&mut rand::thread_rng());
Expand Down Expand Up @@ -1125,6 +1154,12 @@ impl<N: Network> Gateway<N> {
self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
}

/// Broadcast an unconfirmed transaction so other validators can cache it.
pub fn broadcast_unconfirmed_transaction(&self, transaction: Data<Transaction<N>>) {
let event = Event::UnconfirmedTransaction(crate::events::UnconfirmedTransaction { transaction });
Transport::broadcast(self, event);
}

// Update the dynamic validator whitelist.
fn update_validator_whitelist(&self) {
if let Err(e) =
Expand Down Expand Up @@ -1613,7 +1648,11 @@ impl<N: Network> Gateway<N> {
fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
// Retrieve the components of the challenge request.
let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event;
log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT);
let current_block_height = self.ledger.latest_block_height();
let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
if consensus_version >= ConsensusVersion::V12 {
log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT);
}

let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);

Expand Down
15 changes: 12 additions & 3 deletions node/bft/src/helpers/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,23 +219,32 @@ pub struct WorkerSender<N: Network> {
pub tx_worker_ping: mpsc::Sender<(SocketAddr, TransmissionID<N>)>,
pub tx_transmission_request: mpsc::Sender<(SocketAddr, TransmissionRequest<N>)>,
pub tx_transmission_response: mpsc::Sender<(SocketAddr, TransmissionResponse<N>)>,
pub tx_unconfirmed_transmission: mpsc::Sender<(SocketAddr, TransmissionID<N>, Transmission<N>)>,
}

#[derive(Debug)]
pub struct WorkerReceiver<N: Network> {
pub rx_worker_ping: mpsc::Receiver<(SocketAddr, TransmissionID<N>)>,
pub rx_transmission_request: mpsc::Receiver<(SocketAddr, TransmissionRequest<N>)>,
pub rx_transmission_response: mpsc::Receiver<(SocketAddr, TransmissionResponse<N>)>,
pub rx_unconfirmed_transmission: mpsc::Receiver<(SocketAddr, TransmissionID<N>, Transmission<N>)>,
}

/// Initializes the worker channels.
pub fn init_worker_channels<N: Network>() -> (WorkerSender<N>, WorkerReceiver<N>) {
let (tx_worker_ping, rx_worker_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_transmission_request, rx_transmission_request) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_transmission_response, rx_transmission_response) = mpsc::channel(MAX_CHANNEL_SIZE);

let sender = WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response };
let receiver = WorkerReceiver { rx_worker_ping, rx_transmission_request, rx_transmission_response };
let (tx_unconfirmed_transmission, rx_unconfirmed_transmission) = mpsc::channel(MAX_CHANNEL_SIZE);

let sender =
WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response, tx_unconfirmed_transmission };
let receiver = WorkerReceiver {
rx_worker_ping,
rx_transmission_request,
rx_transmission_response,
rx_unconfirmed_transmission,
};

(sender, receiver)
}
Expand Down
7 changes: 7 additions & 0 deletions node/bft/src/helpers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,13 @@ impl<N: Network> Storage<N> {
);
}

/// Caches the given `transmission` in storage.
///
/// Returns whether the transaction is already present in the cache.
pub fn cache_transmission(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
self.transmissions.cache_transmission(transmission_id, transmission)
}

/// Inserts the given unprocessed `certificate` into storage.
///
/// This is a temporary storage, which is cleared again when calling `insert_certificate_atomic`.
Expand Down
27 changes: 25 additions & 2 deletions node/bft/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl<N: Network> Worker<N> {
|| self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
}

/// Returns the transmission if it exists in the ready queue, proposed batch, storage.
/// Returns the transmission if it exists in the ready queue, storage or proposed batch.
///
/// Note: We explicitly forbid retrieving a transmission from the ledger, as transmissions
/// in the ledger are not guaranteed to be invalid for the current batch.
Expand Down Expand Up @@ -252,6 +252,13 @@ impl<N: Network> Worker<N> {
self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
}
}

/// Inserts the unconfirmed transmission into the storage.
///
/// Returns whether the transaction is already present in the cache.
fn cache_transmission(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
self.storage.cache_transmission(transmission_id, transmission)
}
}

impl<N: Network> Worker<N> {
Expand Down Expand Up @@ -426,7 +433,12 @@ impl<N: Network> Worker<N> {
impl<N: Network> Worker<N> {
/// Starts the worker handlers.
fn start_handlers(&self, receiver: WorkerReceiver<N>) {
let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
let WorkerReceiver {
mut rx_worker_ping,
mut rx_transmission_request,
mut rx_transmission_response,
mut rx_unconfirmed_transmission,
} = receiver;

// Start the pending queue expiration loop.
let self_ = self.clone();
Expand Down Expand Up @@ -472,6 +484,17 @@ impl<N: Network> Worker<N> {
});
}
});

// Process the unconfirmed transmissions.
let self_ = self.clone();
self.spawn(async move {
while let Some((_peer_ip, transmission_id, transmission)) = rx_unconfirmed_transmission.recv().await {
// NOTE: to improve the chance of a transaction landing, besides
// caching incoming transactions we can also consider adding it
// to the mempool.
self_.cache_transmission(transmission_id, transmission);
}
});
}

/// Sends a transmission request to the specified peer.
Expand Down
Loading