Skip to content

Commit 1f0232c

Browse files
nathanb9a and alamb authored
Reattach parquet metadata cache after deserializing in datafusion-proto (#20574)
- Addressing: #20575 --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 27c9cda commit 1f0232c

2 files changed

Lines changed: 73 additions & 5 deletions

File tree

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 17 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -42,9 +42,13 @@ use datafusion_datasource_csv::source::CsvSource;
4242
use datafusion_datasource_json::file_format::JsonSink;
4343
use datafusion_datasource_json::source::JsonSource;
4444
#[cfg(feature = "parquet")]
45+
use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
46+
#[cfg(feature = "parquet")]
4547
use datafusion_datasource_parquet::file_format::ParquetSink;
4648
#[cfg(feature = "parquet")]
4749
use datafusion_datasource_parquet::source::ParquetSource;
50+
#[cfg(feature = "parquet")]
51+
use datafusion_execution::object_store::ObjectStoreUrl;
4852
use datafusion_execution::{FunctionRegistry, TaskContext};
4953
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
5054
use datafusion_functions_table::generate_series::{
@@ -847,9 +851,19 @@ impl protobuf::PhysicalPlanNode {
847851

848852
// Parse table schema with partition columns
849853
let table_schema = parse_table_schema_from_proto(base_conf)?;
850-
851-
let mut source =
852-
ParquetSource::new(table_schema).with_table_parquet_options(options);
854+
let object_store_url = match base_conf.object_store_url.is_empty() {
855+
false => ObjectStoreUrl::parse(&base_conf.object_store_url)?,
856+
true => ObjectStoreUrl::local_filesystem(),
857+
};
858+
let store = ctx.runtime_env().object_store(object_store_url)?;
859+
let metadata_cache =
860+
ctx.runtime_env().cache_manager.get_file_metadata_cache();
861+
let reader_factory =
862+
Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
863+
864+
let mut source = ParquetSource::new(table_schema)
865+
.with_parquet_file_reader_factory(reader_factory)
866+
.with_table_parquet_options(options);
853867

854868
if let Some(predicate) = predicate {
855869
source = source.with_predicate(predicate);

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 56 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -36,8 +36,9 @@ use datafusion::datasource::listing::{
3636
};
3737
use datafusion::datasource::object_store::ObjectStoreUrl;
3838
use datafusion::datasource::physical_plan::{
39-
ArrowSource, FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig,
40-
ParquetSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
39+
ArrowSource, FileGroup, FileOutputMode, FileScanConfig, FileScanConfigBuilder,
40+
FileSinkConfig, ParquetSource, wrap_partition_type_in_dict,
41+
wrap_partition_value_in_dict,
4142
};
4243
use datafusion::datasource::sink::DataSinkExec;
4344
use datafusion::datasource::source::DataSourceExec;
@@ -929,6 +930,59 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
929930
roundtrip_test(DataSourceExec::from_data_source(scan_config))
930931
}
931932

933+
#[test]
934+
fn roundtrip_parquet_exec_attaches_cached_reader_factory_after_roundtrip() -> Result<()> {
935+
let file_schema =
936+
Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
937+
let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema)));
938+
let scan_config =
939+
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
940+
.with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
941+
"/path/to/file.parquet".to_string(),
942+
1024,
943+
)])])
944+
.with_statistics(Statistics {
945+
num_rows: Precision::Inexact(100),
946+
total_byte_size: Precision::Inexact(1024),
947+
column_statistics: Statistics::unknown_column(&file_schema),
948+
})
949+
.build();
950+
let exec_plan = DataSourceExec::from_data_source(scan_config);
951+
952+
let ctx = SessionContext::new();
953+
let codec = DefaultPhysicalExtensionCodec {};
954+
let proto_converter = DefaultPhysicalProtoConverter {};
955+
let roundtripped =
956+
roundtrip_test_and_return(exec_plan, &ctx, &codec, &proto_converter)?;
957+
958+
let data_source = roundtripped
959+
.as_any()
960+
.downcast_ref::<DataSourceExec>()
961+
.ok_or_else(|| {
962+
internal_datafusion_err!("Expected DataSourceExec after roundtrip")
963+
})?;
964+
let file_scan = data_source
965+
.data_source()
966+
.as_any()
967+
.downcast_ref::<FileScanConfig>()
968+
.ok_or_else(|| {
969+
internal_datafusion_err!("Expected FileScanConfig after roundtrip")
970+
})?;
971+
let parquet_source = file_scan
972+
.file_source()
973+
.as_any()
974+
.downcast_ref::<ParquetSource>()
975+
.ok_or_else(|| {
976+
internal_datafusion_err!("Expected ParquetSource after roundtrip")
977+
})?;
978+
979+
assert!(
980+
parquet_source.parquet_file_reader_factory().is_some(),
981+
"Parquet reader factory should be attached after decoding from protobuf"
982+
);
983+
Ok(())
984+
}
985+
932986
#[test]
933987
fn roundtrip_arrow_scan() -> Result<()> {
934988
let file_schema =

0 commit comments

Comments (0)