Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ precedence_bits = "deny"
as_pointer_underscore = "deny"
literal_string_with_formatting_args = "deny"
manual_midpoint = "deny"
ip_constant = "deny"
doc_broken_link = "deny"

# Warm
cast_possible_truncation = "deny"
Expand Down
13 changes: 8 additions & 5 deletions binaries/cuprated/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Contains the blockchain manager, syncer and an interface to mutate the blockchain.
use std::sync::Arc;

use anyhow::{anyhow, Error};
use futures::FutureExt;
use tokio::sync::{mpsc, Notify};
use tower::{BoxError, Service, ServiceExt};
Expand Down Expand Up @@ -35,17 +36,17 @@ pub async fn check_add_genesis(
blockchain_read_handle: &mut BlockchainReadHandle,
blockchain_write_handle: &mut BlockchainWriteHandle,
network: Network,
) {
) -> Result<(), Error> {
// Try to get the chain height, will fail if the genesis block is not in the DB.
if blockchain_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.map_err(|_| anyhow!(PANIC_CRITICAL_SERVICE_ERROR))?
.call(BlockchainReadRequest::ChainHeight)
.await
.is_ok()
{
return;
return Ok(());
}

let genesis = generate_genesis_block(network);
Expand All @@ -56,7 +57,7 @@ pub async fn check_add_genesis(
blockchain_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.map_err(|_| anyhow!(PANIC_CRITICAL_SERVICE_ERROR))?
.call(BlockchainWriteRequest::WriteBlock(
VerifiedBlockInformation {
block_blob: genesis.serialize(),
Expand All @@ -74,7 +75,9 @@ pub async fn check_add_genesis(
},
))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
.map_err(|_| anyhow!(PANIC_CRITICAL_SERVICE_ERROR))?;

Ok(())
}

/// Initializes the consensus services.
Expand Down
3 changes: 2 additions & 1 deletion binaries/cuprated/src/blockchain/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ async fn mock_manager(data_dir: PathBuf) -> BlockchainManager {
&mut blockchain_write_handle,
Network::Mainnet,
)
.await;
.await
.unwrap();

let mut context_config = ContextConfig::main_net();
context_config.difficulty_cfg.fixed_difficulty = Some(1);
Expand Down
6 changes: 4 additions & 2 deletions binaries/cuprated/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub enum OutputTarget {
}

/// The [`Command`] listener loop.
pub fn command_listener(incoming_commands: mpsc::Sender<Command>) -> ! {
pub fn command_listener(incoming_commands: mpsc::Sender<Command>) -> Result<(), anyhow::Error> {
let mut stdin = io::stdin();
let mut line = String::new();

Expand All @@ -85,9 +85,11 @@ pub fn command_listener(incoming_commands: mpsc::Sender<Command>) -> ! {
.blocking_send(command)
.inspect_err(|err| eprintln!("Failed to send command: {err}")),
),
Err(err) => err.print().unwrap(),
Err(err) => err.print()?,
}
}

Ok(())
}

/// The [`Command`] handler loop.
Expand Down
17 changes: 6 additions & 11 deletions binaries/cuprated/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
time::Duration,
};

use anyhow::anyhow;
use arti_client::KeystoreSelector;
use clap::Parser;
use safelog::DisplayRedacted;
Expand Down Expand Up @@ -70,18 +71,15 @@ const HEADER: &str = r"## ____ _
";

/// Reads the args & config file, returning a [`Config`].
pub fn read_config_and_args() -> Config {
pub fn read_config_and_args() -> Result<Config, anyhow::Error> {
let args = args::Args::parse();
args.do_quick_requests();

let config: Config = if let Some(config_file) = &args.config_file {
// If a config file was set in the args try to read it and exit if we can't.
// If a config file was set in the args try to read it.
match Config::read_from_path(config_file) {
Ok(config) => config,
Err(e) => {
eprintln_red(&format!("Failed to read config from file: {e}"));
std::process::exit(1);
}
Err(e) => return Err(anyhow!("Failed to read config from file: {e}")),
}
} else {
// First attempt to read the config file from the current directory.
Expand All @@ -101,11 +99,10 @@ pub fn read_config_and_args() -> Config {
eprintln_red(DEFAULT_CONFIG_WARNING);
std::thread::sleep(DEFAULT_CONFIG_STARTUP_DELAY);
}
})
.unwrap_or_default()
})?
};

