Skip to content

Commit 6f5269b

Browse files
authored
Add opentelemetry to the daemon (#1303)
This PR adds OpenTelemetry to the daemon and makes it possible to receive node failure information within OpenTelemetry even if the node panics and does not have time to send the error logs itself.
2 parents 1ba7809 + 667b969 commit 6f5269b

File tree

6 files changed

+131
-33
lines changed

6 files changed

+131
-33
lines changed

binaries/cli/src/command/daemon.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@ use dora_core::topics::{
55
};
66

77
use dora_daemon::LogDestination;
8-
#[cfg(feature = "tracing")]
9-
use dora_tracing::TracingBuilder;
10-
118
use eyre::Context;
129
use std::{
1310
net::{IpAddr, SocketAddr},
@@ -40,28 +37,37 @@ pub struct Daemon {
4037

4138
impl Executable for Daemon {
4239
fn execute(self) -> eyre::Result<()> {
40+
let rt = Builder::new_multi_thread()
41+
.enable_all()
42+
.build()
43+
.context("tokio runtime failed")?;
44+
4345
#[cfg(feature = "tracing")]
44-
{
46+
let _guard = {
47+
let _enter = rt.enter();
48+
4549
let name = "dora-daemon";
4650
let filename = self
4751
.machine_id
4852
.as_ref()
4953
.map(|id| format!("{name}-{id}"))
5054
.unwrap_or(name.to_string());
51-
let mut builder = TracingBuilder::new(name);
52-
if !self.quiet {
53-
builder = builder.with_stdout("info,zenoh=warn", false);
54-
}
55-
builder = builder.with_file(filename, LevelFilter::INFO)?;
56-
builder
57-
.build()
58-
.wrap_err("failed to set up tracing subscriber")?;
59-
}
55+
let quiet = self.quiet;
6056

61-
let rt = Builder::new_multi_thread()
62-
.enable_all()
63-
.build()
64-
.context("tokio runtime failed")?;
57+
let stdout_filter = if !quiet {
58+
Some(std::env::var("RUST_LOG").unwrap_or("info".to_string()))
59+
} else {
60+
None
61+
};
62+
63+
dora_tracing::init_tracing_subscriber(
64+
name,
65+
stdout_filter.as_deref(),
66+
Some(&filename),
67+
LevelFilter::INFO,
68+
)
69+
.context("failed to initialize tracing")?
70+
};
6571
rt.block_on(async {
6672
match self.run_dataflow {
6773
Some(dataflow_path) => {

binaries/cli/src/command/run.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use crate::{
1212
session::DataflowSession,
1313
};
1414
use dora_daemon::{Daemon, LogDestination, flume};
15-
use dora_tracing::TracingBuilder;
1615
use eyre::Context;
1716
use tokio::runtime::Builder;
1817

@@ -36,22 +35,27 @@ pub fn run_func(dataflow: String, uv: bool) -> eyre::Result<()> {
3635
}
3736

3837
pub fn run(dataflow: String, uv: bool) -> eyre::Result<()> {
38+
let rt = Builder::new_multi_thread()
39+
.enable_all()
40+
.build()
41+
.context("tokio runtime failed")?;
42+
3943
#[cfg(feature = "tracing")]
40-
{
41-
let log_level = std::env::var("RUST_LOG").ok().unwrap_or("info".to_string());
42-
TracingBuilder::new("run")
43-
.with_stdout(log_level, false)
44-
.build()
45-
.wrap_err("failed to set up tracing subscriber")?;
46-
}
44+
let _guard = {
45+
let _enter = rt.enter();
46+
let env_log = std::env::var("RUST_LOG").unwrap_or("info".to_string());
47+
dora_tracing::init_tracing_subscriber(
48+
"dora-run",
49+
Some(&env_log),
50+
None,
51+
tracing::metadata::LevelFilter::INFO,
52+
)
53+
.context("failed to initialize tracing")?
54+
};
4755

4856
let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
4957
let dataflow_session =
5058
DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;
51-
let rt = Builder::new_multi_thread()
52-
.enable_all()
53-
.build()
54-
.context("tokio runtime failed")?;
5559

5660
let (log_tx, log_rx) = flume::bounded(100);
5761
std::thread::spawn(move || {

binaries/daemon/src/spawn/prepared.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,16 @@ impl PreparedNode {
138138
let restart = restart && !restart_disabled;
139139
let success = exit_status.is_success();
140140

141+
if !success {
142+
let _span = tracing::error_span!(
143+
"node_failure",
144+
node_id = %self.node.id,
145+
dataflow_id = %self.dataflow_id
146+
)
147+
.entered();
148+
tracing::error!("node exited with error: {:?}", exit_status);
149+
}
150+
141151
let event = DoraEvent::SpawnedNodeResult {
142152
dataflow_id: self.dataflow_id,
143153
node_id: self.node.id.clone(),

libraries/extensions/telemetry/tracing/src/lib.rs

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,24 @@ impl TracingBuilder {
6363
/// it uses [std::io::stdout] which is synchronous
6464
/// and might block the logging thread.
6565
pub fn with_stdout(mut self, filter: impl AsRef<str>, json: bool) -> Self {
66-
let parsed = EnvFilter::builder()
66+
let mut parsed = EnvFilter::builder()
6767
.parse_lossy(filter)
6868
.add_directive("hyper=off".parse().unwrap())
6969
.add_directive("tonic=off".parse().unwrap())
70+
.add_directive("tokio=off".parse().unwrap())
71+
.add_directive("process_wrap=off".parse().unwrap())
7072
.add_directive("h2=off".parse().unwrap())
7173
.add_directive("reqwest=off".parse().unwrap());
74+
let env_log = std::env::var("RUST_LOG").unwrap_or_default();
75+
if !env_log.contains("dora_daemon") {
76+
parsed = parsed.add_directive("dora_daemon=info".parse().unwrap());
77+
}
78+
if !env_log.contains("dora_core") {
79+
parsed = parsed.add_directive("dora_core=warn".parse().unwrap());
80+
}
81+
if !env_log.contains("zenoh") {
82+
parsed = parsed.add_directive("zenoh=warn".parse().unwrap());
83+
}
7284
let env_filter = EnvFilter::from_default_env().or(parsed);
7385
let layer = tracing_subscriber::fmt::layer()
7486
.compact()
@@ -135,11 +147,17 @@ impl TracingBuilder {
135147

136148
self.guard = Some(guard);
137149
self.layers.push(MetricsLayer::new(meter_provider).boxed());
138-
let filter_otel = EnvFilter::new("trace")
150+
let mut filter_otel = EnvFilter::new("trace")
139151
.add_directive("hyper=off".parse().unwrap())
140152
.add_directive("tonic=off".parse().unwrap())
153+
.add_directive("tokio=off".parse().unwrap())
154+
.add_directive("process_wrap=off".parse().unwrap())
141155
.add_directive("h2=off".parse().unwrap())
142156
.add_directive("reqwest=off".parse().unwrap());
157+
let env_log = std::env::var("RUST_LOG").unwrap_or_default();
158+
if !env_log.contains("dora_daemon") {
159+
filter_otel = filter_otel.add_directive("dora_daemon=debug".parse().unwrap());
160+
}
143161
self.layers.push(
144162
OpenTelemetryLayer::new(tracer)
145163
.with_filter(filter_otel)
@@ -192,3 +210,63 @@ impl Drop for OtelGuard {
192210
self.tracer_provider.shutdown().ok();
193211
}
194212
}
213+
214+
/// Initialize tracing with OTLP (if configured) or stdout/file logging.
215+
///
216+
/// This function should be called after creating a tokio runtime and calling `runtime.enter()`.
217+
///
218+
/// # Parameters
219+
/// - `name`: Service name for tracing
220+
/// - `stdout_filter`: Optional RUST_LOG-style filter for stdout logging (e.g., "info", "debug")
221+
/// - `file_name`: Optional filename for file logging (will be placed in `out/` directory)
222+
/// - `file_filter`: Level filter for file logging (only used if `file_name` is Some)
223+
///
224+
/// # Returns
225+
/// Returns `Option<OtelGuard>` which must be kept alive for the duration of the program.
226+
/// When dropped, the guard will flush and shutdown telemetry providers.
227+
///
228+
/// # Example
229+
/// ```no_run
230+
/// use dora_tracing::init_tracing_subscriber;
231+
/// use tracing::level_filters::LevelFilter;
232+
///
233+
/// // Note: This function requires a tokio runtime context to be active
234+
/// // when using OTLP tracing. Use runtime.enter() before calling.
235+
/// let _guard = init_tracing_subscriber(
236+
/// "my-service",
237+
/// Some("info"),
238+
/// Some("my-service"),
239+
/// LevelFilter::INFO,
240+
/// ).unwrap();
241+
/// ```
242+
pub fn init_tracing_subscriber(
243+
name: &str,
244+
stdout_filter: Option<&str>,
245+
file_name: Option<&str>,
246+
file_filter: LevelFilter,
247+
) -> eyre::Result<Option<OtelGuard>> {
248+
let mut builder = TracingBuilder::new(name);
249+
let guard: Option<OtelGuard>;
250+
251+
if std::env::var("DORA_OTLP_ENDPOINT").is_ok() || std::env::var("DORA_JAEGER_TRACING").is_ok() {
252+
builder = builder
253+
.with_otlp_tracing()
254+
.wrap_err("failed to set up OTLP tracing")?;
255+
guard = builder.guard.take();
256+
} else {
257+
if let Some(filter) = stdout_filter {
258+
builder = builder.with_stdout(filter, false);
259+
}
260+
guard = None;
261+
}
262+
263+
if let Some(filename) = file_name {
264+
builder = builder.with_file(filename, file_filter)?;
265+
}
266+
267+
builder
268+
.build()
269+
.wrap_err("failed to set up tracing subscriber")?;
270+
271+
Ok(guard)
272+
}

libraries/extensions/telemetry/tracing/src/telemetry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl Extractor for MetadataMap<'_> {
3939
/// docker run -d -p 4317:4317 -p 4318:4318 -p 16686:16686 jaegertracing/all-in-one:latest
4040
/// ```
4141
///
42-
pub fn init_tracing(name: &str, endpoint: &str) -> sdktrace::SdkTracerProvider {
42+
pub fn init_tracing(_name: &str, endpoint: &str) -> sdktrace::SdkTracerProvider {
4343
let exporter = opentelemetry_otlp::SpanExporter::builder()
4444
.with_tonic()
4545
.with_endpoint(endpoint)

libraries/message/src/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{borrow::Cow, collections::BTreeMap};
44
use aligned_vec::{AVec, ConstAlign};
55
use chrono::{DateTime, Utc};
66
use eyre::Context as _;
7-
use serde::{Deserialize, Deserializer};
7+
use serde::Deserialize;
88
use uuid::Uuid;
99

1010
use crate::{BuildId, DataflowId, daemon_to_daemon::InterDaemonEvent, id::NodeId};

0 commit comments

Comments
 (0)