diff --git a/Cargo.lock b/Cargo.lock index 895b3059f50c1..08658d1b1883f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1870,6 +1870,7 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", + "percent-encoding", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 64673c025d299..a2361f872013c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -179,6 +179,7 @@ parquet = { version = "58.1.0", default-features = false, features = [ ] } pbjson = { version = "0.9.0" } pbjson-types = "0.9" +percent-encoding = "2.3" # Should match arrow-flight's version of prost. prost = "0.14.1" rand = "0.9" diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml index be1374b371485..a32045c7586ca 100644 --- a/datafusion/catalog-listing/Cargo.toml +++ b/datafusion/catalog-listing/Cargo.toml @@ -46,6 +46,7 @@ futures = { workspace = true } itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } +percent-encoding = { workspace = true } [dev-dependencies] datafusion-datasource-parquet = { workspace = true } diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 9c30028ddd547..860b61e6486ed 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -17,6 +17,7 @@ //! Helper functions for the table implementation +use std::borrow::Cow; use std::mem; use std::sync::Arc; @@ -42,6 +43,7 @@ use datafusion_expr::{Expr, Volatility}; use datafusion_physical_expr::create_physical_expr; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; +use percent_encoding::percent_decode_str; /// Check whether the given expression can be resolved using only the columns `col_names`. /// This means that if this function returns true: @@ -353,7 +355,7 @@ fn try_into_partitioned_file( .flatten() .zip(partition_cols) .map(|(parsed, (_, datatype))| { - ScalarValue::try_from_string(parsed.to_string(), datatype) + ScalarValue::try_from_string(parsed.into_owned(), datatype) }) .collect::>>()?; @@ -418,12 +420,15 @@ pub async fn pruned_partition_list<'a>( } /// Extract the partition values for the given `file_path` (in the given `table_path`) -/// associated to the partitions defined by `table_partition_cols` +/// associated to the partitions defined by `table_partition_cols`. +/// +/// Partition values are URL-decoded, since object stores like S3 encode special +/// characters (e.g., `/` becomes `%2F`) in path segments. pub fn parse_partitions_for_path<'a, I>( table_path: &ListingTableUrl, file_path: &'a Path, table_partition_cols: I, -) -> Option> +) -> Option>> where I: IntoIterator, { @@ -432,7 +437,12 @@ where let mut part_values = vec![]; for (part, expected_partition) in subpath.zip(table_partition_cols) { match part.split_once('=') { - Some((name, val)) if name == expected_partition => part_values.push(val), + Some((name, val)) if name == expected_partition => { + let decoded = percent_decode_str(val) + .decode_utf8() + .unwrap_or(Cow::Borrowed(val)); + part_values.push(decoded); + } _ => { debug!( "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'", @@ -508,7 +518,7 @@ mod tests { #[test] fn test_parse_partitions_for_path() { assert_eq!( - Some(vec![]), + Some(vec![] as Vec>), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), &Path::from("bucket/mytable/file.csv"), @@ -532,15 +542,25 @@ mod tests { ) ); assert_eq!( - Some(vec!["v1"]), + Some(vec![Cow::Borrowed("v1")]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), &Path::from("bucket/mytable/mypartition=v1/file.csv"), vec!["mypartition"] ) ); + // URL-encoded partition values should be decoded + // Use Path::parse to avoid double-encoding (Path::from encodes % as %25) + assert_eq!( + Some(vec![Cow::::Owned("v/1".to_string())]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/mypartition=v%2F1/file.csv").unwrap(), + vec!["mypartition"] + ) + ); assert_eq!( - Some(vec!["v1"]), + Some(vec![Cow::Borrowed("v1")]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(), &Path::from("bucket/mytable/mypartition=v1/file.csv"), @@ -557,7 +577,7 @@ mod tests { ) ); assert_eq!( - Some(vec!["v1", "v2"]), + Some(vec![Cow::Borrowed("v1"), Cow::Borrowed("v2")]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"), @@ -565,13 +585,57 @@ mod tests { ) ); assert_eq!( - Some(vec!["v1"]), + Some(vec![Cow::Borrowed("v1")]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"), vec!["mypartition"] ) ); + assert_eq!( + Some(vec![Cow::::Owned("John Doe".to_string())]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/name=John%20Doe/file.csv").unwrap(), + vec!["name"] + ) + ); + assert_eq!( + Some(vec![ + Cow::::Owned("a/b".to_string()), + Cow::::Owned("c d".to_string()), + ]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/p1=a%2Fb/p2=c%20d/file.csv").unwrap(), + vec!["p1", "p2"] + ) + ); + assert_eq!( + Some(vec![Cow::::Owned("Müller".to_string())]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/name=M%C3%BCller/file.csv").unwrap(), + vec!["name"] + ) + ); + assert_eq!( + Some(vec![Cow::Borrowed("invalid%XX")]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/p1=invalid%XX/file.csv").unwrap(), + vec!["p1"] + ) + ); + // Invalid UTF-8 after percent-decoding falls back to raw value + assert_eq!( + Some(vec![Cow::Borrowed("%FF")]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/p1=%FF/file.csv").unwrap(), + vec!["p1"] + ) + ); } #[test]