Skip to content

Commit 129757d

Browse files
alamb and nathanb9 authored
[branch-53] Reattach parquet metadata cache after deserializing in datafusion-proto (#20574) (#20891)
- Part of #19692 - Closes #20575 on branch-53 This PR: - Backports #20574 from @nathanb9 to the branch-53 line Co-authored-by: nathan <56370526+nathanb9@users.noreply.github.com>
1 parent 604bc33 commit 129757d

2 files changed

Lines changed: 73 additions & 5 deletions

File tree

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,13 @@ use datafusion_datasource_csv::source::CsvSource;
4242
use datafusion_datasource_json::file_format::JsonSink;
4343
use datafusion_datasource_json::source::JsonSource;
4444
#[cfg(feature = "parquet")]
45+
use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
46+
#[cfg(feature = "parquet")]
4547
use datafusion_datasource_parquet::file_format::ParquetSink;
4648
#[cfg(feature = "parquet")]
4749
use datafusion_datasource_parquet::source::ParquetSource;
50+
#[cfg(feature = "parquet")]
51+
use datafusion_execution::object_store::ObjectStoreUrl;
4852
use datafusion_execution::{FunctionRegistry, TaskContext};
4953
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
5054
use datafusion_functions_table::generate_series::{
@@ -848,9 +852,19 @@ impl protobuf::PhysicalPlanNode {
848852

849853
// Parse table schema with partition columns
850854
let table_schema = parse_table_schema_from_proto(base_conf)?;
851-
852-
let mut source =
853-
ParquetSource::new(table_schema).with_table_parquet_options(options);
855+
let object_store_url = match base_conf.object_store_url.is_empty() {
856+
false => ObjectStoreUrl::parse(&base_conf.object_store_url)?,
857+
true => ObjectStoreUrl::local_filesystem(),
858+
};
859+
let store = ctx.runtime_env().object_store(object_store_url)?;
860+
let metadata_cache =
861+
ctx.runtime_env().cache_manager.get_file_metadata_cache();
862+
let reader_factory =
863+
Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
864+
865+
let mut source = ParquetSource::new(table_schema)
866+
.with_parquet_file_reader_factory(reader_factory)
867+
.with_table_parquet_options(options);
854868

855869
if let Some(predicate) = predicate {
856870
source = source.with_predicate(predicate);

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ use datafusion::datasource::listing::{
3636
};
3737
use datafusion::datasource::object_store::ObjectStoreUrl;
3838
use datafusion::datasource::physical_plan::{
39-
ArrowSource, FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig,
40-
ParquetSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
39+
ArrowSource, FileGroup, FileOutputMode, FileScanConfig, FileScanConfigBuilder,
40+
FileSinkConfig, ParquetSource, wrap_partition_type_in_dict,
41+
wrap_partition_value_in_dict,
4142
};
4243
use datafusion::datasource::sink::DataSinkExec;
4344
use datafusion::datasource::source::DataSourceExec;
@@ -942,6 +943,59 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
942943
roundtrip_test(DataSourceExec::from_data_source(scan_config))
943944
}
944945

946+
#[test]
fn roundtrip_parquet_exec_attaches_cached_reader_factory_after_roundtrip() -> Result<()> {
    // Build a minimal single-column Parquet scan so the plan survives a
    // proto round trip with as little incidental state as possible.
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
    let source = Arc::new(ParquetSource::new(Arc::clone(&schema)));

    let one_file = PartitionedFile::new("/path/to/file.parquet".to_string(), 1024);
    let stats = Statistics {
        num_rows: Precision::Inexact(100),
        total_byte_size: Precision::Inexact(1024),
        column_statistics: Statistics::unknown_column(&schema),
    };
    let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
        .with_file_groups(vec![FileGroup::new(vec![one_file])])
        .with_statistics(stats)
        .build();
    let plan = DataSourceExec::from_data_source(config);

    // Serialize to protobuf and decode back using the default codec/converter.
    let ctx = SessionContext::new();
    let codec = DefaultPhysicalExtensionCodec {};
    let converter = DefaultPhysicalProtoConverter {};
    let decoded = roundtrip_test_and_return(plan, &ctx, &codec, &converter)?;

    // Walk back down to the ParquetSource inside the decoded plan,
    // failing with a descriptive error at each layer if the shape changed.
    let exec = decoded.as_any().downcast_ref::<DataSourceExec>().ok_or_else(|| {
        internal_datafusion_err!("Expected DataSourceExec after roundtrip")
    })?;
    let scan = exec
        .data_source()
        .as_any()
        .downcast_ref::<FileScanConfig>()
        .ok_or_else(|| {
            internal_datafusion_err!("Expected FileScanConfig after roundtrip")
        })?;
    let parquet = scan
        .file_source()
        .as_any()
        .downcast_ref::<ParquetSource>()
        .ok_or_else(|| {
            internal_datafusion_err!("Expected ParquetSource after roundtrip")
        })?;

    // The whole point of the change under test: decoding must reattach a
    // parquet file reader factory (the cached-metadata one) to the source.
    assert!(
        parquet.parquet_file_reader_factory().is_some(),
        "Parquet reader factory should be attached after decoding from protobuf"
    );
    Ok(())
}
998+
945999
#[test]
9461000
fn roundtrip_arrow_scan() -> Result<()> {
9471001
let file_schema =

0 commit comments

Comments
 (0)