Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions binaries/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ crossterm = "0.29.0"
ratatui = "0.29.0"
itertools = "0.14"
sysinfo = "0.36.1"
indicatif = "0.17"
atty = "0.2"

env_logger = "0.11.3"
self_update = { version = "0.42.0", features = [
Expand Down
45 changes: 37 additions & 8 deletions binaries/cli/src/command/build/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
net::{SocketAddr, TcpStream},
};

use crate::{output::print_log_message, session::DataflowSession};
use crate::{output::print_log_message, progress::ProgressBar, session::DataflowSession};

pub fn build_distributed_dataflow(
session: &mut TcpRequestReplyConnection,
Expand All @@ -23,6 +23,8 @@ pub fn build_distributed_dataflow(
local_working_dir: Option<std::path::PathBuf>,
uv: bool,
) -> eyre::Result<BuildId> {
let pb = ProgressBar::new_spinner("Triggering distributed build...");

let build_id = {
let reply_raw = session
.request(
Expand All @@ -42,11 +44,17 @@ pub fn build_distributed_dataflow(
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowBuildTriggered { build_id } => {
eprintln!("dataflow build triggered: {build_id}");
pb.finish_with_message(format!("Dataflow build triggered: {}", build_id));
build_id
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
ControlRequestReply::Error(err) => {
pb.fail_with_message("Failed to trigger build");
bail!("{err}")
}
other => {
pb.fail_with_message("Unexpected response");
bail!("unexpected start dataflow reply: {other:?}")
}
}
};
Ok(build_id)
Expand All @@ -58,6 +66,8 @@ pub fn wait_until_dataflow_built(
coordinator_socket: SocketAddr,
log_level: log::LevelFilter,
) -> eyre::Result<BuildId> {
let pb = ProgressBar::new_spinner("Building dataflow on remote machines...");

// subscribe to log messages
let mut log_session = TcpConnection {
stream: TcpStream::connect(coordinator_socket)
Expand All @@ -72,19 +82,29 @@ pub fn wait_until_dataflow_built(
.wrap_err("failed to serialize message")?,
)
.wrap_err("failed to send build log subscribe request to coordinator")?;

let pb_clone = ProgressBar::new_spinner("Processing build logs...");
std::thread::spawn(move || {
while let Ok(raw) = log_session.receive() {
let parsed: eyre::Result<LogMessage> =
serde_json::from_slice(&raw).context("failed to parse log message");
match parsed {
Ok(log_message) => {
// Update progress bar with log info
if let Some(node_id) = &log_message.node_id {
let msg = format!("{}: {}", node_id, log_message.message);
pb_clone.set_message(msg);
} else {
pb_clone.set_message(log_message.message.clone());
}
print_log_message(log_message, false, true);
}
Err(err) => {
tracing::warn!("failed to parse log message: {err:?}")
}
}
}
pb_clone.finish_and_clear();
});

let reply_raw = session
Expand All @@ -96,12 +116,21 @@ pub fn wait_until_dataflow_built(
match result {
ControlRequestReply::DataflowBuildFinished { build_id, result } => match result {
Ok(()) => {
eprintln!("dataflow build finished successfully");
pb.finish_with_message("Dataflow build finished successfully");
Ok(build_id)
}
Err(err) => bail!("{err}"),
Err(err) => {
pb.fail_with_message("Build failed");
bail!("{err}")
}
},
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
ControlRequestReply::Error(err) => {
pb.fail_with_message("Build error");
bail!("{err}")
}
other => {
pb.fail_with_message("Unexpected response");
bail!("unexpected start dataflow reply: {other:?}")
}
}
}
75 changes: 60 additions & 15 deletions binaries/cli/src/command/build/local.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, path::PathBuf};
use std::{collections::BTreeMap, path::PathBuf, sync::Arc};

use colored::Colorize;
use dora_core::{
Expand All @@ -8,7 +8,10 @@ use dora_core::{
use dora_message::{common::GitSource, id::NodeId};
use eyre::Context;

use crate::session::DataflowSession;
use crate::{
progress::{MultiProgress, ProgressBar},
session::DataflowSession,
};

pub fn build_dataflow_locally(
dataflow: Descriptor,
Expand Down Expand Up @@ -41,10 +44,15 @@ async fn build_dataflow(
uv,
};
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let node_count = nodes.len() as u64;

let mut git_manager = GitManager::default();
let prev_git_sources = &dataflow_session.git_sources;

// Create multi-progress for showing all nodes being built
let multi = Arc::new(MultiProgress::new());
let overall_pb = multi.add_bar(node_count, "Building nodes");

let mut tasks = Vec::new();

// build nodes
Expand All @@ -57,6 +65,9 @@ async fn build_dataflow(
git_source: prev_source,
});

// Create a progress spinner for this specific node
let node_pb = multi.add_spinner(format!("Building {}", node_id));

let task = builder
.clone()
.build_node(
Expand All @@ -65,6 +76,7 @@ async fn build_dataflow(
prev_git,
LocalBuildLogger {
node_id: node_id.clone(),
progress_bar: Some(node_pb),
},
&mut git_manager,
)
Expand All @@ -82,12 +94,16 @@ async fn build_dataflow(
.with_context(|| format!("failed to build node `{node_id}`"))?;
info.node_working_dirs
.insert(node_id, node.node_working_dir);
overall_pb.inc(1);
}

overall_pb.finish_with_message(format!("Built {} nodes successfully", node_count));
Ok(info)
}

/// Per-node build logger: forwards build output to this node's progress
/// spinner when one is attached, and falls back to `println!` otherwise
/// (see the `log_message` impl below).
struct LocalBuildLogger {
// Id of the node whose build output this logger reports.
node_id: NodeId,
// Spinner for this node's build; `None` means log lines go to stdout instead.
progress_bar: Option<ProgressBar>,
}

impl BuildLogger for LocalBuildLogger {
Expand All @@ -98,24 +114,53 @@ impl BuildLogger for LocalBuildLogger {
level: impl Into<LogLevelOrStdout> + Send,
message: impl Into<String> + Send,
) {
let level = match level.into() {
LogLevelOrStdout::LogLevel(level) => match level {
log::Level::Error => "ERROR ".red(),
log::Level::Warn => "WARN ".yellow(),
log::Level::Info => "INFO ".green(),
log::Level::Debug => "DEBUG ".bright_blue(),
log::Level::Trace => "TRACE ".dimmed(),
},
LogLevelOrStdout::Stdout => "stdout".italic().dimmed(),
};
let node = self.node_id.to_string().bold().bright_black();
let message: String = message.into();
println!("{node}: {level} {message}");
let message_str: String = message.into();

// Update progress bar message if available
if let Some(pb) = &self.progress_bar {
let level_indicator = match level.into() {
LogLevelOrStdout::LogLevel(level) => match level {
log::Level::Error => "ERROR",
log::Level::Warn => "WARN",
log::Level::Info => "",
log::Level::Debug => "DEBUG",
log::Level::Trace => "TRACE",
},
LogLevelOrStdout::Stdout => "",
};

if !level_indicator.is_empty() {
pb.set_message(format!(
"{}: {} - {}",
self.node_id, level_indicator, message_str
));
} else {
pb.set_message(format!("{}: {}", self.node_id, message_str));
}
} else {
// Fallback to println if no progress bar
let level = match level.into() {
LogLevelOrStdout::LogLevel(level) => match level {
log::Level::Error => "ERROR ".red(),
log::Level::Warn => "WARN ".yellow(),
log::Level::Info => "INFO ".green(),
log::Level::Debug => "DEBUG ".bright_blue(),
log::Level::Trace => "TRACE ".dimmed(),
},
LogLevelOrStdout::Stdout => "stdout".italic().dimmed(),
};
let node = self.node_id.to_string().bold().bright_black();
println!("{node}: {level} {message_str}");
}
}

/// Produces an independent logger for the same node.
///
/// NOTE(review): when `progress_bar` is `Some`, this creates a brand-new
/// standalone spinner via `ProgressBar::new_spinner` rather than reusing
/// the existing one — unlike the original spinner, which was registered
/// with the `MultiProgress` via `add_spinner`, so a cloned logger will
/// render a second, detached "Building <node>" spinner. If `ProgressBar`
/// is cheaply cloneable (e.g. Arc-backed, as `indicatif::ProgressBar`
/// is), `self.progress_bar.clone()` would be preferable — confirm.
async fn try_clone(&self) -> eyre::Result<Self::Clone> {
Ok(LocalBuildLogger {
node_id: self.node_id.clone(),
progress_bar: self
.progress_bar
.as_ref()
.map(|_| ProgressBar::new_spinner(format!("Building {}", self.node_id))),
})
}
}
1 change: 0 additions & 1 deletion binaries/cli/src/command/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use clap::{CommandFactory, ValueEnum};
use clap_complete::Shell;

use crate::command::Executable;
use sysinfo;

#[derive(Debug, clap::Args)]
#[command(after_help = r#"
Expand Down
16 changes: 15 additions & 1 deletion binaries/cli/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::Executable;
use crate::{
common::{handle_dataflow_result, resolve_dataflow, write_events_to},
output::print_log_message,
progress::ProgressBar,
session::DataflowSession,
};
use dora_daemon::{Daemon, LogDestination, flume};
Expand Down Expand Up @@ -45,21 +46,32 @@ pub fn run(dataflow: String, uv: bool) -> eyre::Result<()> {
.wrap_err("failed to set up tracing subscriber")?;
}

let pb = ProgressBar::new_spinner("Initializing dataflow...");

let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let dataflow_session =
DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;

pb.set_message("Starting runtime...");
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;

let (log_tx, log_rx) = flume::bounded(100);
let (log_tx, log_rx) = flume::bounded::<dora_message::common::LogMessage>(100);
let pb_clone = ProgressBar::new_spinner("Running dataflow...");
std::thread::spawn(move || {
for message in log_rx {
// Update progress bar with node messages
if let Some(node_id) = &message.node_id {
pb_clone.set_message(format!("{}: {}", node_id, message.message));
}
print_log_message(message, false, false);
}
pb_clone.finish_and_clear();
});

pb.set_message("Running dataflow...");
let result = rt.block_on(Daemon::run_dataflow(
&dataflow_path,
dataflow_session.build_id,
Expand All @@ -69,6 +81,8 @@ pub fn run(dataflow: String, uv: bool) -> eyre::Result<()> {
LogDestination::Channel { sender: log_tx },
write_events_to(),
))?;

pb.finish_with_message("Dataflow execution completed");
handle_dataflow_result(result, None)
}

Expand Down
Loading
Loading