Skip to content

Commit 422129c

Browse files
Dandandan and claude authored
Use ParquetMetaDataPushDecoder instead of ParquetMetaDataReader (#21357)
## Which issue does this PR close? - Closes #20856. ## Rationale for this change Replace pull-based `ParquetMetaDataReader` with push-based `ParquetMetaDataPushDecoder`, which decouples IO from decoding. We don't actually do this in this PR, just use the different API to make it more obvious it is possible. ## What changes are included in this PR? - Replace `ParquetMetaDataReader` with `ParquetMetaDataPushDecoder` ## Are these changes tested? Covered by existing tests — this is a behavioral-equivalent replacement of the metadata reading path. ## Are there any user-facing changes? No user-facing API changes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 587f4c0 commit 422129c

2 files changed

Lines changed: 67 additions & 42 deletions

File tree

datafusion/core/tests/datasource/object_store_access.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ async fn create_single_parquet_file_default() {
353353
RequestCountingObjectStore()
354354
Total Requests: 2
355355
- GET (opts) path=parquet_table.parquet head=true
356-
- GET (opts) path=parquet_table.parquet range=0-2994
356+
- GET (ranges) path=parquet_table.parquet ranges=0-2994
357357
"
358358
);
359359
}
@@ -372,7 +372,7 @@ async fn create_single_parquet_file_prefetch() {
372372
RequestCountingObjectStore()
373373
Total Requests: 2
374374
- GET (opts) path=parquet_table.parquet head=true
375-
- GET (opts) path=parquet_table.parquet range=1994-2994
375+
- GET (ranges) path=parquet_table.parquet ranges=1994-2994
376376
"
377377
);
378378
}
@@ -401,9 +401,9 @@ async fn create_single_parquet_file_too_small_prefetch() {
401401
RequestCountingObjectStore()
402402
Total Requests: 4
403403
- GET (opts) path=parquet_table.parquet head=true
404-
- GET (opts) path=parquet_table.parquet range=2494-2994
405-
- GET (opts) path=parquet_table.parquet range=2264-2986
406-
- GET (opts) path=parquet_table.parquet range=2124-2264
404+
- GET (ranges) path=parquet_table.parquet ranges=2494-2994
405+
- GET (ranges) path=parquet_table.parquet ranges=2264-2986
406+
- GET (ranges) path=parquet_table.parquet ranges=2124-2264
407407
"
408408
);
409409
}
@@ -433,8 +433,8 @@ async fn create_single_parquet_file_small_prefetch() {
433433
RequestCountingObjectStore()
434434
Total Requests: 3
435435
- GET (opts) path=parquet_table.parquet head=true
436-
- GET (opts) path=parquet_table.parquet range=2254-2994
437-
- GET (opts) path=parquet_table.parquet range=2124-2264
436+
- GET (ranges) path=parquet_table.parquet ranges=2254-2994
437+
- GET (ranges) path=parquet_table.parquet ranges=2124-2264
438438
"
439439
);
440440
}
@@ -457,7 +457,7 @@ async fn create_single_parquet_file_no_prefetch() {
457457
RequestCountingObjectStore()
458458
Total Requests: 2
459459
- GET (opts) path=parquet_table.parquet head=true
460-
- GET (opts) path=parquet_table.parquet range=0-2994
460+
- GET (ranges) path=parquet_table.parquet ranges=0-2994
461461
"
462462
);
463463
}

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
//! [`DFParquetMetadata`] for fetching Parquet file metadata, statistics
1919
//! and schema information.
2020
21-
use crate::{
22-
ObjectStoreFetch, apply_file_schema_type_coercions, coerce_int96_to_resolution,
23-
};
21+
use crate::{apply_file_schema_type_coercions, coerce_int96_to_resolution};
2422
use arrow::array::{Array, ArrayRef, BooleanArray};
2523
use arrow::compute::and;
2624
use arrow::compute::kernels::cmp::eq;
@@ -41,10 +39,11 @@ use datafusion_physical_plan::Accumulator;
4139
use log::debug;
4240
use object_store::path::Path;
4341
use object_store::{ObjectMeta, ObjectStore};
42+
use parquet::DecodeResult;
4443
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
4544
use parquet::arrow::{parquet_column, parquet_to_arrow_schema};
4645
use parquet::file::metadata::{
47-
PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
46+
PageIndexPolicy, ParquetMetaData, ParquetMetaDataPushDecoder, RowGroupMetaData,
4847
SortingColumn,
4948
};
5049
use parquet::schema::types::SchemaDescriptor;
@@ -119,25 +118,14 @@ impl<'a> DFParquetMetadata<'a> {
119118

120119
/// Fetch parquet metadata from the remote object store
121120
pub async fn fetch_metadata(&self) -> Result<Arc<ParquetMetaData>> {
122-
let Self {
123-
store,
124-
object_meta,
125-
metadata_size_hint,
126-
decryption_properties,
127-
file_metadata_cache,
128-
coerce_int96: _,
129-
} = self;
130-
131-
let fetch = ObjectStoreFetch::new(*store, object_meta);
132-
133121
// implementation to fetch parquet metadata
134122
let cache_metadata =
135-
!cfg!(feature = "parquet_encryption") || decryption_properties.is_none();
123+
!cfg!(feature = "parquet_encryption") || self.decryption_properties.is_none();
136124

137125
if cache_metadata
138-
&& let Some(file_metadata_cache) = file_metadata_cache.as_ref()
139-
&& let Some(cached) = file_metadata_cache.get(&object_meta.location)
140-
&& cached.is_valid_for(object_meta)
126+
&& let Some(file_metadata_cache) = self.file_metadata_cache.as_ref()
127+
&& let Some(cached) = file_metadata_cache.get(&self.object_meta.location)
128+
&& cached.is_valid_for(self.object_meta)
141129
&& let Some(cached_parquet) = cached
142130
.file_metadata
143131
.as_any()
@@ -146,32 +134,69 @@ impl<'a> DFParquetMetadata<'a> {
146134
return Ok(Arc::clone(cached_parquet.parquet_metadata()));
147135
}
148136

149-
let mut reader =
150-
ParquetMetaDataReader::new().with_prefetch_hint(*metadata_size_hint);
137+
let file_size = self.object_meta.size;
138+
let mut decoder = ParquetMetaDataPushDecoder::try_new(file_size)
139+
.map_err(DataFusionError::from)?;
151140

152141
#[cfg(feature = "parquet_encryption")]
153-
if let Some(decryption_properties) = decryption_properties {
154-
reader = reader
155-
.with_decryption_properties(Some(Arc::clone(decryption_properties)));
142+
if let Some(decryption_properties) = &self.decryption_properties {
143+
decoder = decoder
144+
.with_file_decryption_properties(Some(Arc::clone(decryption_properties)));
156145
}
157146

158-
if cache_metadata && file_metadata_cache.is_some() {
147+
if cache_metadata && self.file_metadata_cache.is_some() {
159148
// Need to retrieve the entire metadata for the caching to be effective.
160-
reader = reader.with_page_index_policy(PageIndexPolicy::Optional);
149+
decoder = decoder.with_page_index_policy(PageIndexPolicy::Optional);
150+
} else {
151+
decoder = decoder.with_page_index_policy(PageIndexPolicy::Skip);
161152
}
162153

163-
let metadata = Arc::new(
164-
reader
165-
.load_and_finish(fetch, object_meta.size)
154+
// If we have a size hint, prefetch that many bytes from the end of the file
155+
if let Some(hint) = self.metadata_size_hint {
156+
let prefetch_start = file_size.saturating_sub(hint as u64);
157+
let prefetch_range = prefetch_start..file_size;
158+
let data = self
159+
.store
160+
.get_ranges(
161+
&self.object_meta.location,
162+
std::slice::from_ref(&prefetch_range),
163+
)
166164
.await
167-
.map_err(DataFusionError::from)?,
168-
);
165+
.map_err(DataFusionError::from)?;
166+
decoder
167+
.push_ranges(vec![prefetch_range], data)
168+
.map_err(DataFusionError::from)?;
169+
}
170+
171+
let metadata = loop {
172+
match decoder.try_decode().map_err(DataFusionError::from)? {
173+
DecodeResult::Data(metadata) => break metadata,
174+
DecodeResult::NeedsData(ranges) => {
175+
let buffers = self
176+
.store
177+
.get_ranges(&self.object_meta.location, &ranges)
178+
.await
179+
.map_err(DataFusionError::from)?;
180+
decoder
181+
.push_ranges(ranges, buffers)
182+
.map_err(DataFusionError::from)?;
183+
}
184+
DecodeResult::Finished => {
185+
return Err(DataFusionError::Internal(
186+
"ParquetMetaDataPushDecoder finished without producing metadata"
187+
.to_string(),
188+
));
189+
}
190+
}
191+
};
192+
193+
let metadata = Arc::new(metadata);
169194

170-
if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache {
195+
if cache_metadata && let Some(file_metadata_cache) = &self.file_metadata_cache {
171196
file_metadata_cache.put(
172-
&object_meta.location,
197+
&self.object_meta.location,
173198
CachedFileMetadataEntry::new(
174-
(*object_meta).clone(),
199+
self.object_meta.clone(),
175200
Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
176201
),
177202
);

0 commit comments

Comments (0)