Skip to content

Commit 5d508d3

Browse files
authored
Skip files outside partition structure in hive-partitioned listing tables (#21756)
## Which issue does this PR close? Closes #21755 ## Rationale When a hive-partitioned listing table has files in the root directory (not inside any `partition_col=value/` path), queries that reference partition columns crash with `Unable to get field named "partition_col"`. This is a common scenario when a table transitions from non-partitioned to hive-partitioned storage — the original root file may still exist alongside the new partition directories. ## What changes are included in this PR? `try_into_partitioned_file` now returns `Ok(None)` for files that don't match the partition structure (where `parse_partitions_for_path` returns `None`). The caller skips them via `try_filter_map`. Previously, `None` from `parse_partitions_for_path` was converted to empty `partition_values` via `.into_iter().flatten()`, causing downstream errors. ## Are these changes tested? Yes, 5 unit tests added: - `test_try_into_partitioned_file_valid_partition` — normal case - `test_try_into_partitioned_file_root_file_skipped` — root file skipped - `test_try_into_partitioned_file_wrong_partition_name` — wrong partition col name - `test_try_into_partitioned_file_multiple_partitions` — multi-level partitions - `test_try_into_partitioned_file_partial_partition_skipped` — incomplete partition path ## Are there any user-facing changes? Files outside the hive partition structure are now silently skipped instead of causing query failures. This may change `COUNT(*)` results if such files previously contributed rows (with empty partition values).
1 parent e7f7fa9 commit 5d508d3

4 files changed

Lines changed: 211 additions & 8 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/catalog-listing/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ log = { workspace = true }
4848
object_store = { workspace = true }
4949

5050
[dev-dependencies]
51+
chrono = { workspace = true }
5152
datafusion-datasource-parquet = { workspace = true }
5253

5354
# Note: add additional linter rules in lib.rs.

datafusion/catalog-listing/src/helpers.rs

Lines changed: 142 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -340,17 +340,25 @@ fn filter_partitions(
340340
Ok(None)
341341
}
342342

343+
/// Returns `Ok(None)` when the file is not inside a valid partition path
344+
/// (e.g. a stale file in the table root directory). Such files are skipped
345+
/// because hive-style partition values are never null and there is no valid
346+
/// value to assign for non-partitioned files.
343347
fn try_into_partitioned_file(
344348
object_meta: ObjectMeta,
345349
partition_cols: &[(String, DataType)],
346350
table_path: &ListingTableUrl,
347-
) -> Result<PartitionedFile> {
351+
) -> Result<Option<PartitionedFile>> {
348352
let cols = partition_cols.iter().map(|(name, _)| name.as_str());
349353
let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
350354

355+
let Some(parsed) = parsed else {
356+
// parse_partitions_for_path already logs a debug message
357+
return Ok(None);
358+
};
359+
351360
let partition_values = parsed
352361
.into_iter()
353-
.flatten()
354362
.zip(partition_cols)
355363
.map(|(parsed, (_, datatype))| {
356364
ScalarValue::try_from_string(parsed.to_string(), datatype)
@@ -360,7 +368,7 @@ fn try_into_partitioned_file(
360368
let mut pf: PartitionedFile = object_meta.into();
361369
pf.partition_values = partition_values;
362370

363-
Ok(pf)
371+
Ok(Some(pf))
364372
}
365373

366374
/// Discover the partitions on the given path and prune out files
@@ -405,13 +413,15 @@ pub async fn pruned_partition_list<'a>(
405413
)?;
406414

407415
Ok(objects
408-
.map_ok(|object_meta| {
409-
try_into_partitioned_file(object_meta, partition_cols, table_path)
416+
.try_filter_map(|object_meta| {
417+
futures::future::ready(try_into_partitioned_file(
418+
object_meta,
419+
partition_cols,
420+
table_path,
421+
))
410422
})
411423
.try_filter_map(move |pf| {
412-
futures::future::ready(
413-
pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
414-
)
424+
futures::future::ready(filter_partitions(pf, filters, &df_schema))
415425
})
416426
.boxed())
417427
}
@@ -574,6 +584,130 @@ mod tests {
574584
);
575585
}
576586

587+
#[test]
588+
fn test_try_into_partitioned_file_valid_partition() {
589+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
590+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
591+
let meta = ObjectMeta {
592+
location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
593+
last_modified: chrono::Utc::now(),
594+
size: 100,
595+
e_tag: None,
596+
version: None,
597+
};
598+
599+
let result =
600+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
601+
assert!(result.is_some());
602+
let pf = result.unwrap();
603+
assert_eq!(pf.partition_values.len(), 1);
604+
assert_eq!(
605+
pf.partition_values[0],
606+
ScalarValue::Utf8(Some("2024-01".to_string()))
607+
);
608+
}
609+
610+
#[test]
611+
fn test_try_into_partitioned_file_root_file_skipped() {
612+
// File in root directory (not inside any partition path) should be
613+
// skipped — this is the case where a stale file exists from before
614+
// hive partitioning was added.
615+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
616+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
617+
let meta = ObjectMeta {
618+
location: Path::from("bucket/mytable/data.parquet"),
619+
last_modified: chrono::Utc::now(),
620+
size: 100,
621+
e_tag: None,
622+
version: None,
623+
};
624+
625+
let result =
626+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
627+
assert!(
628+
result.is_none(),
629+
"Files outside partition structure should be skipped"
630+
);
631+
}
632+
633+
#[test]
634+
fn test_try_into_partitioned_file_wrong_partition_name() {
635+
// File in a directory that doesn't match the expected partition column
636+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
637+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
638+
let meta = ObjectMeta {
639+
location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
640+
last_modified: chrono::Utc::now(),
641+
size: 100,
642+
e_tag: None,
643+
version: None,
644+
};
645+
646+
let result =
647+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
648+
assert!(
649+
result.is_none(),
650+
"Files with wrong partition column name should be skipped"
651+
);
652+
}
653+
654+
#[test]
655+
fn test_try_into_partitioned_file_multiple_partitions() {
656+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
657+
let partition_cols = vec![
658+
("year".to_string(), DataType::Utf8),
659+
("month".to_string(), DataType::Utf8),
660+
];
661+
let meta = ObjectMeta {
662+
location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
663+
last_modified: chrono::Utc::now(),
664+
size: 100,
665+
e_tag: None,
666+
version: None,
667+
};
668+
669+
let result =
670+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
671+
assert!(result.is_some());
672+
let pf = result.unwrap();
673+
assert_eq!(pf.partition_values.len(), 2);
674+
assert_eq!(
675+
pf.partition_values[0],
676+
ScalarValue::Utf8(Some("2024".to_string()))
677+
);
678+
assert_eq!(
679+
pf.partition_values[1],
680+
ScalarValue::Utf8(Some("01".to_string()))
681+
);
682+
}
683+
684+
#[test]
685+
fn test_try_into_partitioned_file_partial_partition_skipped() {
686+
// File has first partition but not second — should be skipped
687+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
688+
let partition_cols = vec![
689+
("year".to_string(), DataType::Utf8),
690+
("month".to_string(), DataType::Utf8),
691+
];
692+
let meta = ObjectMeta {
693+
location: Path::from("bucket/mytable/year=2024/data.parquet"),
694+
last_modified: chrono::Utc::now(),
695+
size: 100,
696+
e_tag: None,
697+
version: None,
698+
};
699+
700+
let result =
701+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
702+
// File has year=2024 but no month= directory — parse_partitions_for_path
703+
// returns None because the path component "data.parquet" doesn't match
704+
// the expected "month=..." pattern.
705+
assert!(
706+
result.is_none(),
707+
"Files with incomplete partition structure should be skipped"
708+
);
709+
}
710+
577711
#[test]
578712
fn test_expr_applicable_for_cols() {
579713
assert!(expr_applicable_for_cols(

datafusion/sqllogictest/test_files/listing_table_partitions.slt

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,70 @@ foo
7373

7474
statement count 0
7575
set datafusion.execution.listing_table_factory_infer_partitions = true;
76+
77+
# Test: files outside partition structure are skipped
78+
# This simulates a table that transitioned from non-partitioned to
79+
# hive-partitioned storage, leaving a stale file in the root directory.
80+
81+
# Create partitioned files first
82+
query I
83+
copy (values(1, 'alice'), (2, 'bob'))
84+
to 'test_files/scratch/listing_table_partitions/root_file_skipped/year=2024/data.parquet';
85+
----
86+
2
87+
88+
query I
89+
copy (values(3, 'charlie'))
90+
to 'test_files/scratch/listing_table_partitions/root_file_skipped/year=2025/data.parquet';
91+
----
92+
1
93+
94+
# Create the table before adding the stale root file, so partition
95+
# inference succeeds (it only runs at CREATE TABLE time).
96+
statement count 0
97+
create external table root_file_test
98+
stored as parquet
99+
location 'test_files/scratch/listing_table_partitions/root_file_skipped/';
100+
101+
# Now add a stale root-level file (outside any partition directory).
102+
# This simulates a file left over from before partitioning was added.
103+
query I
104+
copy (values(99, 'stale'))
105+
to 'test_files/scratch/listing_table_partitions/root_file_skipped/stale.parquet';
106+
----
107+
1
108+
109+
# The root file should be skipped — only partitioned files are included
110+
query IT
111+
select column1, column2 from root_file_test order by column1;
112+
----
113+
1 alice
114+
2 bob
115+
3 charlie
116+
117+
# Partition column should be accessible
118+
query ITT
119+
select column1, column2, year from root_file_test order by column1;
120+
----
121+
1 alice 2024
122+
2 bob 2024
123+
3 charlie 2025
124+
125+
# Partition filter should work
126+
query ITT
127+
select column1, column2, year from root_file_test where year = '2025';
128+
----
129+
3 charlie 2025
130+
131+
# COUNT should not include the root file's rows
132+
query I
133+
select count(*) from root_file_test;
134+
----
135+
3
136+
137+
# GROUP BY partition column should work
138+
query TI
139+
select year, count(*) from root_file_test group by year order by year;
140+
----
141+
2024 2
142+
2025 1

0 commit comments

Comments
 (0)