Skip to content

Commit 5e893ab

Browse files
authored
fix: predicate cache stats calculation (#19561)
## Which issue does this PR close? - Closes #19334 ## Rationale for this change Explained in issue ## What changes are included in this PR? Changed counter to gauge to set value provided by arrow-rs correctly ## Are these changes tested? Yes, have added a new test which fails with old code as it reports compounded metric ## Are there any user-facing changes? There are user-facing changes, but it's more of a fix to a metric reported to the user. No change to API itself.
1 parent 9208f12 commit 5e893ab

File tree

3 files changed

+31
-10
lines changed

3 files changed

+31
-10
lines changed

datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,22 @@ async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
644644
.await
645645
}
646646

647+
#[tokio::test]
648+
async fn predicate_cache_stats_issue_19561() -> datafusion_common::Result<()> {
649+
let mut config = SessionConfig::new();
650+
config.options_mut().execution.parquet.pushdown_filters = true;
651+
// force to get multiple batches to trigger repeated metric compound bug
652+
config.options_mut().execution.batch_size = 1;
653+
let ctx = SessionContext::new_with_config(config);
654+
// The cache is on by default, and used when filter pushdown is enabled
655+
PredicateCacheTest {
656+
expected_inner_records: 8,
657+
expected_records: 4,
658+
}
659+
.run(&ctx)
660+
.await
661+
}
662+
647663
#[tokio::test]
648664
async fn predicate_cache_pushdown_default_selections_only()
649665
-> datafusion_common::Result<()> {

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion_physical_plan::metrics::{
19-
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics,
19+
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricType, PruningMetrics,
2020
RatioMergeStrategy, RatioMetrics, Time,
2121
};
2222

@@ -79,11 +79,16 @@ pub struct ParquetFileMetrics {
7979
/// Parquet.
8080
///
8181
/// This is the expensive path (IO + Decompression + Decoding).
82-
pub predicate_cache_inner_records: Count,
82+
///
83+
/// We use a Gauge here as arrow-rs reports absolute numbers rather
84+
/// than incremental readings, we want a `set` operation here rather
85+
/// than `add`. Earlier it was `Count`, which led to this issue:
86+
/// github.com/apache/datafusion/issues/19334
87+
pub predicate_cache_inner_records: Gauge,
8388
/// Predicate Cache: number of records read from the cache. This is the
8489
/// number of rows that were stored in the cache after evaluating predicates
8590
/// reused for the output.
86-
pub predicate_cache_records: Count,
91+
pub predicate_cache_records: Gauge,
8792
}
8893

8994
impl ParquetFileMetrics {
@@ -169,11 +174,11 @@ impl ParquetFileMetrics {
169174

170175
let predicate_cache_inner_records = MetricBuilder::new(metrics)
171176
.with_new_label("filename", filename.to_string())
172-
.counter("predicate_cache_inner_records", partition);
177+
.gauge("predicate_cache_inner_records", partition);
173178

174179
let predicate_cache_records = MetricBuilder::new(metrics)
175180
.with_new_label("filename", filename.to_string())
176-
.counter("predicate_cache_records", partition);
181+
.gauge("predicate_cache_records", partition);
177182

178183
Self {
179184
files_ranges_pruned_statistics,

datafusion/datasource-parquet/src/opener.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use datafusion_physical_expr_common::physical_expr::{
4747
PhysicalExpr, is_dynamic_physical_expr,
4848
};
4949
use datafusion_physical_plan::metrics::{
50-
Count, ExecutionPlanMetricsSet, MetricBuilder, PruningMetrics,
50+
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
5151
};
5252
use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};
5353

@@ -682,15 +682,15 @@ impl FileOpener for ParquetOpener {
682682
/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
683683
fn copy_arrow_reader_metrics(
684684
arrow_reader_metrics: &ArrowReaderMetrics,
685-
predicate_cache_inner_records: &Count,
686-
predicate_cache_records: &Count,
685+
predicate_cache_inner_records: &Gauge,
686+
predicate_cache_records: &Gauge,
687687
) {
688688
if let Some(v) = arrow_reader_metrics.records_read_from_inner() {
689-
predicate_cache_inner_records.add(v);
689+
predicate_cache_inner_records.set(v);
690690
}
691691

692692
if let Some(v) = arrow_reader_metrics.records_read_from_cache() {
693-
predicate_cache_records.add(v);
693+
predicate_cache_records.set(v);
694694
}
695695
}
696696

0 commit comments

Comments
 (0)