Skip to content

Commit b4dbb6a

Browse files
authored
Skip files outside partition structure in hive-partitioned listing tables (#51)
1 parent 16e0a5c commit b4dbb6a

3 files changed

Lines changed: 136 additions & 8 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/catalog-listing/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@ log = { workspace = true }
4848
object_store = { workspace = true }
4949

5050
[dev-dependencies]
51+
chrono = { workspace = true }
5152
datafusion-datasource-parquet = { workspace = true }
5253

5354
# Note: add additional linter rules in lib.rs.

datafusion/catalog-listing/src/helpers.rs

Lines changed: 134 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -339,17 +339,25 @@ fn filter_partitions(
339339
Ok(None)
340340
}
341341

342+
/// Returns `Ok(None)` when the file is not inside a valid partition path
343+
/// (e.g. a stale file in the table root directory). Such files are skipped
344+
/// because hive-style partition values are never null and there is no valid
345+
/// value to assign for non-partitioned files.
342346
fn try_into_partitioned_file(
343347
object_meta: ObjectMeta,
344348
partition_cols: &[(String, DataType)],
345349
table_path: &ListingTableUrl,
346-
) -> Result<PartitionedFile> {
350+
) -> Result<Option<PartitionedFile>> {
347351
let cols = partition_cols.iter().map(|(name, _)| name.as_str());
348352
let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
349353

354+
let Some(parsed) = parsed else {
355+
// parse_partitions_for_path already logs a debug message
356+
return Ok(None);
357+
};
358+
350359
let partition_values = parsed
351360
.into_iter()
352-
.flatten()
353361
.zip(partition_cols)
354362
.map(|(parsed, (_, datatype))| {
355363
ScalarValue::try_from_string(parsed.to_string(), datatype)
@@ -359,7 +367,7 @@ fn try_into_partitioned_file(
359367
let mut pf: PartitionedFile = object_meta.into();
360368
pf.partition_values = partition_values;
361369

362-
Ok(pf)
370+
Ok(Some(pf))
363371
}
364372

365373
/// Discover the partitions on the given path and prune out files
@@ -404,13 +412,15 @@ pub async fn pruned_partition_list<'a>(
404412
)?;
405413

406414
Ok(objects
407-
.map_ok(|object_meta| {
408-
try_into_partitioned_file(object_meta, partition_cols, table_path)
415+
.try_filter_map(|object_meta| {
416+
futures::future::ready(try_into_partitioned_file(
417+
object_meta,
418+
partition_cols,
419+
table_path,
420+
))
409421
})
410422
.try_filter_map(move |pf| {
411-
futures::future::ready(
412-
pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
413-
)
423+
futures::future::ready(filter_partitions(pf, filters, &df_schema))
414424
})
415425
.boxed())
416426
}
@@ -573,6 +583,122 @@ mod tests {
573583
);
574584
}
575585

586+
#[test]
587+
fn test_try_into_partitioned_file_valid_partition() {
588+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
589+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
590+
let meta = ObjectMeta {
591+
location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
592+
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
593+
size: 100,
594+
e_tag: None,
595+
version: None,
596+
};
597+
598+
let result =
599+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
600+
assert!(result.is_some());
601+
let pf = result.unwrap();
602+
assert_eq!(pf.partition_values.len(), 1);
603+
assert_eq!(
604+
pf.partition_values[0],
605+
ScalarValue::Utf8(Some("2024-01".to_string()))
606+
);
607+
}
608+
609+
#[test]
610+
fn test_try_into_partitioned_file_root_file_skipped() {
611+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
612+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
613+
let meta = ObjectMeta {
614+
location: Path::from("bucket/mytable/data.parquet"),
615+
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
616+
size: 100,
617+
e_tag: None,
618+
version: None,
619+
};
620+
621+
let result =
622+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
623+
assert!(
624+
result.is_none(),
625+
"Files outside partition structure should be skipped"
626+
);
627+
}
628+
629+
#[test]
630+
fn test_try_into_partitioned_file_wrong_partition_name() {
631+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
632+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
633+
let meta = ObjectMeta {
634+
location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
635+
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
636+
size: 100,
637+
e_tag: None,
638+
version: None,
639+
};
640+
641+
let result =
642+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
643+
assert!(
644+
result.is_none(),
645+
"Files with wrong partition column name should be skipped"
646+
);
647+
}
648+
649+
#[test]
650+
fn test_try_into_partitioned_file_multiple_partitions() {
651+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
652+
let partition_cols = vec![
653+
("year".to_string(), DataType::Utf8),
654+
("month".to_string(), DataType::Utf8),
655+
];
656+
let meta = ObjectMeta {
657+
location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
658+
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
659+
size: 100,
660+
e_tag: None,
661+
version: None,
662+
};
663+
664+
let result =
665+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
666+
assert!(result.is_some());
667+
let pf = result.unwrap();
668+
assert_eq!(pf.partition_values.len(), 2);
669+
assert_eq!(
670+
pf.partition_values[0],
671+
ScalarValue::Utf8(Some("2024".to_string()))
672+
);
673+
assert_eq!(
674+
pf.partition_values[1],
675+
ScalarValue::Utf8(Some("01".to_string()))
676+
);
677+
}
678+
679+
#[test]
680+
fn test_try_into_partitioned_file_partial_partition_skipped() {
681+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
682+
let partition_cols = vec![
683+
("year".to_string(), DataType::Utf8),
684+
("month".to_string(), DataType::Utf8),
685+
];
686+
let meta = ObjectMeta {
687+
location: Path::from("bucket/mytable/year=2024/data.parquet"),
688+
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
689+
size: 100,
690+
e_tag: None,
691+
version: None,
692+
};
693+
694+
let result =
695+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
696+
assert!(
697+
result.is_none(),
698+
"Files with incomplete partition structure should be skipped"
699+
);
700+
}
701+
576702
#[test]
577703
fn test_expr_applicable_for_cols() {
578704
assert!(expr_applicable_for_cols(

0 commit comments

Comments
 (0)