Skip to content

Commit 073174b

Browse files
authored
feat: Show the number of matched Parquet pages in DataSourceExec (#19977)
## Which issue does this PR close?

- Closes #19875.

## Rationale for this change

Show the number of matched (and pruned) pages in the explain analyze plan to help make decisions about file optimization.

Example:

```sql
DataSourceExec: ..., metrics=[ ... page_index_rows_pruned=1.00 K total → 100 matched, page_index_pages_pruned=100 total → 10 matched, ... ]
```

## What changes are included in this PR?

- Added `page_index_pages_pruned` metric to DataSourceExec.
- Updated and extended existing tests.

## Are these changes tested?

Yes.

## Are there any user-facing changes?

New metric in the explain plans.
1 parent adddd4c commit 073174b

File tree

8 files changed

+91
-29
lines changed

8 files changed

+91
-29
lines changed

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,7 @@ mod tests {
995995
assert_eq!(read, 1, "Expected 1 rows to match the predicate");
996996
assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
997997
assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 2);
998+
assert_eq!(get_value(&metrics, "page_index_pages_pruned"), 1);
998999
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1);
9991000
// If we filter with a value that is completely out of the range of the data
10001001
// we prune at the row group level.
@@ -1168,10 +1169,16 @@ mod tests {
11681169
// There are 4 rows pruned in each of batch2, batch3, and
11691170
// batch4 for a total of 12. batch1 had no pruning as c2 was
11701171
// filled in as null
1171-
let (page_index_pruned, page_index_matched) =
1172+
let (page_index_rows_pruned, page_index_rows_matched) =
11721173
get_pruning_metric(&metrics, "page_index_rows_pruned");
1173-
assert_eq!(page_index_pruned, 12);
1174-
assert_eq!(page_index_matched, 6);
1174+
assert_eq!(page_index_rows_pruned, 12);
1175+
assert_eq!(page_index_rows_matched, 6);
1176+
1177+
// each page has 2 rows, so the num of pages is 1/2 the number of rows
1178+
let (page_index_pages_pruned, page_index_pages_matched) =
1179+
get_pruning_metric(&metrics, "page_index_pages_pruned");
1180+
assert_eq!(page_index_pages_pruned, 6);
1181+
assert_eq!(page_index_pages_matched, 3);
11751182
}
11761183

11771184
#[tokio::test]
@@ -1734,6 +1741,7 @@ mod tests {
17341741
Some(3),
17351742
Some(4),
17361743
Some(5),
1744+
Some(6), // last page with only one row
17371745
]));
17381746
let batch1 = create_batch(vec![("int", c1.clone())]);
17391747

