Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion node/bft/events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub use transmission_request::TransmissionRequest;
mod transmission_response;
pub use transmission_response::TransmissionResponse;

mod unconfirmed_transaction;
pub use unconfirmed_transaction::UnconfirmedTransaction;

mod validators_request;
pub use validators_request::ValidatorsRequest;

Expand Down Expand Up @@ -108,6 +111,7 @@ pub enum Event<N: Network> {
ValidatorsRequest(ValidatorsRequest),
ValidatorsResponse(ValidatorsResponse<N>),
WorkerPing(WorkerPing<N>),
UnconfirmedTransaction(UnconfirmedTransaction<N>),
}

impl<N: Network> From<DisconnectReason> for Event<N> {
Expand Down Expand Up @@ -140,6 +144,7 @@ impl<N: Network> Event<N> {
Self::ValidatorsRequest(event) => event.name(),
Self::ValidatorsResponse(event) => event.name(),
Self::WorkerPing(event) => event.name(),
Self::UnconfirmedTransaction(event) => event.name(),
}
}

Expand All @@ -163,6 +168,7 @@ impl<N: Network> Event<N> {
Self::ValidatorsRequest(..) => 13,
Self::ValidatorsResponse(..) => 14,
Self::WorkerPing(..) => 15,
Self::UnconfirmedTransaction(..) => 16,
}
}
}
Expand All @@ -188,6 +194,7 @@ impl<N: Network> ToBytes for Event<N> {
Self::ValidatorsRequest(event) => event.write_le(writer),
Self::ValidatorsResponse(event) => event.write_le(writer),
Self::WorkerPing(event) => event.write_le(writer),
Self::UnconfirmedTransaction(event) => event.write_le(writer),
}
}
}
Expand Down Expand Up @@ -215,7 +222,8 @@ impl<N: Network> FromBytes for Event<N> {
13 => Self::ValidatorsRequest(ValidatorsRequest::read_le(&mut reader)?),
14 => Self::ValidatorsResponse(ValidatorsResponse::read_le(&mut reader)?),
15 => Self::WorkerPing(WorkerPing::read_le(&mut reader)?),
16.. => return Err(error(format!("Unknown event ID {id}"))),
16 => Self::UnconfirmedTransaction(UnconfirmedTransaction::read_le(&mut reader)?),
17.. => return Err(error(format!("Unknown event ID {id}"))),
};

// Ensure that there are no "dangling" bytes.
Expand Down
100 changes: 100 additions & 0 deletions node/bft/events/src/unconfirmed_transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;

use snarkvm::prelude::Transaction;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UnconfirmedTransaction<N: Network> {
pub transaction: Data<Transaction<N>>,
}

impl<N: Network> UnconfirmedTransaction<N> {
/// Initializes a new transmission response event.
pub fn new(transaction: Data<Transaction<N>>) -> Self {
Self { transaction }
}
}

impl<N: Network> From<Data<Transaction<N>>> for UnconfirmedTransaction<N> {
/// Initializes a new transmission response event.
fn from(transaction: Data<Transaction<N>>) -> Self {
Self::new(transaction)
}
}

impl<N: Network> EventTrait for UnconfirmedTransaction<N> {
/// Returns the event name.
#[inline]
fn name(&self) -> Cow<'static, str> {
"UnconfirmedTransaction".into()
}
}

impl<N: Network> ToBytes for UnconfirmedTransaction<N> {
fn write_le<W: Write>(&self, mut writer: W) -> IoResult<()> {
self.transaction.write_le(&mut writer)?;
Ok(())
}
}

impl<N: Network> FromBytes for UnconfirmedTransaction<N> {
fn read_le<R: Read>(mut reader: R) -> IoResult<Self> {
let transaction = Data::read_le(&mut reader)?;

Ok(Self { transaction })
}
}

