Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions binaries/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ crossterm = "0.29.0"
ratatui = "0.29.0"
itertools = "0.14"
sysinfo = "0.36.1"
indicatif = "0.17"

env_logger = "0.11.3"
self_update = { version = "0.42.0", features = [
Expand Down
45 changes: 37 additions & 8 deletions binaries/cli/src/command/build/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
net::{SocketAddr, TcpStream},
};

use crate::{output::print_log_message, session::DataflowSession};
use crate::{output::print_log_message, progress::ProgressBar, session::DataflowSession};

pub fn build_distributed_dataflow(
session: &mut TcpRequestReplyConnection,
Expand All @@ -23,6 +23,8 @@ pub fn build_distributed_dataflow(
local_working_dir: Option<std::path::PathBuf>,
uv: bool,
) -> eyre::Result<BuildId> {
let pb = ProgressBar::new_spinner("Triggering distributed build...");

let build_id = {
let reply_raw = session
.request(
Expand All @@ -42,11 +44,17 @@ pub fn build_distributed_dataflow(
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowBuildTriggered { build_id } => {
eprintln!("dataflow build triggered: {build_id}");
pb.finish_with_message(format!("Dataflow build triggered: {}", build_id));
build_id
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
ControlRequestReply::Error(err) => {
pb.fail_with_message("Failed to trigger build");
bail!("{err}")
}
other => {
pb.fail_with_message("Unexpected response");
bail!("unexpected start dataflow reply: {other:?}")
}
}
};
Ok(build_id)
Expand All @@ -58,6 +66,8 @@ pub fn wait_until_dataflow_built(
coordinator_socket: SocketAddr,
log_level: log::LevelFilter,
) -> eyre::Result<BuildId> {
let pb = ProgressBar::new_spinner("Building dataflow on remote machines...");

// subscribe to log messages
let mut log_session = TcpConnection {
stream: TcpStream::connect(coordinator_socket)
Expand All @@ -72,19 +82,29 @@ pub fn wait_until_dataflow_built(
.wrap_err("failed to serialize message")?,
)
.wrap_err("failed to send build log subscribe request to coordinator")?;

let pb_clone = ProgressBar::new_spinner("Processing build logs...");
std::thread::spawn(move || {
while let Ok(raw) = log_session.receive() {
let parsed: eyre::Result<LogMessage> =
serde_json::from_slice(&raw).context("failed to parse log message");
match parsed {
Ok(log_message) => {
// Update progress bar with log info
if let Some(node_id) = &log_message.node_id {
let msg = format!("{}: {}", node_id, log_message.message);
pb_clone.set_message(msg);
} else {
pb_clone.set_message(log_message.message.clone());
}
print_log_message(log_message, false, true);
}
Err(err) => {
tracing::warn!("failed to parse log message: {err:?}")
}
}
}
pb_clone.finish_and_clear();
});

let reply_raw = session
Expand All @@ -96,12 +116,21 @@ pub fn wait_until_dataflow_built(
match result {
ControlRequestReply::DataflowBuildFinished { build_id, result } => match result {
Ok(()) => {
eprintln!("dataflow build finished successfully");
pb.finish_with_message("Dataflow build finished successfully");
Ok(build_id)
}
Err(err) => bail!("{err}"),
Err(err) => {
pb.fail_with_message("Build failed");
bail!("{err}")
}
},
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
ControlRequestReply::Error(err) => {
pb.fail_with_message("Build error");
bail!("{err}")
}
other => {
pb.fail_with_message("Unexpected response");
bail!("unexpected start dataflow reply: {other:?}")
}
}
}
56 changes: 51 additions & 5 deletions binaries/cli/src/command/build/local.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, path::PathBuf};
use std::{collections::BTreeMap, path::PathBuf, sync::Arc};

use colored::Colorize;
use dora_core::{
Expand All @@ -8,7 +8,10 @@ use dora_core::{
use dora_message::{common::GitSource, id::NodeId};
use eyre::Context;

use crate::session::DataflowSession;
use crate::{
progress::{MultiProgress, ProgressBar},
session::DataflowSession,
};

pub fn build_dataflow_locally(
dataflow: Descriptor,
Expand Down Expand Up @@ -41,10 +44,15 @@ async fn build_dataflow(
uv,
};
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let node_count = nodes.len() as u64;

let mut git_manager = GitManager::default();
let prev_git_sources = &dataflow_session.git_sources;

// Create multi-progress for showing all nodes being built
let multi = Arc::new(MultiProgress::new());
let overall_pb = multi.add_bar(node_count, "Building nodes");

let mut tasks = Vec::new();

// build nodes
Expand All @@ -57,6 +65,9 @@ async fn build_dataflow(
git_source: prev_source,
});

// Create a progress spinner for this specific node
let node_pb = Arc::new(multi.add_spinner(format!("Building {}", node_id)));

let task = builder
.clone()
.build_node(
Expand All @@ -65,6 +76,8 @@ async fn build_dataflow(
prev_git,
LocalBuildLogger {
node_id: node_id.clone(),
progress_bar: Some(node_pb),
multi: Some(multi.clone()),
},
&mut git_manager,
)
Expand All @@ -82,12 +95,17 @@ async fn build_dataflow(
.with_context(|| format!("failed to build node `{node_id}`"))?;
info.node_working_dirs
.insert(node_id, node.node_working_dir);
overall_pb.inc(1);
}

overall_pb.finish_with_message(format!("Built {} nodes successfully", node_count));
Ok(info)
}

struct LocalBuildLogger {
node_id: NodeId,
progress_bar: Option<Arc<ProgressBar>>,
multi: Option<Arc<MultiProgress>>,
}

impl BuildLogger for LocalBuildLogger {
Expand All @@ -98,7 +116,11 @@ impl BuildLogger for LocalBuildLogger {
level: impl Into<LogLevelOrStdout> + Send,
message: impl Into<String> + Send,
) {
let level = match level.into() {
let message_str: String = message.into();
let log_level = level.into();

// Always print log messages to terminal so users can scroll back
let level_colored = match log_level {
LogLevelOrStdout::LogLevel(level) => match level {
log::Level::Error => "ERROR ".red(),
log::Level::Warn => "WARN ".yellow(),
Expand All @@ -109,13 +131,37 @@ impl BuildLogger for LocalBuildLogger {
LogLevelOrStdout::Stdout => "stdout".italic().dimmed(),
};
let node = self.node_id.to_string().bold().bright_black();
let message: String = message.into();
println!("{node}: {level} {message}");
println!("{node}: {level_colored} {message_str}");

// Also update progress bar message if available
if let Some(pb) = &self.progress_bar {
let level_indicator = match log_level {
LogLevelOrStdout::LogLevel(level) => match level {
log::Level::Error => "ERROR",
log::Level::Warn => "WARN",
log::Level::Info => "",
log::Level::Debug => "DEBUG",
log::Level::Trace => "TRACE",
},
LogLevelOrStdout::Stdout => "",
};

if !level_indicator.is_empty() {
pb.set_message(format!(
"{}: {} - {}",
self.node_id, level_indicator, message_str
));
} else {
pb.set_message(format!("{}: {}", self.node_id, message_str));
}
}
}

async fn try_clone(&self) -> eyre::Result<Self::Clone> {
Ok(LocalBuildLogger {
node_id: self.node_id.clone(),
progress_bar: self.progress_bar.clone(),
multi: self.multi.clone(),
})
}
}
1 change: 0 additions & 1 deletion binaries/cli/src/command/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use clap::{CommandFactory, ValueEnum};
use clap_complete::Shell;

use crate::command::Executable;
use sysinfo;

#[derive(Debug, clap::Args)]
#[command(after_help = r#"
Expand Down
16 changes: 15 additions & 1 deletion binaries/cli/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::Executable;
use crate::{
common::{handle_dataflow_result, resolve_dataflow, write_events_to},
output::print_log_message,
progress::ProgressBar,
session::DataflowSession,
};
use dora_daemon::{Daemon, LogDestination, flume};
Expand Down Expand Up @@ -45,21 +46,32 @@ pub fn run(dataflow: String, uv: bool) -> eyre::Result<()> {
.wrap_err("failed to set up tracing subscriber")?;
}

let pb = ProgressBar::new_spinner("Initializing dataflow...");

let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let dataflow_session =
DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;

pb.set_message("Starting runtime...");
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;

let (log_tx, log_rx) = flume::bounded(100);
let (log_tx, log_rx) = flume::bounded::<dora_message::common::LogMessage>(100);
let pb_clone = ProgressBar::new_spinner("Running dataflow...");
std::thread::spawn(move || {
for message in log_rx {
// Update progress bar with node messages
if let Some(node_id) = &message.node_id {
pb_clone.set_message(format!("{}: {}", node_id, message.message));
}
print_log_message(message, false, false);
}
pb_clone.finish_and_clear();
});

pb.set_message("Running dataflow...");
let result = rt.block_on(Daemon::run_dataflow(
&dataflow_path,
dataflow_session.build_id,
Expand All @@ -69,6 +81,8 @@ pub fn run(dataflow: String, uv: bool) -> eyre::Result<()> {
LogDestination::Channel { sender: log_tx },
write_events_to(),
))?;

pb.finish_with_message("Dataflow execution completed");
handle_dataflow_result(result, None)
}

Expand Down
Loading
Loading