@@ -1742,7 +1750,7 @@ mod tests {
17421750
let rt = RoundTrip::new()
17431751
.with_predicate(filter)
17441752
.with_page_index_predicate()
1745-
.round_trip(vec![batch1])
1753+
.round_trip(vec![batch1.clone()])
17461754
.await;
17471755

17481756
let metrics = rt.parquet_exec.metrics().unwrap();
@@ -1755,14 +1763,40 @@ mod tests {
17551763
| 5 |
17561764
+-----+
17571765
");
1758-
let (page_index_pruned, page_index_matched) =
1766+
let (page_index_rows_pruned, page_index_rows_matched) =
17591767
get_pruning_metric(&metrics, "page_index_rows_pruned");
1760-
assert_eq!(page_index_pruned, 4);
1761-
assert_eq!(page_index_matched, 2);
1768+
assert_eq!(page_index_rows_pruned, 5);
1769+
assert_eq!(page_index_rows_matched, 2);
17621770
assert!(
17631771
get_value(&metrics, "page_index_eval_time") > 0,
17641772
"no eval time in metrics: {metrics:#?}"
17651773
);
1774+
1775+
// each page has 2 rows, except the last page, which has only 1 row:
// the 5 pruned rows span 3 pages and the 2 matched rows span 1 page
1776+
let (page_index_pages_pruned, page_index_pages_matched) =
1777+
get_pruning_metric(&metrics, "page_index_pages_pruned");
1778+
assert_eq!(page_index_pages_pruned, 3);
1779+
assert_eq!(page_index_pages_matched, 1);
1780+
1781+
// test with a filter that matches the page with one row
1782+
let filter = col("int").eq(lit(6_i32));
1783+
let rt = RoundTrip::new()
1784+
.with_predicate(filter)
1785+
.with_page_index_predicate()
1786+
.round_trip(vec![batch1])
1787+
.await;
1788+
1789+
let metrics = rt.parquet_exec.metrics().unwrap();
1790+
1791+
let (page_index_rows_pruned, page_index_rows_matched) =
1792+
get_pruning_metric(&metrics, "page_index_rows_pruned");
1793+
assert_eq!(page_index_rows_pruned, 6);
1794+
assert_eq!(page_index_rows_matched, 1);
1795+
1796+
let (page_index_pages_pruned, page_index_pages_matched) =
1797+
get_pruning_metric(&metrics, "page_index_pages_pruned");
1798+
assert_eq!(page_index_pages_pruned, 3);
1799+
assert_eq!(page_index_pages_matched, 1);
17661800
}
17671801

17681802
/// Returns a string array with contents:

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -879,12 +879,13 @@ async fn parquet_explain_analyze() {
879879
let i_rowgroup_stat = formatted.find("row_groups_pruned_statistics").unwrap();
880880
let i_rowgroup_bloomfilter =
881881
formatted.find("row_groups_pruned_bloom_filter").unwrap();
882-
let i_page = formatted.find("page_index_rows_pruned").unwrap();
882+
let i_page_rows = formatted.find("page_index_rows_pruned").unwrap();
883+
let i_page_pages = formatted.find("page_index_pages_pruned").unwrap();
883884

884885
assert!(
885886
(i_file < i_rowgroup_stat)
886887
&& (i_rowgroup_stat < i_rowgroup_bloomfilter)
887-
&& (i_rowgroup_bloomfilter < i_page),
888+
&& (i_rowgroup_bloomfilter < i_page_pages && i_page_pages < i_page_rows),
888889
"The parquet pruning metrics should be displayed in an order of: file range -> row group statistics -> row group bloom filter -> page index."
889890
);
890891
}

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ pub struct ParquetFileMetrics {
6565
pub bloom_filter_eval_time: Time,
6666
/// Total rows filtered or matched by parquet page index
6767
pub page_index_rows_pruned: PruningMetrics,
68+
/// Total pages filtered or matched by parquet page index
69+
pub page_index_pages_pruned: PruningMetrics,
6870
/// Total time spent evaluating parquet page index filters
6971
pub page_index_eval_time: Time,
7072
/// Total time spent reading and parsing metadata from the footer
@@ -121,6 +123,11 @@ impl ParquetFileMetrics {
121123
.with_type(MetricType::SUMMARY)
122124
.pruning_metrics("page_index_rows_pruned", partition);
123125

126+
let page_index_pages_pruned = MetricBuilder::new(metrics)
127+
.with_new_label("filename", filename.to_string())
128+
.with_type(MetricType::SUMMARY)
129+
.pruning_metrics("page_index_pages_pruned", partition);
130+
124131
let bytes_scanned = MetricBuilder::new(metrics)
125132
.with_new_label("filename", filename.to_string())
126133
.with_type(MetricType::SUMMARY)
@@ -191,6 +198,7 @@ impl ParquetFileMetrics {
191198
pushdown_rows_matched,
192199
row_pushdown_eval_time,
193200
page_index_rows_pruned,
201+
page_index_pages_pruned,
194202
statistics_eval_time,
195203
bloom_filter_eval_time,
196204
page_index_eval_time,

datafusion/datasource-parquet/src/page_filter.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,10 @@ impl PagePruningAccessPlanFilter {
189189
let mut total_skip = 0;
190190
// track the total number of rows that should not be skipped
191191
let mut total_select = 0;
192+
// track the total number of pages that should be skipped
193+
let mut total_pages_skip = 0;
194+
// track the total number of pages that should not be skipped
195+
let mut total_pages_select = 0;
192196

193197
// for each row group specified in the access plan
194198
let row_group_indexes = access_plan.row_group_indexes();
@@ -226,10 +230,12 @@ impl PagePruningAccessPlanFilter {
226230
file_metrics,
227231
);
228232

229-
let Some(selection) = selection else {
233+
let Some((selection, total_pages, matched_pages)) = selection else {
230234
trace!("No pages pruned in prune_pages_in_one_row_group");
231235
continue;
232236
};
237+
total_pages_select += matched_pages;
238+
total_pages_skip += total_pages - matched_pages;
233239

234240
debug!(
235241
"Use filter and page index to create RowSelection {:?} from predicate: {:?}",
@@ -278,6 +284,12 @@ impl PagePruningAccessPlanFilter {
278284
file_metrics
279285
.page_index_rows_pruned
280286
.add_matched(total_select);
287+
file_metrics
288+
.page_index_pages_pruned
289+
.add_pruned(total_pages_skip);
290+
file_metrics
291+
.page_index_pages_pruned
292+
.add_matched(total_pages_select);
281293
access_plan
282294
}
283295

@@ -297,7 +309,8 @@ fn update_selection(
297309
}
298310
}
299311

300-
/// Returns a [`RowSelection`] for the rows in this row group to scan.
312+
/// Returns a [`RowSelection`] for the rows in this row group to scan, in addition to the number of
313+
/// total and matched pages.
301314
///
302315
/// This Row Selection is formed from the page index and the predicate skips row
303316
/// ranges that can be ruled out based on the predicate.
@@ -310,7 +323,7 @@ fn prune_pages_in_one_row_group(
310323
converter: StatisticsConverter<'_>,
311324
parquet_metadata: &ParquetMetaData,
312325
metrics: &ParquetFileMetrics,
313-
) -> Option<RowSelection> {
326+
) -> Option<(RowSelection, usize, usize)> {
314327
let pruning_stats =
315328
PagesPruningStatistics::try_new(row_group_index, converter, parquet_metadata)?;
316329

@@ -362,7 +375,11 @@ fn prune_pages_in_one_row_group(
362375
RowSelector::skip(sum_row)
363376
};
364377
vec.push(selector);
365-
Some(RowSelection::from(vec))
378+
379+
let total_pages = values.len();
380+
let matched_pages = values.iter().filter(|v| **v).count();
381+
382+
Some((RowSelection::from(vec), total_pages, matched_pages))
366383
}
367384

368385
/// Implement [`PruningStatistics`] for one column's PageIndex (column_index + offset_index)

datafusion/physical-expr-common/src/metrics/value.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -984,20 +984,21 @@ impl MetricValue {
984984
"files_ranges_pruned_statistics" => 4,
985985
"row_groups_pruned_statistics" => 5,
986986
"row_groups_pruned_bloom_filter" => 6,
987-
"page_index_rows_pruned" => 7,
988-
_ => 8,
987+
"page_index_pages_pruned" => 7,
988+
"page_index_rows_pruned" => 8,
989+
_ => 9,
989990
},
990-
Self::SpillCount(_) => 9,
991-
Self::SpilledBytes(_) => 10,
992-
Self::SpilledRows(_) => 11,
993-
Self::CurrentMemoryUsage(_) => 12,
994-
Self::Count { .. } => 13,
995-
Self::Gauge { .. } => 14,
996-
Self::Time { .. } => 15,
997-
Self::Ratio { .. } => 16,
998-
Self::StartTimestamp(_) => 17, // show timestamps last
999-
Self::EndTimestamp(_) => 18,
1000-
Self::Custom { .. } => 19,
991+
Self::SpillCount(_) => 10,
992+
Self::SpilledBytes(_) => 11,
993+
Self::SpilledRows(_) => 12,
994+
Self::CurrentMemoryUsage(_) => 13,
995+
Self::Count { .. } => 14,
996+
Self::Gauge { .. } => 15,
997+
Self::Time { .. } => 16,
998+
Self::Ratio { .. } => 17,
999+
Self::StartTimestamp(_) => 18, // show timestamps last
1000+
Self::EndTimestamp(_) => 19,
1001+
Self::Custom { .. } => 20,
10011002
}
10021003
}
10031004

datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ Plan with Metrics
104104
03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as name], metrics=[output_rows=10, <slt:ignore>]
105105
04)------FilterExec: value@1 > 3, metrics=[output_rows=10, <slt:ignore>, selectivity=100% (10/10)]
106106
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[output_rows=10, <slt:ignore>]
107-
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, <slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_rows_pruned=10 total → 10 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=18% (210/1.16 K)]
107+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, <slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=10 total → 10 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=18% (210/1.16 K)]
108108