#[cfg(test)]
pub mod prop_tests {
use crate::UnconfirmedTransaction;
use snarkvm::{
console::prelude::{FromBytes, ToBytes},
ledger::narwhal::Data,
prelude::Transaction,
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use proptest::{
collection,
prelude::{BoxedStrategy, Strategy, any},
prop_oneof,
};
use test_strategy::proptest;

type CurrentNetwork = snarkvm::prelude::MainnetV0;

pub fn any_transaction() -> BoxedStrategy<Data<Transaction<CurrentNetwork>>> {
prop_oneof![(collection::vec(any::<u8>(), 512..=512)).prop_map(|bytes| (Data::Buffer(Bytes::from(bytes)))),]
.boxed()
}

pub fn any_unconfirmed_transaction() -> BoxedStrategy<UnconfirmedTransaction<CurrentNetwork>> {
any_transaction().prop_map(UnconfirmedTransaction::new).boxed()
}

#[proptest]
fn serialize_deserialize(
#[strategy(any_unconfirmed_transaction())] original: UnconfirmedTransaction<CurrentNetwork>,
) {
let mut buf = BytesMut::default().writer();
UnconfirmedTransaction::write_le(&original, &mut buf).unwrap();

let deserialized = UnconfirmedTransaction::read_le(buf.into_inner().reader()).unwrap();
assert_eq!(original, deserialized);
}
}
47 changes: 44 additions & 3 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ use snarkvm::{
console::prelude::*,
ledger::{
committee::Committee,
narwhal::{BatchHeader, Data},
narwhal::{BatchHeader, Data, Transmission, TransmissionID},
},
prelude::{Address, Field},
prelude::{Address, Field, Transaction},
};

use colored::Colorize;
Expand Down Expand Up @@ -738,6 +738,35 @@ impl<N: Network> Gateway<N> {
}
Ok(true)
}
Event::UnconfirmedTransaction(event) => {
// Perform the deferred non-blocking deserialization of the transaction.
let transaction = match event.transaction.deserialize().await {
Ok(transaction) => transaction,
Err(error) => bail!("[UnconfirmedTransaction] {error}"),
};
// Calculate the transmission checksum.
let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this operation involves plenty of redundant work (since we're serializing a transaction that was just deserialized, even if it was in a blob form); it should be possible to optimize it so that the checksum can be calculated based on the event

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Construct the transmission ID.
let transmission_id = TransmissionID::Transaction(transaction.id(), checksum);
// Construct the transmission.
let transmission = Transmission::Transaction(Data::Object(transaction));

// Determine the worker ID.
let Ok(worker_id) = assign_to_worker(transmission_id, self.num_workers()) else {
warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", transmission_id);
return Ok(true);
};
// Send the unconfirmed transmission to the worker.
if let Some(sender) = self.get_worker_sender(worker_id) {
// Send the unconfirmed transmission to the worker.
if let Err(error) =
sender.tx_unconfirmed_transmission.send((peer_ip, transmission_id, transmission)).await
{
warn!("{CONTEXT} Unable to send unconfirmed transmission to worker {worker_id} - {error}");
}
}
Ok(true)
}
Event::ValidatorsRequest(_) => {
let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
connected_peers.shuffle(&mut rand::thread_rng());
Expand Down Expand Up @@ -1105,6 +1134,14 @@ impl<N: Network> Gateway<N> {
fn handle_banned_ips(&self) {
self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
}

/// Broadcast an unconfirmed transaction so other validators can cache it.
pub fn broadcast_unconfirmed_transaction(&self, transaction: Transaction<N>) {
let event = Event::UnconfirmedTransaction(crate::events::UnconfirmedTransaction {
transaction: Data::Object(transaction),
});
Transport::broadcast(self, event);
}
}

#[async_trait]
Expand Down Expand Up @@ -1585,7 +1622,11 @@ impl<N: Network> Gateway<N> {
fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
// Retrieve the components of the challenge request.
let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event;
log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT);
let current_block_height = self.ledger.latest_block_height();
let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
if consensus_version >= ConsensusVersion::V12 {
log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT);
}

let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);

