Description
Context
We have an asynchronous service that uses iceberg-compaction to compact tables we manage. We aim to run the job once per hour per table. These tables are created by customers, and we have no control over the data that is stored. We are using the SmallFiles planning config with the BinPack grouping strategy. Here's an example of the planning config we use in prod.
const MB: u64 = 1024 * 1024;
let target_file_size_mb = job.config.configured_size_mb();
let group_filters = GroupFiltersBuilder::default()
.min_group_file_count(5)
.build()
.unwrap();
let planning_config = SmallFilesConfigBuilder::default()
.target_file_size_bytes(target_file_size_mb * MB)
.small_file_threshold_bytes(((target_file_size_mb * MB) as f64 * 0.7) as u64) // 70% of target
.max_parallelism(16)
.min_size_per_partition(512 * MB)
.grouping_strategy(GroupingStrategy::BinPack(BinPackConfig::new(
target_file_size_mb * MB * 8, // group size == 8x target file size
)))
.group_filters(group_filters)
.build()
.unwrap();
Problem
We have observed compaction running again and again on tables that have not added or removed any data, sometimes for days at a time. We would expect each compaction run to leave the table in a state where files are either larger than our configured small-file threshold or fall into groups with fewer than min_group_file_count files, so that subsequent jobs would not run at all. This is impacting partitioned tables.
Example
Here are some stats from a table (which we happen to own). As we can see, it is about 6.9MB, and it has 367 partitions. This job runs hourly, even though the underlying table has not changed.
input_files_count: 367, output_files_count: 367, input_total_bytes: 6915998, output_total_bytes: 6916118,
input_files_count: 367, output_files_count: 367, input_total_bytes: 6916006, output_total_bytes: 6915998
input_files_count: 367, output_files_count: 367, input_total_bytes: 6916038, output_total_bytes: 6916006,
input_files_count: 367, output_files_count: 367, input_total_bytes: 6916006, output_total_bytes: 6916038,
input_files_count: 367, output_files_count: 367, input_total_bytes: 6916046, output_total_bytes: 6916006,
6.9MB is trivial, but it demonstrates the problem in general: roughly 6,916,000 bytes spread across 367 files works out to about 19 KB per file, so every file falls below the small-file threshold and is rewritten on every run. The same behavior is impacting much larger tables as well.
Root Cause
The compaction library's grouping strategy does not first group by partition before applying filters. Partitioning is only applied when writes occur.
Output files are correctly partitioned. For some of our tables, those partitioned files will be smaller than the configured target file size. And although we have min_group_file_count set to 5, it applies globally to all "small files" in the table, not to the files within each partition.
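To make the difference concrete, here is a minimal, self-contained Rust sketch. It does not use the iceberg-compaction API; DataFile, select_globally, and select_per_partition are hypothetical stand-ins, and the 100 MB target / 70 MB threshold are arbitrary illustration values. It applies min_group_file_count = 5 to 367 single-file partitions, once table-wide and once per partition.
use std::collections::HashMap;

// Hypothetical stand-in for a data file entry; not the iceberg-compaction type.
struct DataFile {
    partition: String, // partition value the file belongs to
    size_bytes: u64,
}

// Simplified model of the current behavior: all small files in the table are
// pooled together, so min_group_file_count is checked against the whole table.
fn select_globally(files: &[DataFile], threshold: u64, min_group_file_count: usize) -> usize {
    let small = files.iter().filter(|f| f.size_bytes < threshold).count();
    if small >= min_group_file_count { small } else { 0 }
}

// Simplified model of the expected behavior: small files are grouped by
// partition first, and min_group_file_count is checked per partition.
fn select_per_partition(files: &[DataFile], threshold: u64, min_group_file_count: usize) -> usize {
    let mut by_partition: HashMap<&str, usize> = HashMap::new();
    for f in files.iter().filter(|f| f.size_bytes < threshold) {
        *by_partition.entry(f.partition.as_str()).or_insert(0) += 1;
    }
    by_partition.values().filter(|&&n| n >= min_group_file_count).sum()
}

fn main() {
    // 367 partitions with one ~19 KB file each, mirroring the example table above.
    let files: Vec<DataFile> = (0..367)
        .map(|i| DataFile { partition: format!("p{}", i), size_bytes: 19_000 })
        .collect();
    let threshold = 70 * 1024 * 1024; // 70% of an assumed 100 MB target

    // Every file is selected when the filter is applied table-wide ...
    assert_eq!(select_globally(&files, threshold, 5), 367);
    // ... but nothing would be selected if the filter were applied per partition.
    assert_eq!(select_per_partition(&files, threshold, 5), 0);
}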
Verification of the Problem
I wrote an integration test in the iceberg-compaction library that reproduces the problem exactly. I'll open a PR with the integration test first to help demonstrate the issue (I'm also more than willing to contribute a fix as a next step).
Our Expectation
Our expectation is that min_group_file_count is applied to file groups that have first been grouped by partition. For the table with 367 files, we would expect 367 file groups with a single file in each. Every group would be excluded because it contains only one file (fewer than the minimum threshold of 5 files).
In Spark's compaction procedure, files are separated by partition before they are sorted into groups. The documentation of the max-file-group-size-bytes configuration (link) states:
The entire rewrite operation is broken down into pieces based on partitioning and within partitions based on size into file-groups.
We expect the same filtering logic to apply here.
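As a rough illustration of that partition-first flow, here is another small Rust sketch using the same hypothetical DataFile type (again, not the actual iceberg-compaction or Spark API; bin_pack, plan, and the group-size cap are illustrative assumptions). It splits files by partition, bin-packs each partition's files into groups capped at a maximum group size, and only then applies min_group_file_count; for 367 single-file partitions, no group survives and the plan comes back empty.
use std::collections::HashMap;

// Hypothetical stand-in for a data file entry; not the iceberg-compaction type.
struct DataFile {
    partition: String,
    size_bytes: u64,
}

// Greedy first-fit bin packing of one partition's files into groups of at most
// max_group_size_bytes.
fn bin_pack(files: Vec<&DataFile>, max_group_size_bytes: u64) -> Vec<Vec<&DataFile>> {
    // Each entry tracks (current group size in bytes, files in the group).
    let mut groups: Vec<(u64, Vec<&DataFile>)> = Vec::new();
    for file in files {
        match groups
            .iter()
            .position(|(size, _)| size + file.size_bytes <= max_group_size_bytes)
        {
            Some(i) => {
                groups[i].0 += file.size_bytes;
                groups[i].1.push(file);
            }
            None => groups.push((file.size_bytes, vec![file])),
        }
    }
    groups.into_iter().map(|(_, group)| group).collect()
}

// Partition-first planning: split by partition, bin-pack within each partition,
// then drop any group smaller than min_group_file_count.
fn plan(
    files: &[DataFile],
    max_group_size_bytes: u64,
    min_group_file_count: usize,
) -> Vec<Vec<&DataFile>> {
    let mut by_partition: HashMap<&str, Vec<&DataFile>> = HashMap::new();
    for f in files {
        by_partition.entry(f.partition.as_str()).or_default().push(f);
    }
    by_partition
        .into_values()
        .flat_map(|partition_files| bin_pack(partition_files, max_group_size_bytes))
        .filter(|group| group.len() >= min_group_file_count)
        .collect()
}

fn main() {
    // One small file per partition, as in the example table: no group ever reaches
    // 5 files, so nothing is planned and the hourly job becomes a no-op.
    let files: Vec<DataFile> = (0..367)
        .map(|i| DataFile { partition: format!("p{}", i), size_bytes: 19_000 })
        .collect();
    assert!(plan(&files, 800 * 1024 * 1024, 5).is_empty());
}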