args.apply_args(config)
Ok(args.apply_args(config))
}

config_struct! {
Expand Down Expand Up @@ -210,8 +207,6 @@ impl Config {
"Failed to parse config file at: {}",
file.as_ref().to_string_lossy()
));
eprintln_red(&format!("{e}"));
std::process::exit(1);
})?)
}

Expand Down
9 changes: 6 additions & 3 deletions binaries/cuprated/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::{
mem::forget,
sync::OnceLock,
};

use anyhow::Error;
use tracing::{
instrument::WithSubscriber, level_filters::LevelFilter, subscriber::Interest, Metadata,
};
Expand Down Expand Up @@ -83,7 +85,7 @@ impl<S> Filter<S> for CupratedTracingFilter {
}

/// Initialize [`tracing`] for logging to stdout and to a file.
pub fn init_logging(config: &Config) {
pub fn init_logging(config: &Config) -> Result<(), Error> {
// initialize the stdout filter, set `STDOUT_FILTER_HANDLE` and create the layer.
let (stdout_filter, stdout_handle) = ReloadLayer::new(CupratedTracingFilter {
level: config.tracing.stdout.level,
Expand All @@ -101,8 +103,7 @@ pub fn init_logging(config: &Config) {
tracing_appender::rolling::Builder::new()
.rotation(Rotation::DAILY)
.max_log_files(appender_config.max_log_files)
.build(logs_path(&config.fs.data_directory, config.network()))
.unwrap(),
.build(logs_path(&config.fs.data_directory, config.network()))?,
);

// TODO: drop this when we shutdown.
Expand All @@ -125,6 +126,8 @@ pub fn init_logging(config: &Config) {
.with(appender_layer)
.with(stdout_layer)
.init();

Ok(())
}

/// Modify the stdout [`CupratedTracingFilter`].
Expand Down
70 changes: 41 additions & 29 deletions binaries/cuprated/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
reason = "TODO: remove after v1.0.0"
)]

use std::{mem, sync::Arc};
use std::{mem, process::ExitCode, sync::Arc};

use p2p::initialize_zones_p2p;
use anyhow::{anyhow, Error};
use tokio::sync::mpsc;
use tower::{Service, ServiceExt};
use tracing::{error, info, level_filters::LevelFilter};
Expand All @@ -36,7 +36,9 @@ use txpool::IncomingTxHandler;
use crate::{
config::Config,
constants::PANIC_CRITICAL_SERVICE_ERROR,
logging::eprintln_red,
logging::CupratedTracingFilter,
p2p::initialize_zones_p2p,
tor::{initialize_tor_if_enabled, TorMode},
};

Expand All @@ -54,32 +56,42 @@ mod tor;
mod txpool;
mod version;

fn main() {
fn main() -> ExitCode {
match main_inner() {
Ok(()) => ExitCode::SUCCESS,
Err(e) => {
eprintln_red(&e.to_string());
ExitCode::FAILURE
}
}
}

fn main_inner() -> Result<(), Error> {
// Initialize the killswitch.
killswitch::init_killswitch();

// Initialize global static `LazyLock` data.
statics::init_lazylock_statics();

let config = config::read_config_and_args();
let config = config::read_config_and_args()?;

blockchain::set_fast_sync_hashes(config.fast_sync, config.network());

// Initialize logging.
logging::init_logging(&config);
logging::init_logging(&config)?;

//Printing configuration
info!("{config}");

// Initialize the thread-pools

init_global_rayon_pool(&config);
init_global_rayon_pool(&config)?;

let rt = init_tokio_rt(&config);
let rt = init_tokio_rt(&config)?;

let db_thread_pool = cuprate_database_service::init_thread_pool(
cuprate_database_service::ReaderThreads::Number(config.storage.reader_threads),
);
)?;

// Start the blockchain & tx-pool databases.

Expand All @@ -89,12 +101,12 @@ fn main() {
Arc::clone(&db_thread_pool),
)
.inspect_err(|e| error!("Blockchain database error: {e}"))
.expect(DATABASE_CORRUPT_MSG);
.map_err(|_| anyhow!(DATABASE_CORRUPT_MSG))?;

let (txpool_read_handle, txpool_write_handle, _) =
cuprate_txpool::service::init_with_pool(&config.txpool_config(), db_thread_pool)
.inspect_err(|e| error!("Txpool database error: {e}"))
.expect(DATABASE_CORRUPT_MSG);
.map_err(|_| anyhow!(DATABASE_CORRUPT_MSG))?;

// Initialize async tasks.

Expand All @@ -103,24 +115,24 @@ fn main() {
blockchain_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.map_err(|_| anyhow!(PANIC_CRITICAL_SERVICE_ERROR))?
.call(BlockchainWriteRequest::FlushAltBlocks)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
.map_err(|_| anyhow!(PANIC_CRITICAL_SERVICE_ERROR))?;

// Check add the genesis block to the blockchain.
blockchain::check_add_genesis(
&mut blockchain_read_handle,
&mut blockchain_write_handle,
config.network(),
)
.await;
.await?;

// Start the context service and the block/tx verifier.
let context_svc =
blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config())
.await
.unwrap();
.map_err(|e| anyhow!(e))?;

// Bootstrap or configure Tor if enabled.
let tor_context = initialize_tor_if_enabled(&config).await;
Expand All @@ -133,7 +145,7 @@ fn main() {
txpool_read_handle.clone(),
tor_context,
)
.await;
.await?;

// Create the incoming tx handler service.
let tx_handler = IncomingTxHandler::init(
Expand Down Expand Up @@ -173,40 +185,40 @@ fn main() {
context_svc.clone(),
txpool_read_handle,
tx_handler,
);
)?;

// Start the command listener.
if std::io::IsTerminal::is_terminal(&std::io::stdin()) {
let (command_tx, command_rx) = mpsc::channel(1);
std::thread::spawn(|| commands::command_listener(command_tx));

// Wait on the io_loop, spawned on a separate task as this improves performance.
tokio::spawn(commands::io_loop(command_rx, context_svc))
.await
.unwrap();
tokio::spawn(commands::io_loop(command_rx, context_svc)).await?;
} else {
// If no STDIN, await OS exit signal.
info!("Terminal/TTY not detected, disabling STDIN commands");
tokio::signal::ctrl_c().await.unwrap();
tokio::signal::ctrl_c().await?;
}
});

Ok::<(), Error>(())
})?;