Expand Down
15 changes: 12 additions & 3 deletions node/bft/src/helpers/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,23 +219,32 @@ pub struct WorkerSender<N: Network> {
pub tx_worker_ping: mpsc::Sender<(SocketAddr, TransmissionID<N>)>,
pub tx_transmission_request: mpsc::Sender<(SocketAddr, TransmissionRequest<N>)>,
pub tx_transmission_response: mpsc::Sender<(SocketAddr, TransmissionResponse<N>)>,
pub tx_unconfirmed_transmission: mpsc::Sender<(SocketAddr, TransmissionID<N>, Transmission<N>)>,
}

#[derive(Debug)]
pub struct WorkerReceiver<N: Network> {
pub rx_worker_ping: mpsc::Receiver<(SocketAddr, TransmissionID<N>)>,
pub rx_transmission_request: mpsc::Receiver<(SocketAddr, TransmissionRequest<N>)>,
pub rx_transmission_response: mpsc::Receiver<(SocketAddr, TransmissionResponse<N>)>,
pub rx_unconfirmed_transmission: mpsc::Receiver<(SocketAddr, TransmissionID<N>, Transmission<N>)>,
}

/// Initializes the worker channels.
pub fn init_worker_channels<N: Network>() -> (WorkerSender<N>, WorkerReceiver<N>) {
let (tx_worker_ping, rx_worker_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_transmission_request, rx_transmission_request) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_transmission_response, rx_transmission_response) = mpsc::channel(MAX_CHANNEL_SIZE);

let sender = WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response };
let receiver = WorkerReceiver { rx_worker_ping, rx_transmission_request, rx_transmission_response };
let (tx_unconfirmed_transmission, rx_unconfirmed_transmission) = mpsc::channel(MAX_CHANNEL_SIZE);

let sender =
WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response, tx_unconfirmed_transmission };
let receiver = WorkerReceiver {
rx_worker_ping,
rx_transmission_request,
rx_transmission_response,
rx_unconfirmed_transmission,
};

(sender, receiver)
}
Expand Down
7 changes: 7 additions & 0 deletions node/bft/src/helpers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,13 @@ impl<N: Network> Storage<N> {
);
}

/// Caches the given `transmission` in storage.
///
/// Returns whether the transaction is already present in the cache.
pub fn cache_transmission(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
self.transmissions.cache_transmission(transmission_id, transmission)
}

/// Inserts the given unprocessed `certificate` into storage.
///
/// This is a temporary storage, which is cleared again when calling `insert_certificate_atomic`.
Expand Down
27 changes: 25 additions & 2 deletions node/bft/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl<N: Network> Worker<N> {
|| self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
}

/// Returns the transmission if it exists in the ready queue, proposed batch, storage.
/// Returns the transmission if it exists in the ready queue, storage or proposed batch.
///
/// Note: We explicitly forbid retrieving a transmission from the ledger, as transmissions
/// in the ledger are not guaranteed to be invalid for the current batch.
Expand Down Expand Up @@ -252,6 +252,13 @@ impl<N: Network> Worker<N> {
self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
}
}

/// Inserts the unconfirmed transmission into the storage.
///
/// Returns whether the transaction is already present in the cache.
fn cache_transmission(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
self.storage.cache_transmission(transmission_id, transmission)
}
}

impl<N: Network> Worker<N> {
Expand Down Expand Up @@ -426,7 +433,12 @@ impl<N: Network> Worker<N> {
impl<N: Network> Worker<N> {
/// Starts the worker handlers.
fn start_handlers(&self, receiver: WorkerReceiver<N>) {
let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
let WorkerReceiver {
mut rx_worker_ping,
mut rx_transmission_request,
mut rx_transmission_response,
mut rx_unconfirmed_transmission,
} = receiver;

// Start the pending queue expiration loop.
let self_ = self.clone();
Expand Down Expand Up @@ -472,6 +484,17 @@ impl<N: Network> Worker<N> {
});
}
});

// Process the unconfirmed transmissions.
let self_ = self.clone();
self.spawn(async move {
while let Some((_peer_ip, transmission_id, transmission)) = rx_unconfirmed_transmission.recv().await {
// NOTE: to improve the chance of a transaction landing, besides
// caching incoming transactions we can also consider adding it
// to the mempool.
self_.cache_transmission(transmission_id, transmission);
}
});
}

/// Sends a transmission request to the specified peer.
Expand Down
Loading