33
44//! Vortex table provider metrics.
55use std:: sync:: Arc ;
6+ use std:: time:: Duration ;
67
78use datafusion_datasource:: file_scan_config:: FileScanConfig ;
89use datafusion_datasource:: source:: DataSourceExec ;
@@ -15,9 +16,11 @@ use datafusion_physical_plan::metrics::Gauge;
1516use datafusion_physical_plan:: metrics:: Label as DatafusionLabel ;
1617use datafusion_physical_plan:: metrics:: MetricValue as DatafusionMetricValue ;
1718use datafusion_physical_plan:: metrics:: MetricsSet ;
19+ use datafusion_physical_plan:: metrics:: Time ;
20+ use vortex:: error:: VortexExpect ;
21+ use vortex:: metrics:: Label ;
1822use vortex:: metrics:: Metric ;
19- use vortex:: metrics:: MetricId ;
20- use vortex:: metrics:: Tags ;
23+ use vortex:: metrics:: MetricValue ;
2124
2225use crate :: persistent:: source:: VortexSource ;
2326
@@ -59,7 +62,7 @@ impl ExecutionPlanVisitor for VortexMetricsFinder {
5962 . vx_metrics ( )
6063 . snapshot ( )
6164 . iter ( )
62- . flat_map ( | ( id , metric ) | metric_to_datafusion ( id , metric ) )
65+ . flat_map ( metric_to_datafusion)
6366 {
6467 set. push ( Arc :: new ( metric) ) ;
6568 }
@@ -72,74 +75,80 @@ impl ExecutionPlanVisitor for VortexMetricsFinder {
7275 }
7376}
7477
75- fn metric_to_datafusion ( id : MetricId , metric : & Metric ) -> impl Iterator < Item = DatafusionMetric > {
76- let ( partition, labels) = tags_to_datafusion ( id . tags ( ) ) ;
77- metric_value_to_datafusion ( id . name ( ) , metric)
78+ fn metric_to_datafusion ( metric : & Metric ) -> impl Iterator < Item = DatafusionMetric > {
79+ let ( partition, labels) = labels_to_datafusion ( metric . labels ( ) ) ;
80+ metric_value_to_datafusion ( metric . name ( ) , metric. value ( ) )
7881 . into_iter ( )
7982 . map ( move |metric_value| {
8083 DatafusionMetric :: new_with_labels ( metric_value, partition, labels. clone ( ) )
8184 } )
8285}
8386
84- fn tags_to_datafusion ( tags : & Tags ) -> ( Option < usize > , Vec < DatafusionLabel > ) {
87+ fn labels_to_datafusion ( tags : & [ Label ] ) -> ( Option < usize > , Vec < DatafusionLabel > ) {
8588 tags. iter ( )
86- . fold ( ( None , Vec :: new ( ) ) , |( mut partition, mut labels) , ( k , v ) | {
87- if k == PARTITION_LABEL {
88- partition = v . parse ( ) . ok ( ) ;
89+ . fold ( ( None , Vec :: new ( ) ) , |( mut partition, mut labels) , metric | {
90+ if metric . key ( ) == PARTITION_LABEL {
91+ partition = metric . value ( ) . parse ( ) . ok ( ) ;
8992 } else {
90- labels. push ( DatafusionLabel :: new ( k. to_string ( ) , v. to_string ( ) ) ) ;
93+ labels. push ( DatafusionLabel :: new (
94+ metric. key ( ) . to_string ( ) ,
95+ metric. value ( ) . to_string ( ) ,
96+ ) ) ;
9197 }
9298 ( partition, labels)
9399 } )
94100}
95101
96- fn metric_value_to_datafusion ( name : & str , metric : & Metric ) -> Vec < DatafusionMetricValue > {
102+ fn metric_value_to_datafusion ( name : & str , metric : & MetricValue ) -> Vec < DatafusionMetricValue > {
97103 match metric {
98- Metric :: Counter ( counter) => counter
99- . count ( )
104+ MetricValue :: Counter ( counter) => counter
105+ . value ( )
100106 . try_into ( )
101107 . into_iter ( )
102108 . map ( |count| df_counter ( name. to_string ( ) , count) )
103109 . collect ( ) ,
104- Metric :: Histogram ( hist) => {
110+ MetricValue :: Histogram ( hist) => {
105111 let mut res = Vec :: new ( ) ;
106- if let Ok ( count) = hist. count ( ) . try_into ( ) {
107- res. push ( df_counter ( format ! ( "{name}_count" ) , count) ) ;
108- }
109- let snapshot = hist. snapshot ( ) ;
110- if let Ok ( max) = snapshot. max ( ) . try_into ( ) {
111- res. push ( df_gauge ( format ! ( "{name}_max" ) , max) ) ;
112- }
113- if let Ok ( min) = snapshot. min ( ) . try_into ( ) {
114- res. push ( df_gauge ( format ! ( "{name}_min" ) , min) ) ;
115- }
116- if let Some ( p90) = f_to_u ( snapshot. value ( 0.90 ) ) {
117- res. push ( df_gauge ( format ! ( "{name}_p95" ) , p90) ) ;
118- }
119- if let Some ( p99) = f_to_u ( snapshot. value ( 0.99 ) ) {
120- res. push ( df_gauge ( format ! ( "{name}_p99" ) , p99) ) ;
112+
113+ res. push ( df_counter ( format ! ( "{name}_count" ) , hist. count ( ) ) ) ;
114+
115+ if !hist. is_empty ( ) {
116+ if let Some ( max) = f_to_u ( hist. quantile ( 1.0 ) . vortex_expect ( "must not be empty" ) ) {
117+ res. push ( df_gauge ( format ! ( "{name}_max" ) , max) ) ;
118+ }
119+
120+ if let Some ( min) = f_to_u ( hist. quantile ( 0.0 ) . vortex_expect ( "must not be empty" ) ) {
121+ res. push ( df_gauge ( format ! ( "{name}_min" ) , min) ) ;
122+ }
123+
124+ if let Some ( p95) = f_to_u ( hist. quantile ( 0.95 ) . vortex_expect ( "must not be empty" ) ) {
125+ res. push ( df_gauge ( format ! ( "{name}_p95" ) , p95) ) ;
126+ }
127+ if let Some ( p99) = f_to_u ( hist. quantile ( 0.99 ) . vortex_expect ( "must not be empty" ) ) {
128+ res. push ( df_gauge ( format ! ( "{name}_p99" ) , p99) ) ;
129+ }
121130 }
131+
122132 res
123133 }
124- Metric :: Timer ( timer) => {
134+ MetricValue :: Timer ( timer) => {
125135 let mut res = Vec :: new ( ) ;
126- if let Ok ( count) = timer. count ( ) . try_into ( ) {
127- res. push ( df_counter ( format ! ( "{name}_count" ) , count) ) ;
128- }
129- let snapshot = timer. snapshot ( ) ;
130- if let Ok ( max) = snapshot. max ( ) . try_into ( ) {
131- // NOTE(os): unlike Time metrics, gauges allow custom aggregation
132- res. push ( df_gauge ( format ! ( "{name}_max" ) , max) ) ;
133- }
134- if let Ok ( min) = snapshot. min ( ) . try_into ( ) {
135- res. push ( df_gauge ( format ! ( "{name}_min" ) , min) ) ;
136- }
137- if let Some ( p95) = f_to_u ( snapshot. value ( 0.95 ) ) {
138- res. push ( df_gauge ( format ! ( "{name}_p95" ) , p95) ) ;
139- }
140- if let Some ( p99) = f_to_u ( snapshot. value ( 0.95 ) ) {
141- res. push ( df_gauge ( format ! ( "{name}_p99" ) , p99) ) ;
136+ res. push ( df_counter ( format ! ( "{name}_count" ) , timer. count ( ) ) ) ;
137+
138+ if !timer. is_empty ( ) {
139+ let max = timer. quantile ( 1.0 ) . vortex_expect ( "must not be empty" ) ;
140+ res. push ( df_timer ( format ! ( "{name}_max" ) , max) ) ;
141+
142+ let min = timer. quantile ( 0.0 ) . vortex_expect ( "must not be empty" ) ;
143+ res. push ( df_timer ( format ! ( "{name}_min" ) , min) ) ;
144+
145+ let p95 = timer. quantile ( 0.95 ) . vortex_expect ( "must not be empty" ) ;
146+ res. push ( df_timer ( format ! ( "{name}_p95" ) , p95) ) ;
147+
148+ let p99 = timer. quantile ( 0.99 ) . vortex_expect ( "must not be empty" ) ;
149+ res. push ( df_timer ( format ! ( "{name}_p99" ) , p99) ) ;
142150 }
151+
143152 res
144153 }
145154 // TODO(os): add more metric types when added to VortexMetrics
@@ -165,6 +174,15 @@ fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
165174 }
166175}
167176
177+ fn df_timer ( name : String , value : Duration ) -> DatafusionMetricValue {
178+ let time = Time :: new ( ) ;
179+ time. add_duration ( value) ;
180+ DatafusionMetricValue :: Time {
181+ name : name. into ( ) ,
182+ time,
183+ }
184+ }
185+
168186#[ expect(
169187 clippy:: cast_possible_truncation,
170188 reason = "truncation is checked before cast"
0 commit comments