Commit 6843f96

Authored by morrisonlevi, realFlowControl, and claude
feat(profiling): internal metrics for overhead (#3616)
* feat(profiling): internal metrics for overhead

* feat(profiling): move CPU time capture to include serialization for `ddprof_upload` for current profile exported

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat(profiling): add CPU time tracking for `ddprof_time` thread

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat(profiling): separate CPU time tracking per background thread

  Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Florian Engelhardt <florian.engelhardt@datadoghq.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 32aaf0a · commit 6843f96
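The commit wires self-observation into the profiler: each overhead-sensitive code path samples `cpu_time::ThreadTime` before and after its work and folds the delta into a process-global `AtomicU64`, which the uploader later drains into the profile's internal metadata. The sketch below shows that measurement pattern in isolation. It is illustrative only and assumes the `cpu_time` crate as a dependency; `WORK_CPU_TIME_NS` and `do_work` are stand-in names, not identifiers from this commit.

```rust
use cpu_time::ThreadTime;
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative counter, not one of the statics added by this commit.
static WORK_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);

// Same saturating-delta shape as the diff's cpu_time_delta_ns helper.
fn cpu_time_delta_ns(now: ThreadTime, prev: ThreadTime) -> u64 {
    match now.as_duration().checked_sub(prev.as_duration()) {
        Some(d) => d.as_nanos().try_into().unwrap_or(u64::MAX),
        None => 0,
    }
}

// Stand-in for the instrumented work (stack walking, pprof serialization, ...).
fn do_work() {
    let mut acc = 0u64;
    for i in 0..1_000_000u64 {
        acc = acc.wrapping_add(i);
    }
    std::hint::black_box(acc);
}

fn main() {
    // ThreadTime counts CPU time charged to the current thread only, so
    // wall-clock waits do not show up in the accumulated overhead.
    let start = ThreadTime::try_now().ok();
    do_work();
    if let (Some(start), Ok(end)) = (start, ThreadTime::try_now()) {
        WORK_CPU_TIME_NS.fetch_add(cpu_time_delta_ns(end, start), Ordering::Relaxed);
    }
    // Drain-on-read, like the uploader does with swap(0, ...).
    println!(
        "work CPU time: {} ns",
        WORK_CPU_TIME_NS.swap(0, Ordering::Relaxed)
    );
}
```

Because `ThreadTime` measures CPU time charged to the calling thread only, time spent blocked (for example waiting on the upload channel) does not inflate the reported overhead.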

File tree

2 files changed: +128 -19 lines changed

profiling/src/profiling/mod.rs
profiling/src/profiling/uploader.rs

profiling/src/profiling/mod.rs

Lines changed: 51 additions & 5 deletions
@@ -25,6 +25,7 @@ use crate::{Clocks, CLOCKS, TAGS};
 use chrono::Utc;
 use core::mem::forget;
 use core::{ptr, str};
+use cpu_time::ThreadTime;
 use crossbeam_channel::{Receiver, Sender, TrySendError};
 use libdd_profiling::api::{
     Function, Label as ApiLabel, Location, Period, Sample, UpscalingInfo, ValueType as ApiValueType,
@@ -36,7 +37,7 @@ use std::borrow::Cow;
 use std::collections::HashMap;
 use std::hash::Hash;
 use std::num::NonZeroI64;
-use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicU64, Ordering};
 use std::sync::{Arc, Barrier, OnceLock};
 use std::thread::JoinHandle;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
@@ -61,6 +62,32 @@ const UPLOAD_CHANNEL_CAPACITY: usize = 8;
 /// minit, and is destroyed on mshutdown.
 static mut PROFILER: OnceLock<Profiler> = OnceLock::new();
 
+pub static STACK_WALK_COUNT: AtomicU64 = AtomicU64::new(0);
+pub static STACK_WALK_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);
+pub static DDPROF_TIME_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);
+pub static DDPROF_UPLOAD_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);
+
+fn cpu_time_delta_ns(now: ThreadTime, prev: ThreadTime) -> u64 {
+    match now.as_duration().checked_sub(prev.as_duration()) {
+        Some(duration) => duration.as_nanos().try_into().unwrap_or(u64::MAX),
+        None => 0,
+    }
+}
+
+pub(crate) fn update_cpu_time_counter(last: &mut Option<ThreadTime>, counter: &AtomicU64) {
+    let Some(prev) = last.take() else {
+        *last = ThreadTime::try_now().ok();
+        return;
+    };
+    if let Ok(now) = ThreadTime::try_now() {
+        let elapsed_ns = cpu_time_delta_ns(now, prev);
+        counter.fetch_add(elapsed_ns, Ordering::Relaxed);
+        *last = Some(now);
+    } else {
+        *last = Some(prev);
+    }
+}
+
 /// Order this array this way:
 /// 1. Always enabled types.
 /// 2. On by default types.
@@ -569,6 +596,7 @@ impl TimeCollector {
         let upload_tick = crossbeam_channel::tick(self.upload_period);
         let never = crossbeam_channel::never();
         let mut running = true;
+        let mut last_cpu = ThreadTime::try_now().ok();
 
         while running {
             // The crossbeam_channel::select! doesn't have the ability to
@@ -594,6 +622,7 @@
                         Self::handle_resource_message(message, &mut profiles),
                     ProfilerMessage::Cancel => {
                         // flush what we have before exiting
+                        update_cpu_time_counter(&mut last_cpu, &DDPROF_TIME_CPU_TIME_NS);
                         last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export);
                         running = false;
                     },
@@ -631,6 +660,7 @@
 
                 recv(upload_tick) -> message => {
                     if message.is_ok() {
+                        update_cpu_time_counter(&mut last_cpu, &DDPROF_TIME_CPU_TIME_NS);
                         last_wall_export = self.handle_timeout(&mut profiles, &last_wall_export);
                     }
                 },
@@ -903,13 +933,29 @@ impl Profiler {
         unsafe { (*system_settings).profiling_timeline_enabled }
     }
 
+    fn collect_stack_sample_timed(
+        &self,
+        execute_data: *mut zend_execute_data,
+    ) -> Result<Backtrace, CollectStackSampleError> {
+        let start = ThreadTime::try_now().ok();
+        let result = collect_stack_sample(execute_data);
+        STACK_WALK_COUNT.fetch_add(1, Ordering::Relaxed);
+        if let Some(start) = start {
+            if let Ok(end) = ThreadTime::try_now() {
+                let elapsed_ns = cpu_time_delta_ns(end, start);
+                STACK_WALK_CPU_TIME_NS.fetch_add(elapsed_ns, Ordering::Relaxed);
+            }
+        }
+        result
+    }
+
     /// Collect a stack sample with elapsed wall time. Collects CPU time if
     /// it's enabled and available.
     #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "debug"))]
     pub fn collect_time(&self, execute_data: *mut zend_execute_data, interrupt_count: u32) {
         // todo: should probably exclude the wall and CPU time used by collecting the sample.
         let interrupt_count = interrupt_count as i64;
-        let result = collect_stack_sample(execute_data);
+        let result = self.collect_stack_sample_timed(execute_data);
         match result {
             Ok(frames) => {
                 let depth = frames.len();
@@ -957,7 +1003,7 @@ impl Profiler {
         alloc_size: i64,
         interrupt_count: Option<u32>,
     ) {
-        let result = collect_stack_sample(execute_data);
+        let result = self.collect_stack_sample_timed(execute_data);
         match result {
             Ok(frames) => {
                 let depth = frames.len();
@@ -1010,7 +1056,7 @@ impl Profiler {
         exception: String,
         message: Option<String>,
     ) {
-        let result = collect_stack_sample(execute_data);
+        let result = self.collect_stack_sample_timed(execute_data);
         match result {
             Ok(frames) => {
                 let depth = frames.len();
@@ -1410,7 +1456,7 @@
     where
         F: FnOnce(&mut SampleValues),
     {
-        let result = collect_stack_sample(execute_data);
+        let result = self.collect_stack_sample_timed(execute_data);
         match result {
             Ok(frames) => {
                 let depth = frames.len();

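To make the attribution semantics of `update_cpu_time_counter` above concrete: the first call only primes the baseline, each later call adds the thread CPU time elapsed since the previous successful reading, and a failed clock read keeps the old baseline so the interval is not dropped. Below is a minimal stand-alone sketch of that behavior driven by a tick channel, mirroring how `TimeCollector` calls the helper on each `upload_tick`. It assumes the `cpu_time` and `crossbeam-channel` crates; `accumulate` and `TICK_THREAD_CPU_NS` are illustrative stand-ins for the diff's `update_cpu_time_counter` and `DDPROF_TIME_CPU_TIME_NS`.

```rust
use cpu_time::ThreadTime;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

// Illustrative stand-in for DDPROF_TIME_CPU_TIME_NS.
static TICK_THREAD_CPU_NS: AtomicU64 = AtomicU64::new(0);

// Illustrative stand-in for update_cpu_time_counter: add the CPU time this
// thread burned since the previous successful reading to `counter`.
fn accumulate(last: &mut Option<ThreadTime>, counter: &AtomicU64) {
    // First call: just establish a baseline.
    let Some(prev) = last.take() else {
        *last = ThreadTime::try_now().ok();
        return;
    };
    if let Ok(now) = ThreadTime::try_now() {
        let delta = now
            .as_duration()
            .checked_sub(prev.as_duration())
            .unwrap_or_default();
        let delta_ns = delta.as_nanos().try_into().unwrap_or(u64::MAX);
        counter.fetch_add(delta_ns, Ordering::Relaxed);
        *last = Some(now);
    } else {
        // Keep the old baseline so a transient clock error doesn't lose the interval.
        *last = Some(prev);
    }
}

fn main() {
    // A short tick loop standing in for TimeCollector's upload_tick handling.
    let ticks = crossbeam_channel::tick(Duration::from_millis(50));
    let mut last_cpu = ThreadTime::try_now().ok();
    for _ in 0..5 {
        if ticks.recv().is_ok() {
            // ... periodic work would happen here ...
            accumulate(&mut last_cpu, &TICK_THREAD_CPU_NS);
        }
    }
    println!(
        "tick-thread CPU time so far: {} ns",
        TICK_THREAD_CPU_NS.load(Ordering::Relaxed)
    );
}
```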
profiling/src/profiling/uploader.rs

Lines changed: 77 additions & 14 deletions
@@ -1,7 +1,11 @@
 use crate::config::AgentEndpoint;
-use crate::profiling::{UploadMessage, UploadRequest};
+use crate::profiling::{
+    update_cpu_time_counter, UploadMessage, UploadRequest, DDPROF_TIME_CPU_TIME_NS,
+    DDPROF_UPLOAD_CPU_TIME_NS, STACK_WALK_COUNT, STACK_WALK_CPU_TIME_NS,
+};
 use crate::{PROFILER_NAME_STR, PROFILER_VERSION_STR};
 use chrono::{DateTime, Utc};
+use cpu_time::ThreadTime;
 use crossbeam_channel::{select, Receiver};
 use libdd_common::Endpoint;
 use log::{debug, info, warn};
@@ -14,7 +18,6 @@ use std::sync::{Arc, Barrier};
 use crate::allocation::{ALLOCATION_PROFILING_COUNT, ALLOCATION_PROFILING_SIZE};
 #[cfg(feature = "debug_stats")]
 use crate::exception::EXCEPTION_PROFILING_EXCEPTION_COUNT;
-#[cfg(feature = "debug_stats")]
 use std::sync::atomic::Ordering;
 
 pub struct Uploader {
@@ -44,13 +47,41 @@ impl Uploader {
 
     /// This function will not only create the internal metadata JSON representation, but is also
     /// in charge to reset all those counters back to 0.
-    #[cfg(feature = "debug_stats")]
     fn create_internal_metadata() -> Option<serde_json::Value> {
-        Some(json!({
-            "exceptions_count": EXCEPTION_PROFILING_EXCEPTION_COUNT.swap(0, Ordering::Relaxed),
-            "allocations_count": ALLOCATION_PROFILING_COUNT.swap(0, Ordering::Relaxed),
-            "allocations_size": ALLOCATION_PROFILING_SIZE.swap(0, Ordering::Relaxed),
-        }))
+        let capacity = 4 + cfg!(feature = "debug_stats") as usize * 3;
+        let mut metadata = serde_json::Map::with_capacity(capacity);
+        metadata.insert(
+            "stack_walk_count".to_string(),
+            json!(STACK_WALK_COUNT.swap(0, Ordering::Relaxed)),
+        );
+        metadata.insert(
+            "stack_walk_cpu_time_ns".to_string(),
+            json!(STACK_WALK_CPU_TIME_NS.swap(0, Ordering::Relaxed)),
+        );
+        metadata.insert(
+            "ddprof_time_cpu_time_ns".to_string(),
+            json!(DDPROF_TIME_CPU_TIME_NS.swap(0, Ordering::Relaxed)),
+        );
+        metadata.insert(
+            "ddprof_upload_cpu_time_ns".to_string(),
+            json!(DDPROF_UPLOAD_CPU_TIME_NS.swap(0, Ordering::Relaxed)),
+        );
+        #[cfg(feature = "debug_stats")]
+        {
+            metadata.insert(
+                "exceptions_count".to_string(),
+                json!(EXCEPTION_PROFILING_EXCEPTION_COUNT.swap(0, Ordering::Relaxed)),
+            );
+            metadata.insert(
+                "allocations_count".to_string(),
+                json!(ALLOCATION_PROFILING_COUNT.swap(0, Ordering::Relaxed)),
+            );
+            metadata.insert(
+                "allocations_size".to_string(),
+                json!(ALLOCATION_PROFILING_SIZE.swap(0, Ordering::Relaxed)),
+            );
+        }
+        Some(serde_json::Value::Object(metadata))
     }
 
     fn create_profiler_info(&self) -> Option<serde_json::Value> {
@@ -62,7 +93,11 @@ impl Uploader {
         Some(metadata)
     }
 
-    fn upload(&self, message: Box<UploadRequest>) -> anyhow::Result<u16> {
+    fn upload(
+        &self,
+        message: Box<UploadRequest>,
+        last_cpu: &mut Option<ThreadTime>,
+    ) -> anyhow::Result<u16> {
         let index = message.index;
         let profile = message.profile;
 
@@ -83,16 +118,18 @@ impl Uploader {
         let serialized =
             profile.serialize_into_compressed_pprof(Some(message.end_time), message.duration)?;
         exporter.set_timeout(10000); // 10 seconds in milliseconds
+
+        // Capture CPU time up to this point. Note: metadata generation, exporter
+        // building, and HTTP request time will be attributed to the next profile.
+        update_cpu_time_counter(last_cpu, &DDPROF_UPLOAD_CPU_TIME_NS);
+
         let request = exporter.build(
             serialized,
             &[],
             &[],
             None,
             None,
-            #[cfg(feature = "debug_stats")]
             Self::create_internal_metadata(),
-            #[cfg(not(feature = "debug_stats"))]
-            None,
             self.create_profiler_info(),
         )?;
         debug!("Sending profile to: {agent_endpoint}");
@@ -106,7 +143,7 @@ impl Uploader {
         */
        let pprof_filename = &self.output_pprof;
         let mut i = 0;
-
+        let mut last_cpu = ThreadTime::try_now().ok();
         loop {
             /* Since profiling uploads are going over the Internet and not just
              * the local network, it would be ideal if they were the lowest
@@ -132,7 +169,7 @@ impl Uploader {
                     std::fs::write(&name, r.buffer).expect("write to succeed");
                     info!("Successfully wrote profile to {name}");
                 },
-                None => match self.upload(request) {
+                None => match self.upload(request, &mut last_cpu) {
                     Ok(status) => {
                         if status >= 400 {
                             warn!("Unexpected HTTP status when sending profile (HTTP {status}).")
@@ -169,6 +206,10 @@ mod tests {
     #[test]
     fn test_create_internal_metadata() {
         // Set up all counters with known values
+        STACK_WALK_COUNT.store(7, Ordering::Relaxed);
+        STACK_WALK_CPU_TIME_NS.store(9000, Ordering::Relaxed);
+        DDPROF_TIME_CPU_TIME_NS.store(1234, Ordering::Relaxed);
+        DDPROF_UPLOAD_CPU_TIME_NS.store(5678, Ordering::Relaxed);
         EXCEPTION_PROFILING_EXCEPTION_COUNT.store(42, Ordering::Relaxed);
         ALLOCATION_PROFILING_COUNT.store(100, Ordering::Relaxed);
         ALLOCATION_PROFILING_SIZE.store(1024, Ordering::Relaxed);
@@ -181,6 +222,28 @@ mod tests {
         let metadata = metadata.unwrap();
 
         // The metadata should contain all counts
+        assert_eq!(
+            metadata.get("stack_walk_count").and_then(|v| v.as_u64()),
+            Some(7)
+        );
+        assert_eq!(
+            metadata
+                .get("stack_walk_cpu_time_ns")
+                .and_then(|v| v.as_u64()),
+            Some(9000)
+        );
+        assert_eq!(
+            metadata
+                .get("ddprof_time_cpu_time_ns")
+                .and_then(|v| v.as_u64()),
+            Some(1234)
+        );
+        assert_eq!(
+            metadata
+                .get("ddprof_upload_cpu_time_ns")
+                .and_then(|v| v.as_u64()),
+            Some(5678)
+        );
 
         assert_eq!(
             metadata.get("exceptions_count").and_then(|v| v.as_u64()),

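As the test hunk above suggests, `create_internal_metadata` now always returns an object whose new keys report sampling overhead, and reading each counter with `swap(0, ...)` means every upload carries only the overhead accrued since the previous upload. The sketch below demonstrates that drain-on-read behavior with a simplified two-key stand-in for the real function; it assumes `serde_json` as a dependency, and `drain_metadata` is not an identifier from this commit.

```rust
use serde_json::json;
use std::sync::atomic::{AtomicU64, Ordering};

// Names mirror the statics from the diff, but this is a trimmed-down example.
static STACK_WALK_COUNT: AtomicU64 = AtomicU64::new(0);
static STACK_WALK_CPU_TIME_NS: AtomicU64 = AtomicU64::new(0);

// Simplified stand-in for create_internal_metadata: report and reset in one step.
fn drain_metadata() -> serde_json::Value {
    // The real function also emits ddprof_time_cpu_time_ns and
    // ddprof_upload_cpu_time_ns, plus the debug_stats-only counters.
    json!({
        "stack_walk_count": STACK_WALK_COUNT.swap(0, Ordering::Relaxed),
        "stack_walk_cpu_time_ns": STACK_WALK_CPU_TIME_NS.swap(0, Ordering::Relaxed)
    })
}

fn main() {
    STACK_WALK_COUNT.store(7, Ordering::Relaxed);
    STACK_WALK_CPU_TIME_NS.store(9000, Ordering::Relaxed);
    println!("{}", drain_metadata()); // first drain reports the accumulated values
    println!("{}", drain_metadata()); // counters were reset, so this reports zeros
}
```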