Skip to content

Commit e6049de

Browse files
authored
Make default ListingFilesCache table scoped (#19616)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Builds on #19388 - Closes #19573 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> This PR explores one way to make `ListFilesCache` table scoped. A session level cache is still used, but the cache key is made a "table-scoped" path, for which a new struct ``` pub struct TableScopedPath(pub Option<TableReference>, pub Path); ``` is defined. `TableReference` comes from `CreateExternalTable` passed to `ListingTableFactory::create`. Additionally, when a table is dropped, all entries related to a table is dropped by modifying `SessionContext::find_and_deregister` method. Testing (change on adding `list_files_cache()` for cli is included for easier testing). - Testing cache reuse on a single table. ``` > \object_store_profiling summary ObjectStore Profile mode set to Summary > create external table test stored as parquet location 's3://overturemaps-us-west-2/release/2025-12-17.0/theme=base/'; 0 row(s) fetched. Elapsed 14.878 seconds. Object Store Profiling Instrumented Object Store: instrument_mode: Summary, inner: AmazonS3(overturemaps-us-west-2) Summaries: +-----------+----------+-----------+-----------+-------------+-------------+-------+ | Operation | Metric | min | max | avg | sum | count | +-----------+----------+-----------+-----------+-------------+-------------+-------+ | Get | duration | 0.030597s | 0.209235s | 0.082396s | 36.254189s | 440 | | Get | size | 204782 B | 857230 B | 497304.88 B | 218814144 B | 440 | | List | duration | 0.192037s | 0.192037s | 0.192037s | 0.192037s | 1 | | List | size | | | | | 1 | +-----------+----------+-----------+-----------+-------------+-------------+-------+ > select table, path, unnest(metadata_list) from list_files_cache() limit 1; +-------+---------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | table | path | UNNEST(list_files_cache().metadata_list) | +-------+---------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | test | release/2025-12-17.0/theme=base | {file_path: release/2025-12-17.0/theme=base/type=bathymetry/part-00000-dd0f2f50-b436-4710-996f-f1b06181a3a1-c000.zstd.parquet, file_modified: 2025-12-17T22:32:50, file_size_bytes: 40280159, e_tag: "15090401f8f936c3f83bb498cb99a41d-3", version: NULL} | +-------+---------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row(s) fetched. Elapsed 0.058 seconds. Object Store Profiling > select count(*) from test where type = 'infrastructure'; +-----------+ | count(*) | +-----------+ | 142969564 | +-----------+ 1 row(s) fetched. Elapsed 0.028 seconds. Object Store Profiling ``` - Test separate cache entries are created for two tables with same path ``` > create external table test2 stored as parquet location 's3://overturemaps-us-west-2/release/2025-12-17.0/theme=base/'; 0 row(s) fetched. Elapsed 14.798 seconds. Object Store Profiling Instrumented Object Store: instrument_mode: Summary, inner: AmazonS3(overturemaps-us-west-2) Summaries: +-----------+----------+-----------+-----------+-------------+-------------+-------+ | Operation | Metric | min | max | avg | sum | count | +-----------+----------+-----------+-----------+-------------+-------------+-------+ | Get | duration | 0.030238s | 0.350465s | 0.073670s | 32.414654s | 440 | | Get | size | 204782 B | 857230 B | 497304.88 B | 218814144 B | 440 | | List | duration | 0.133334s | 0.133334s | 0.133334s | 0.133334s | 1 | | List | size | | | | | 1 | +-----------+----------+-----------+-----------+-------------+-------------+-------+ > select count(*) from test2 where type = 'bathymetry'; +----------+ | count(*) | +----------+ | 59963 | +----------+ 1 row(s) fetched. Elapsed 0.009 seconds. Object Store Profiling > select table, path from list_files_cache(); +-------+---------------------------------+ | table | path | +-------+---------------------------------+ | test | release/2025-12-17.0/theme=base | | test2 | release/2025-12-17.0/theme=base | +-------+---------------------------------+ 2 row(s) fetched. Elapsed 0.004 seconds. ``` - Test cache associated with a table is dropped when table is dropped, and the other table with same path is unaffected. ``` > drop table test; 0 row(s) fetched. Elapsed 0.015 seconds. Object Store Profiling > select table, path from list_files_cache(); +-------+---------------------------------+ | table | path | +-------+---------------------------------+ | test2 | release/2025-12-17.0/theme=base | +-------+---------------------------------+ 1 row(s) fetched. Elapsed 0.005 seconds. Object Store Profiling > select count(*) from list_files_cache() where table = 'test'; +----------+ | count(*) | +----------+ | 0 | +----------+ 1 row(s) fetched. Elapsed 0.014 seconds. > select count(*) from test2 where type = 'infrastructure'; +-----------+ | count(*) | +-----------+ | 142969564 | +-----------+ 1 row(s) fetched. Elapsed 0.013 seconds. Object Store Profiling ``` - Test that dropping a view does not remove cache ``` > create view test2_view as (select * from test2 where type = 'infrastructure'); 0 row(s) fetched. Elapsed 0.103 seconds. Object Store Profiling > select count(*) from test2_view; +-----------+ | count(*) | +-----------+ | 142969564 | +-----------+ 1 row(s) fetched. Elapsed 0.094 seconds. Object Store Profiling > drop view test2_view; 0 row(s) fetched. Elapsed 0.002 seconds. Object Store Profiling > select table, path from list_files_cache(); +-------+---------------------------------+ | table | path | +-------+---------------------------------+ | test2 | release/2025-12-17.0/theme=base | +-------+---------------------------------+ 1 row(s) fetched. Elapsed 0.007 seconds. ``` ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 1d5d63c commit e6049de

File tree

10 files changed

+474
-184
lines changed

10 files changed

+474
-184
lines changed

datafusion-cli/src/functions.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,23 @@ impl TableFunctionImpl for StatisticsCacheFunc {
703703
}
704704
}
705705

