diff --git a/datafusion/core/tests/datasource/object_store_access.rs b/datafusion/core/tests/datasource/object_store_access.rs index 30654c687f8d2..626687a7aca66 100644 --- a/datafusion/core/tests/datasource/object_store_access.rs +++ b/datafusion/core/tests/datasource/object_store_access.rs @@ -353,7 +353,7 @@ async fn create_single_parquet_file_default() { RequestCountingObjectStore() Total Requests: 2 - GET (opts) path=parquet_table.parquet head=true - - GET (opts) path=parquet_table.parquet range=0-2994 + - GET (ranges) path=parquet_table.parquet ranges=0-2994 " ); } @@ -372,7 +372,7 @@ async fn create_single_parquet_file_prefetch() { RequestCountingObjectStore() Total Requests: 2 - GET (opts) path=parquet_table.parquet head=true - - GET (opts) path=parquet_table.parquet range=1994-2994 + - GET (ranges) path=parquet_table.parquet ranges=1994-2994 " ); } @@ -401,9 +401,9 @@ async fn create_single_parquet_file_too_small_prefetch() { RequestCountingObjectStore() Total Requests: 4 - GET (opts) path=parquet_table.parquet head=true - - GET (opts) path=parquet_table.parquet range=2494-2994 - - GET (opts) path=parquet_table.parquet range=2264-2986 - - GET (opts) path=parquet_table.parquet range=2124-2264 + - GET (ranges) path=parquet_table.parquet ranges=2494-2994 + - GET (ranges) path=parquet_table.parquet ranges=2264-2986 + - GET (ranges) path=parquet_table.parquet ranges=2124-2264 " ); } @@ -433,8 +433,8 @@ async fn create_single_parquet_file_small_prefetch() { RequestCountingObjectStore() Total Requests: 3 - GET (opts) path=parquet_table.parquet head=true - - GET (opts) path=parquet_table.parquet range=2254-2994 - - GET (opts) path=parquet_table.parquet range=2124-2264 + - GET (ranges) path=parquet_table.parquet ranges=2254-2994 + - GET (ranges) path=parquet_table.parquet ranges=2124-2264 " ); } @@ -457,7 +457,7 @@ async fn create_single_parquet_file_no_prefetch() { RequestCountingObjectStore() Total Requests: 2 - GET (opts) path=parquet_table.parquet head=true - - GET (opts) path=parquet_table.parquet range=0-2994 + - GET (ranges) path=parquet_table.parquet ranges=0-2994 " ); } diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 77e29cf35cd5b..86c12b11c0461 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -18,9 +18,7 @@ //! [`DFParquetMetadata`] for fetching Parquet file metadata, statistics //! and schema information. -use crate::{ - ObjectStoreFetch, apply_file_schema_type_coercions, coerce_int96_to_resolution, -}; +use crate::{apply_file_schema_type_coercions, coerce_int96_to_resolution}; use arrow::array::{Array, ArrayRef, BooleanArray}; use arrow::compute::and; use arrow::compute::kernels::cmp::eq; @@ -41,10 +39,11 @@ use datafusion_physical_plan::Accumulator; use log::debug; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; +use parquet::DecodeResult; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::{parquet_column, parquet_to_arrow_schema}; use parquet::file::metadata::{ - PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData, + PageIndexPolicy, ParquetMetaData, ParquetMetaDataPushDecoder, RowGroupMetaData, SortingColumn, }; use parquet::schema::types::SchemaDescriptor; @@ -119,25 +118,14 @@ impl<'a> DFParquetMetadata<'a> { /// Fetch parquet metadata from the remote object store pub async fn fetch_metadata(&self) -> Result> { - let Self { - store, - object_meta, - metadata_size_hint, - decryption_properties, - file_metadata_cache, - coerce_int96: _, - } = self; - - let fetch = ObjectStoreFetch::new(*store, object_meta); - // implementation to fetch parquet metadata let cache_metadata = - !cfg!(feature = "parquet_encryption") || decryption_properties.is_none(); + !cfg!(feature = "parquet_encryption") || self.decryption_properties.is_none(); if cache_metadata - && let Some(file_metadata_cache) = file_metadata_cache.as_ref() - && let Some(cached) = file_metadata_cache.get(&object_meta.location) - && cached.is_valid_for(object_meta) + && let Some(file_metadata_cache) = self.file_metadata_cache.as_ref() + && let Some(cached) = file_metadata_cache.get(&self.object_meta.location) + && cached.is_valid_for(self.object_meta) && let Some(cached_parquet) = cached .file_metadata .as_any() @@ -146,32 +134,69 @@ impl<'a> DFParquetMetadata<'a> { return Ok(Arc::clone(cached_parquet.parquet_metadata())); } - let mut reader = - ParquetMetaDataReader::new().with_prefetch_hint(*metadata_size_hint); + let file_size = self.object_meta.size; + let mut decoder = ParquetMetaDataPushDecoder::try_new(file_size) + .map_err(DataFusionError::from)?; #[cfg(feature = "parquet_encryption")] - if let Some(decryption_properties) = decryption_properties { - reader = reader - .with_decryption_properties(Some(Arc::clone(decryption_properties))); + if let Some(decryption_properties) = &self.decryption_properties { + decoder = decoder + .with_file_decryption_properties(Some(Arc::clone(decryption_properties))); } - if cache_metadata && file_metadata_cache.is_some() { + if cache_metadata && self.file_metadata_cache.is_some() { // Need to retrieve the entire metadata for the caching to be effective. - reader = reader.with_page_index_policy(PageIndexPolicy::Optional); + decoder = decoder.with_page_index_policy(PageIndexPolicy::Optional); + } else { + decoder = decoder.with_page_index_policy(PageIndexPolicy::Skip); } - let metadata = Arc::new( - reader - .load_and_finish(fetch, object_meta.size) + // If we have a size hint, prefetch that many bytes from the end of the file + if let Some(hint) = self.metadata_size_hint { + let prefetch_start = file_size.saturating_sub(hint as u64); + let prefetch_range = prefetch_start..file_size; + let data = self + .store + .get_ranges( + &self.object_meta.location, + std::slice::from_ref(&prefetch_range), + ) .await - .map_err(DataFusionError::from)?, - ); + .map_err(DataFusionError::from)?; + decoder + .push_ranges(vec![prefetch_range], data) + .map_err(DataFusionError::from)?; + } + + let metadata = loop { + match decoder.try_decode().map_err(DataFusionError::from)? { + DecodeResult::Data(metadata) => break metadata, + DecodeResult::NeedsData(ranges) => { + let buffers = self + .store + .get_ranges(&self.object_meta.location, &ranges) + .await + .map_err(DataFusionError::from)?; + decoder + .push_ranges(ranges, buffers) + .map_err(DataFusionError::from)?; + } + DecodeResult::Finished => { + return Err(DataFusionError::Internal( + "ParquetMetaDataPushDecoder finished without producing metadata" + .to_string(), + )); + } + } + }; + + let metadata = Arc::new(metadata); - if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache { + if cache_metadata && let Some(file_metadata_cache) = &self.file_metadata_cache { file_metadata_cache.put( - &object_meta.location, + &self.object_meta.location, CachedFileMetadataEntry::new( - (*object_meta).clone(), + self.object_meta.clone(), Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))), ), );