Skip to content

Commit 1e4bd75

Browse files
Feat: DefaultListFilesCache prefix-aware for partition pruning optimization (#19298)
## Which issue does this PR close?

Closes #19273

## Rationale for this change

`DefaultListFilesCache` currently uses the exact listing path as the cache key. When partition pruning narrows queries to specific partition prefixes (e.g., `s3://bucket/table/year=2024/`), the cache lookup fails even if a full table listing (`s3://bucket/table/`) was previously cached. This leads to redundant object store calls and duplicate cache entries for different partition filters on the same table.

## What changes are included in this PR?

- Updated the `ListFilesCache` trait to use `Extra = Option<Path>` (partition prefix) instead of `ObjectMeta`
- Added a `get_with_prefix(table_base, prefix, now)` method to `DefaultListFilesCache` that:
  - Uses the table base path as a stable cache key
  - Optionally filters cached results by a partition prefix
  - Handles TTL expiration checks
- Updated `list_with_cache` in `url.rs` to:
  - Always use the table base path as the cache key
  - Compute the relative prefix between the listing URL and table base
  - Always cache full table listings to ensure complete data is available for subsequent partition queries
- Added `compute_relative_prefix` helper function

## Are these changes tested?

Yes. Six dedicated unit tests validate prefix-aware cache behavior:

- `test_prefix_aware_cache_hit` - filters cached results by prefix
- `test_prefix_aware_cache_no_filter_returns_all` - returns all files when no prefix specified
- `test_prefix_aware_cache_miss_no_entry` - handles cache misses correctly
- `test_prefix_aware_cache_no_matching_files` - returns empty when no files match prefix
- `test_prefix_aware_nested_partitions` - handles nested partition paths (e.g., `year=2024/month=01/`)
- and `test_prefix_aware_different_tables` - ensures different tables have isolated cache entries

## Are there any user-facing changes?

No direct API changes. Users will see improved cache efficiency when querying partitioned tables - partition-pruned queries can now be served from cached full-table listings, reducing object store calls.
1 parent 3879b24 commit 1e4bd75

3 files changed

Lines changed: 548 additions & 33 deletions

File tree

datafusion/datasource/src/url.rs

Lines changed: 255 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -245,25 +245,26 @@ impl ListingTableUrl {
245245
let exec_options = &ctx.config_options().execution;
246246
let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
247247

248-
let prefix = if let Some(prefix) = prefix {
249-
let mut p = self.prefix.parts().collect::<Vec<_>>();
250-
p.extend(prefix.parts());
251-
Path::from_iter(p.into_iter())
248+
// Build full_prefix for non-cached path and head() calls
249+
let full_prefix = if let Some(ref p) = prefix {
250+
let mut parts = self.prefix.parts().collect::<Vec<_>>();
251+
parts.extend(p.parts());
252+
Path::from_iter(parts.into_iter())
252253
} else {
253254
self.prefix.clone()
254255
};
255256

256257
let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
257-
list_with_cache(ctx, store, &prefix).await?
258+
list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
258259
} else {
259-
match store.head(&prefix).await {
260+
match store.head(&full_prefix).await {
260261
Ok(meta) => futures::stream::once(async { Ok(meta) })
261262
.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
262263
.boxed(),
263264
// If the head command fails, it is likely that object doesn't exist.
264265
// Retry as though it were a prefix (aka a collection)
265266
Err(object_store::Error::NotFound { .. }) => {
266-
list_with_cache(ctx, store, &prefix).await?
267+
list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
267268
}
268269
Err(e) => return Err(e.into()),
269270
}
@@ -324,27 +325,74 @@ impl ListingTableUrl {
324325
}
325326
}
326327

328+
/// Lists files with cache support, using prefix-aware lookups.
329+
///
330+
/// # Arguments
331+
/// * `ctx` - The session context
332+
/// * `store` - The object store to list from
333+
/// * `table_base_path` - The table's base path (the stable cache key)
334+
/// * `prefix` - Optional prefix relative to table base for filtering results
335+
///
336+
/// # Cache Behavior:
337+
/// The cache key is always `table_base_path`. When a prefix-filtered listing
338+
/// is requested via `prefix`, the cache:
339+
/// - Looks up `table_base_path` in the cache
340+
/// - Filters results to match `table_base_path/prefix`
341+
/// - Returns filtered results without a storage call
342+
///
343+
/// On cache miss, the full table is always listed and cached, ensuring
344+
/// subsequent prefix queries can be served from cache.
327345
async fn list_with_cache<'b>(
328346
ctx: &'b dyn Session,
329347
store: &'b dyn ObjectStore,
330-
prefix: &Path,
348+
table_base_path: &Path,
349+
prefix: Option<&Path>,
331350
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
351+
// Build the full listing path (table_base + prefix)
352+
let full_prefix = match prefix {
353+
Some(p) => {
354+
let mut parts: Vec<_> = table_base_path.parts().collect();
355+
parts.extend(p.parts());
356+
Path::from_iter(parts)
357+
}
358+
None => table_base_path.clone(),
359+
};
360+
332361
match ctx.runtime_env().cache_manager.get_list_files_cache() {
333362
None => Ok(store
334-
.list(Some(prefix))
363+
.list(Some(&full_prefix))
335364
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
336365
.boxed()),
337366
Some(cache) => {
338-
let vec = if let Some(res) = cache.get(prefix) {
339-
debug!("Hit list all files cache");
367+
// Convert prefix to Option<Path> for cache lookup
368+
let prefix_filter = prefix.cloned();
369+
370+
// Try cache lookup with optional prefix filter
371+
let vec = if let Some(res) =
372+
cache.get_with_extra(table_base_path, &prefix_filter)
373+
{
374+
debug!("Hit list files cache");
340375
res.as_ref().clone()
341376
} else {
377+
// Cache miss - always list and cache the full table
378+
// This ensures we have complete data for future prefix queries
342379
let vec = store
343-
.list(Some(prefix))
380+
.list(Some(table_base_path))
344381
.try_collect::<Vec<ObjectMeta>>()
345382
.await?;
346-
cache.put(prefix, Arc::new(vec.clone()));
347-
vec
383+
cache.put(table_base_path, Arc::new(vec.clone()));
384+
385+
// If a prefix filter was requested, apply it to the results
386+
if prefix.is_some() {
387+
let full_prefix_str = full_prefix.as_ref();
388+
vec.into_iter()
389+
.filter(|meta| {
390+
meta.location.as_ref().starts_with(full_prefix_str)
391+
})
392+
.collect()
393+
} else {
394+
vec
395+
}
348396
};
349397
Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
350398
}
@@ -754,6 +802,191 @@ mod tests {
754802
Ok(())
755803
}
756804

805+
/// Tests that the cached code path produces identical results to the non-cached path.
806+
///
807+
/// This is critical: the cache is a transparent optimization, so both paths
808+
/// MUST return the same files. Note: order is not guaranteed by ObjectStore::list,
809+
/// so we sort results before comparison.
810+
#[tokio::test]
811+
async fn test_cache_path_equivalence() -> Result<()> {
812+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
813+
814+
let store = MockObjectStore {
815+
in_mem: object_store::memory::InMemory::new(),
816+
forbidden_paths: vec![],
817+
};
818+
819+
// Create test files with partition-style paths
820+
create_file(&store, "/table/year=2023/data1.parquet").await;
821+
create_file(&store, "/table/year=2023/month=01/data2.parquet").await;
822+
create_file(&store, "/table/year=2024/data3.parquet").await;
823+
create_file(&store, "/table/year=2024/month=06/data4.parquet").await;
824+
create_file(&store, "/table/year=2024/month=12/data5.parquet").await;
825+
826+
// Session WITHOUT cache
827+
let session_no_cache = MockSession::new();
828+
829+
// Session WITH cache - use RuntimeEnvBuilder with cache limit (no TTL needed for this test)
830+
let runtime_with_cache = RuntimeEnvBuilder::new()
831+
.with_object_list_cache_limit(1024 * 1024) // 1MB limit
832+
.build_arc()?;
833+
let session_with_cache = MockSession::with_runtime_env(runtime_with_cache);
834+
835+
// Test cases: (url, prefix, description)
836+
let test_cases = vec![
837+
("/table/", None, "full table listing"),
838+
(
839+
"/table/",
840+
Some(Path::from("year=2023")),
841+
"single partition filter",
842+
),
843+
(
844+
"/table/",
845+
Some(Path::from("year=2024")),
846+
"different partition filter",
847+
),
848+
(
849+
"/table/",
850+
Some(Path::from("year=2024/month=06")),
851+
"nested partition filter",
852+
),
853+
(
854+
"/table/",
855+
Some(Path::from("year=2025")),
856+
"non-existent partition",
857+
),
858+
];
859+
860+
for (url_str, prefix, description) in test_cases {
861+
let url = ListingTableUrl::parse(url_str)?;
862+
863+
// Get results WITHOUT cache (sorted for comparison)
864+
let mut results_no_cache: Vec<String> = url
865+
.list_prefixed_files(&session_no_cache, &store, prefix.clone(), "parquet")
866+
.await?
867+
.try_collect::<Vec<_>>()
868+
.await?
869+
.into_iter()
870+
.map(|m| m.location.to_string())
871+
.collect();
872+
results_no_cache.sort();
873+
874+
// Get results WITH cache (first call - cache miss, sorted for comparison)
875+
let mut results_with_cache_miss: Vec<String> = url
876+
.list_prefixed_files(
877+
&session_with_cache,
878+
&store,
879+
prefix.clone(),
880+
"parquet",
881+
)
882+
.await?
883+
.try_collect::<Vec<_>>()
884+
.await?
885+
.into_iter()
886+
.map(|m| m.location.to_string())
887+
.collect();
888+
results_with_cache_miss.sort();
889+
890+
// Get results WITH cache (second call - cache hit, sorted for comparison)
891+
let mut results_with_cache_hit: Vec<String> = url
892+
.list_prefixed_files(&session_with_cache, &store, prefix, "parquet")
893+
.await?
894+
.try_collect::<Vec<_>>()
895+
.await?
896+
.into_iter()
897+
.map(|m| m.location.to_string())
898+
.collect();
899+
results_with_cache_hit.sort();
900+
901+
// All three should contain the same files
902+
assert_eq!(
903+
results_no_cache, results_with_cache_miss,
904+
"Cache miss path should match non-cached path for: {description}"
905+
);
906+
assert_eq!(
907+
results_no_cache, results_with_cache_hit,
908+
"Cache hit path should match non-cached path for: {description}"
909+
);
910+
}
911+
912+
Ok(())
913+
}
914+
915+
/// Tests that prefix queries can be served from a cached full-table listing
916+
#[tokio::test]
917+
async fn test_cache_serves_partition_from_full_listing() -> Result<()> {
918+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
919+
920+
let store = MockObjectStore {
921+
in_mem: object_store::memory::InMemory::new(),
922+
forbidden_paths: vec![],
923+
};
924+
925+
// Create test files
926+
create_file(&store, "/sales/region=US/q1.parquet").await;
927+
create_file(&store, "/sales/region=US/q2.parquet").await;
928+
create_file(&store, "/sales/region=EU/q1.parquet").await;
929+
930+
// Create session with cache (no TTL needed for this test)
931+
let runtime = RuntimeEnvBuilder::new()
932+
.with_object_list_cache_limit(1024 * 1024) // 1MB limit
933+
.build_arc()?;
934+
let session = MockSession::with_runtime_env(runtime);
935+
936+
let url = ListingTableUrl::parse("/sales/")?;
937+
938+
// First: query full table (populates cache)
939+
let full_results: Vec<String> = url
940+
.list_prefixed_files(&session, &store, None, "parquet")
941+
.await?
942+
.try_collect::<Vec<_>>()
943+
.await?
944+
.into_iter()
945+
.map(|m| m.location.to_string())
946+
.collect();
947+
assert_eq!(full_results.len(), 3);
948+
949+
// Second: query with prefix (should be served from cache)
950+
let mut us_results: Vec<String> = url
951+
.list_prefixed_files(
952+
&session,
953+
&store,
954+
Some(Path::from("region=US")),
955+
"parquet",
956+
)
957+
.await?
958+
.try_collect::<Vec<_>>()
959+
.await?
960+
.into_iter()
961+
.map(|m| m.location.to_string())
962+
.collect();
963+
us_results.sort();
964+
965+
assert_eq!(
966+
us_results,
967+
vec!["sales/region=US/q1.parquet", "sales/region=US/q2.parquet"]
968+
);
969+
970+
// Third: different prefix (also from cache)
971+
let eu_results: Vec<String> = url
972+
.list_prefixed_files(
973+
&session,
974+
&store,
975+
Some(Path::from("region=EU")),
976+
"parquet",
977+
)
978+
.await?
979+
.try_collect::<Vec<_>>()
980+
.await?
981+
.into_iter()
982+
.map(|m| m.location.to_string())
983+
.collect();
984+
985+
assert_eq!(eu_results, vec!["sales/region=EU/q1.parquet"]);
986+
987+
Ok(())
988+
}
989+
757990
/// Creates a file with "hello world" content at the specified path
758991
async fn create_file(object_store: &dyn ObjectStore, path: &str) {
759992
object_store
@@ -906,6 +1139,14 @@ mod tests {
9061139
runtime_env: Arc::new(RuntimeEnv::default()),
9071140
}
9081141
}
1142+
1143+
/// Create a MockSession with a custom RuntimeEnv (for cache testing)
1144+
fn with_runtime_env(runtime_env: Arc<RuntimeEnv>) -> Self {
1145+
Self {
1146+
config: SessionConfig::new(),
1147+
runtime_env,
1148+
}
1149+
}
9091150
}
9101151

9111152
#[async_trait::async_trait]

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,21 @@ pub struct FileStatisticsCacheEntry {
6666
/// command on the local filesystem. This operation can be expensive,
6767
/// especially when done over remote object stores.
6868
///
69+
/// The cache key is always the table's base path, ensuring a stable cache key.
70+
/// The `Extra` type is `Option<Path>`, representing an optional prefix filter
71+
/// (relative to the table base path) for partition-aware lookups.
72+
///
73+
/// When `get_with_extra(key, Some(prefix))` is called:
74+
/// - The cache entry for `key` (table base path) is fetched
75+
/// - Results are filtered to only include files matching `key/prefix`
76+
/// - Filtered results are returned without making a storage call
77+
///
78+
/// This enables efficient partition pruning: a single cached listing of the
79+
/// full table can serve queries for any partition subset.
80+
///
6981
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
7082
pub trait ListFilesCache:
71-
CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
83+
CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
7284
{
7385
/// Returns the cache's memory limit in bytes.
7486
fn cache_limit(&self) -> usize;

0 commit comments

Comments
 (0)