Ok(())
}

/// Initialize the [`tokio`] runtime.
fn init_tokio_rt(config: &Config) -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
fn init_tokio_rt(config: &Config) -> Result<tokio::runtime::Runtime, Error> {
Ok(tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.tokio.threads)
.thread_name("cuprated-tokio")
.enable_all()
.build()
.unwrap()
.build()?)
}

/// Initialize the global [`rayon`] thread-pool.
fn init_global_rayon_pool(config: &Config) {
rayon::ThreadPoolBuilder::new()
fn init_global_rayon_pool(config: &Config) -> Result<(), Error> {
Ok(rayon::ThreadPoolBuilder::new()
.num_threads(config.rayon.threads)
.thread_name(|index| format!("cuprated-rayon-{index}"))
.build_global()
.unwrap();
.build_global()?)
}
4 changes: 2 additions & 2 deletions binaries/cuprated/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub async fn initialize_zones_p2p(
mut blockchain_read_handle: BlockchainReadHandle,
txpool_read_handle: TxpoolReadHandle,
tor_ctx: TorContext,
) -> (NetworkInterfaces, Vec<Sender<IncomingTxHandler>>) {
) -> Result<(NetworkInterfaces, Vec<Sender<IncomingTxHandler>>), anyhow::Error> {
// Start clearnet P2P.
let (clearnet, incoming_tx_handler_tx) = {
// If proxy is set
Expand Down Expand Up @@ -157,7 +157,7 @@ pub async fn initialize_zones_p2p(
tx_handler_subscribers.push(incoming_tx_handler_tx);
}

(network_interfaces, tx_handler_subscribers)
Ok((network_interfaces, tx_handler_subscribers))
}

/// Starts the P2P network zone, returning a [`NetworkInterface`] to interact with it.
Expand Down
Loading