109109
statement ok
110110
set datafusion.explain.analyze_level = dev;

datafusion/sqllogictest/test_files/limit_pruning.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ set datafusion.explain.analyze_level = summary;
6363
query TT
6464
explain analyze select * from tracking_data where species > 'M' AND s >= 50 limit 3;
6565
----
66-
Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=3 total → 3 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (171/2.35 K)]
66+
Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, page_index_rows_pruned=3 total → 3 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (171/2.35 K)]
6767

6868
# limit_pruned_row_groups=0 total → 0 matched
6969
# because of order by, scan needs to preserve sort, so limit pruning is disabled
@@ -72,7 +72,7 @@ explain analyze select * from tracking_data where species > 'M' AND s >= 50 orde
7272
----
7373
Plan with Metrics
7474
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>]
75-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=9 total → 9 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (521/2.35 K)]
75+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, page_index_rows_pruned=9 total → 9 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (521/2.35 K)]
7676

7777
statement ok
7878
drop table tracking_data;

docs/source/user-guide/explain-usage.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ Again, reading from bottom up:
226226
When predicate pushdown is enabled, `DataSourceExec` with `ParquetSource` gains the following metrics:
227227

228228
- `page_index_rows_pruned`: number of rows evaluated by page index filters. The metric reports both how many rows were considered in total and how many matched (were not pruned).
229+
- `page_index_pages_pruned`: number of pages evaluated by page index filters. The metric reports both how many pages were considered in total and how many matched (were not pruned).
229230
- `row_groups_pruned_bloom_filter`: number of row groups evaluated by Bloom Filters, reporting both total checked groups and groups that matched.
230231
- `row_groups_pruned_statistics`: number of row groups evaluated by row-group statistics (min/max), reporting both total checked groups and groups that matched.
231232
- `limit_pruned_row_groups`: number of row groups pruned by the limit.

0 commit comments

Comments
 (0)