706+
// Implementation of the `list_files_cache` table function in datafusion-cli.
707+
///
708+
/// This function returns the cached results of running a LIST command on a particular object store path for a table. The object metadata is returned as a List of Structs, with one Struct for each object.
709+
/// DataFusion uses these cached results to plan queries against external tables.
710+
/// # Schema
711+
/// ```sql
712+
/// > describe select * from list_files_cache();
713+
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
714+
/// | column_name | data_type | is_nullable |
715+
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
716+
/// | table | Utf8 | NO |
717+
/// | path | Utf8 | NO |
718+
/// | metadata_size_bytes | UInt64 | NO |
719+
/// | expires_in | Duration(ms) | YES |
720+
/// | metadata_list | List(Struct("file_path": non-null Utf8, "file_modified": non-null Timestamp(ms), "file_size_bytes": non-null UInt64, "e_tag": Utf8, "version": Utf8), field: 'metadata') | YES |
721+
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
722+
/// ```
706723
#[derive(Debug)]
707724
struct ListFilesCacheTable {
708725
schema: SchemaRef,
@@ -771,6 +788,7 @@ impl TableFunctionImpl for ListFilesCacheFunc {
771788
Field::new("metadata", DataType::Struct(nested_fields.clone()), true);
772789

773790
let schema = Arc::new(Schema::new(vec![
791+
Field::new("table", DataType::Utf8, false),
774792
Field::new("path", DataType::Utf8, false),
775793
Field::new("metadata_size_bytes", DataType::UInt64, false),
776794
// expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type.
@@ -786,6 +804,7 @@ impl TableFunctionImpl for ListFilesCacheFunc {
786804
),
787805
]));
788806

807+
let mut table_arr = vec![];
789808
let mut path_arr = vec![];
790809
let mut metadata_size_bytes_arr = vec![];
791810
let mut expires_arr = vec![];
@@ -802,7 +821,8 @@ impl TableFunctionImpl for ListFilesCacheFunc {
802821
let mut current_offset: i32 = 0;
803822

804823
for (path, entry) in list_files_cache.list_entries() {
805-
path_arr.push(path.to_string());
824+
table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string()));
825+
path_arr.push(path.path.to_string());
806826
metadata_size_bytes_arr.push(entry.size_bytes as u64);
807827
// calculates time left before entry expires
808828
expires_arr.push(
@@ -841,6 +861,7 @@ impl TableFunctionImpl for ListFilesCacheFunc {
841861
let batch = RecordBatch::try_new(
842862
schema.clone(),
843863
vec![
864+
Arc::new(StringArray::from(table_arr)),
844865
Arc::new(StringArray::from(path_arr)),
845866
Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
846867
Arc::new(DurationMillisecondArray::from(expires_arr)),

datafusion-cli/src/main.rs

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -848,35 +848,4 @@ mod tests {
848848

849849
Ok(())
850850
}
851-
852-
#[tokio::test]
853-
async fn test_list_files_cache_not_set() -> Result<(), DataFusionError> {
854-
let rt = RuntimeEnvBuilder::new()
855-
.with_cache_manager(CacheManagerConfig::default().with_list_files_cache(None))
856-
.build_arc()
857-
.unwrap();
858-
859-
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
860-
861-
ctx.register_udtf(
862-
"list_files_cache",
863-
Arc::new(ListFilesCacheFunc::new(
864-
ctx.task_ctx().runtime_env().cache_manager.clone(),
865-
)),
866-
);
867-
868-
let rbs = ctx
869-
.sql("SELECT * FROM list_files_cache()")
870-
.await?
871-
.collect()
872-
.await?;
873-
assert_snapshot!(batches_to_string(&rbs),@r"
874-
+------+---------------------+------------+---------------+
875-
| path | metadata_size_bytes | expires_in | metadata_list |
876-
+------+---------------------+------------+---------------+
877-
+------+---------------------+------------+---------------+
878-
");
879-
880-
Ok(())
881-
}
882851
}

datafusion/catalog-listing/src/table.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3434
use datafusion_datasource::{
3535
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
3636
};
37+
use datafusion_execution::cache::TableScopedPath;
3738
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
3839
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
3940
use datafusion_expr::dml::InsertOp;
@@ -565,7 +566,11 @@ impl TableProvider for ListingTable {
565566

566567
// Invalidate cache entries for this table if they exist
567568
if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
568-
let _ = lfc.remove(table_path.prefix());
569+
let key = TableScopedPath {
570+
table: table_path.get_table_ref().clone(),
571+
path: table_path.prefix().clone(),
572+
};
573+
let _ = lfc.remove(&key);
569574
}
570575

571576
// Sink related option, apart from format

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ impl TableProviderFactory for ListingTableFactory {
6363
))?
6464
.create(session_state, &cmd.options)?;
6565

66-
let mut table_path = ListingTableUrl::parse(&cmd.location)?;
66+
let mut table_path =
67+
ListingTableUrl::parse(&cmd.location)?.with_table_ref(cmd.name.clone());
6768
let file_extension = match table_path.is_collection() {
6869
// Setting the extension to be empty instead of allowing the default extension seems
6970
// odd, but was done to ensure existing behavior isn't modified. It seems like this
@@ -160,7 +161,9 @@ impl TableProviderFactory for ListingTableFactory {
160161
}
161162
None => format!("*.{}", cmd.file_type.to_lowercase()),
162163
};
163-
table_path = table_path.with_glob(glob.as_ref())?;
164+
table_path = table_path
165+
.with_glob(glob.as_ref())?
166+
.with_table_ref(cmd.name.clone());
164167
}
165168
let schema = options.infer_schema(session_state, &table_path).await?;
166169
let df_schema = Arc::clone(&schema).to_dfschema()?;

datafusion/core/src/execution/context/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1322,7 +1322,7 @@ impl SessionContext {
13221322
let table = table_ref.table().to_owned();
13231323
let maybe_schema = {
13241324
let state = self.state.read();
1325-
let resolved = state.resolve_table_ref(table_ref);
1325+
let resolved = state.resolve_table_ref(table_ref.clone());
13261326
state
13271327
.catalog_list()
13281328
.catalog(&resolved.catalog)
@@ -1334,6 +1334,11 @@ impl SessionContext {
13341334
&& table_provider.table_type() == table_type
13351335
{
13361336
schema.deregister_table(&table)?;
1337+
if table_type == TableType::Base
1338+
&& let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache()
1339+
{
1340+
lfc.drop_table_entries(&Some(table_ref))?;
1341+
}
13371342
return Ok(true);
13381343
}
13391344

datafusion/datasource/src/url.rs

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
use std::sync::Arc;
1919

20-
use datafusion_common::{DataFusionError, Result};
20+
use datafusion_common::{DataFusionError, Result, TableReference};
21+
use datafusion_execution::cache::TableScopedPath;
2122
use datafusion_execution::object_store::ObjectStoreUrl;
2223
use datafusion_session::Session;
2324

@@ -41,6 +42,8 @@ pub struct ListingTableUrl {
4142
prefix: Path,
4243
/// An optional glob expression used to filter files
4344
glob: Option<Pattern>,
45+
46+
table_ref: Option<TableReference>,
4447
}
4548

4649
impl ListingTableUrl {
@@ -145,7 +148,12 @@ impl ListingTableUrl {
145148
/// to create a [`ListingTableUrl`].
146149
pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
147150
let prefix = Path::from_url_path(url.path())?;
148-
Ok(Self { url, prefix, glob })
151+
Ok(Self {
152+
url,
153+
prefix,
154+
glob,
155+
table_ref: None,
156+
})
149157
}
150158

151159
/// Returns the URL scheme
@@ -255,7 +263,14 @@ impl ListingTableUrl {
255263
};
256264

257265
let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
258-
list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
266+
list_with_cache(
267+
ctx,
268+
store,
269+
self.table_ref.as_ref(),
270+
&self.prefix,
271+
prefix.as_ref(),
272+
)
273+
.await?
259274
} else {
260275
match store.head(&full_prefix).await {
261276
Ok(meta) => futures::stream::once(async { Ok(meta) })
@@ -264,7 +279,14 @@ impl ListingTableUrl {
264279
// If the head command fails, it is likely that object doesn't exist.
265280
// Retry as though it were a prefix (aka a collection)
266281
Err(object_store::Error::NotFound { .. }) => {
267-
list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
282+
list_with_cache(
283+
ctx,
284+
store,
285+
self.table_ref.as_ref(),
286+
&self.prefix,
287+
prefix.as_ref(),
288+
)
289+
.await?
268290
}
269291
Err(e) => return Err(e.into()),
270292
}
@@ -323,6 +345,15 @@ impl ListingTableUrl {
323345
Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
324346
Self::try_new(self.url, Some(glob))
325347
}
348+
349+
pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
350+
self.table_ref = Some(table_ref);
351+
self
352+
}
353+
354+
pub fn get_table_ref(&self) -> &Option<TableReference> {
355+
&self.table_ref
356+
}
326357
}
327358

328359
/// Lists files with cache support, using prefix-aware lookups.
@@ -345,6 +376,7 @@ impl ListingTableUrl {
345376
async fn list_with_cache<'b>(
346377
ctx: &'b dyn Session,
347378
store: &'b dyn ObjectStore,
379+
table_ref: Option<&TableReference>,
348380
table_base_path: &Path,
349381
prefix: Option<&Path>,
350382
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
@@ -367,9 +399,14 @@ async fn list_with_cache<'b>(
367399
// Convert prefix to Option<Path> for cache lookup
368400
let prefix_filter = prefix.cloned();
369401

402+
let table_scoped_base_path = TableScopedPath {
403+
table: table_ref.cloned(),
404+
path: table_base_path.clone(),
405+
};
406+
370407
// Try cache lookup with optional prefix filter
371408
let vec = if let Some(res) =
372-
cache.get_with_extra(table_base_path, &prefix_filter)
409+
cache.get_with_extra(&table_scoped_base_path, &prefix_filter)
373410
{
374411
debug!("Hit list files cache");
375412
res.as_ref().clone()
@@ -380,7 +417,7 @@ async fn list_with_cache<'b>(
380417
.list(Some(table_base_path))
381418
.try_collect::<Vec<ObjectMeta>>()
382419
.await?;
383-
cache.put(table_base_path, Arc::new(vec.clone()));
420+
cache.put(&table_scoped_base_path, Arc::new(vec.clone()));
384421

385422
// If a prefix filter was requested, apply it to the results
386423
if prefix.is_some() {

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
use crate::cache::cache_unit::DefaultFilesMetadataCache;
1919
use crate::cache::list_files_cache::ListFilesEntry;
20+
use crate::cache::list_files_cache::TableScopedPath;
2021
use crate::cache::{CacheAccessor, DefaultListFilesCache};
22+
use datafusion_common::TableReference;
2123
use datafusion_common::stats::Precision;
2224
use datafusion_common::{Result, Statistics};
2325
use object_store::ObjectMeta;
@@ -81,7 +83,7 @@ pub struct FileStatisticsCacheEntry {
8183
///
8284
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
8385
pub trait ListFilesCache:
84-
CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
86+
CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
8587
{
8688
/// Returns the cache's memory limit in bytes.
8789
fn cache_limit(&self) -> usize;
@@ -96,7 +98,9 @@ pub trait ListFilesCache:
9698
fn update_cache_ttl(&self, ttl: Option<Duration>);
9799

98100
/// Retrieves the information about the entries currently cached.
99-
fn list_entries(&self) -> HashMap<Path, ListFilesEntry>;
101+
fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;
102+
103+
fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
100104
}
101105

102106
/// Generic file-embedded metadata used with [`FileMetadataCache`].

0 commit comments

Comments
 (0)