@@ -25,8 +25,6 @@ use crate::{
2525} ;
2626use arrow:: array:: { RecordBatch , RecordBatchOptions } ;
2727use arrow:: datatypes:: { DataType , Schema } ;
28- #[ cfg( test) ]
29- use datafusion_datasource:: file_stream:: FileOpenFuture ;
3028use datafusion_datasource:: morsel:: { Morsel , MorselPlan , MorselPlanner , Morselizer } ;
3129use datafusion_physical_expr:: projection:: { ProjectionExprs , Projector } ;
3230use datafusion_physical_expr:: utils:: reassign_expr_columns;
@@ -58,7 +56,6 @@ use datafusion_physical_plan::metrics::{
5856} ;
5957use datafusion_pruning:: { FilePruner , PruningPredicate , build_pruning_predicate} ;
6058
61- use crate :: sort:: reverse_row_selection;
6259#[ cfg( feature = "parquet_encryption" ) ]
6360use datafusion_common:: config:: EncryptionFactoryOptions ;
6461#[ cfg( feature = "parquet_encryption" ) ]
@@ -73,7 +70,7 @@ use parquet::arrow::arrow_reader::{
7370use parquet:: arrow:: async_reader:: AsyncFileReader ;
7471use parquet:: arrow:: push_decoder:: { ParquetPushDecoder , ParquetPushDecoderBuilder } ;
7572use parquet:: arrow:: { ParquetRecordBatchStreamBuilder , ProjectionMask } ;
76- use parquet:: file:: metadata:: { PageIndexPolicy , ParquetMetaDataReader , RowGroupMetaData } ;
73+ use parquet:: file:: metadata:: { PageIndexPolicy , ParquetMetaDataReader } ;
7774use tokio:: sync:: oneshot;
7875
7976/// Implements [`Morselizer`] for a parquet file.
@@ -152,12 +149,9 @@ impl ParquetMorselizer {
152149 }
153150 }
154151
155- #[ cfg( test) ]
156- fn open_file ( & self , partitioned_file : PartitionedFile ) -> Result < FileOpenFuture > {
157- self . state . open_file ( partitioned_file)
158- }
159152}
160153
154+
161155impl Deref for ParquetMorselizer {
162156 type Target = ParquetMorselizerState ;
163157
@@ -190,53 +184,6 @@ impl Debug for ParquetMorselizer {
190184 }
191185}
192186
193- /// Represents a prepared access plan with optional row selection
194- pub ( crate ) struct PreparedAccessPlan {
195- /// Row group indexes to read
196- pub ( crate ) row_group_indexes : Vec < usize > ,
197- /// Optional row selection for filtering within row groups
198- pub ( crate ) row_selection : Option < parquet:: arrow:: arrow_reader:: RowSelection > ,
199- }
200-
201- impl PreparedAccessPlan {
202- /// Create a new prepared access plan from a ParquetAccessPlan
203- pub ( crate ) fn from_access_plan (
204- access_plan : ParquetAccessPlan ,
205- rg_metadata : & [ RowGroupMetaData ] ,
206- ) -> Result < Self > {
207- let row_group_indexes = access_plan. row_group_indexes ( ) ;
208- let row_selection = access_plan. into_overall_row_selection ( rg_metadata) ?;
209-
210- Ok ( Self {
211- row_group_indexes,
212- row_selection,
213- } )
214- }
215-
216- /// Reverse the access plan for reverse scanning
217- pub ( crate ) fn reverse (
218- mut self ,
219- file_metadata : & parquet:: file:: metadata:: ParquetMetaData ,
220- ) -> Result < Self > {
221- // Get the row group indexes before reversing
222- let row_groups_to_scan = self . row_group_indexes . clone ( ) ;
223-
224- // Reverse the row group indexes
225- self . row_group_indexes = self . row_group_indexes . into_iter ( ) . rev ( ) . collect ( ) ;
226-
227- // If we have a row selection, reverse it to match the new row group order
228- if let Some ( row_selection) = self . row_selection {
229- self . row_selection = Some ( reverse_row_selection (
230- & row_selection,
231- file_metadata,
232- & row_groups_to_scan, // Pass the original (non-reversed) row group indexes
233- ) ?) ;
234- }
235-
236- Ok ( self )
237- }
238- }
239-
240187/// Result of preparing a PartitionedFile using CPU before any I/O.
241188///
242189/// This captures the state computed from `PartitionedFile`, the table schema,
@@ -267,40 +214,6 @@ struct MetadataLoadedParquetOpen {
267214}
268215
269216impl ParquetMorselizerState {
270- /// Build a stream for a single parquet file using the copied opener logic.
271- ///
272- /// This remains private so the public integration point for parquet scans is
273- /// the morsel API rather than the legacy `FileOpener` trait.
274- #[ cfg( test) ]
275- fn open_file (
276- self : & Arc < Self > ,
277- partitioned_file : PartitionedFile ,
278- ) -> Result < FileOpenFuture > {
279- let prepared = self . prepare_open_file ( partitioned_file) ?;
280- Ok ( Box :: pin ( async move {
281- #[ cfg( feature = "parquet_encryption" ) ]
282- let prepared = {
283- let mut prepared = prepared;
284- if prepared
285- . state
286- . get_encryption_context ( )
287- . needs_file_decryption_properties ( )
288- {
289- prepared. file_decryption_properties = prepared
290- . state
291- . load_file_decryption_properties (
292- prepared. partitioned_file . object_meta . location . clone ( ) ,
293- )
294- . await ?;
295- }
296- prepared
297- } ;
298- #[ cfg( not( feature = "parquet_encryption" ) ) ]
299- let prepared = prepared;
300- ParquetMorselizerState :: execute_prepared_open ( prepared) . await
301- } ) )
302- }
303-
304217 /// Perform the CPU-only setup for opening a parquet file.
305218 fn prepare_open_file (
306219 self : & Arc < Self > ,
@@ -436,18 +349,6 @@ impl ParquetMorselizerState {
436349 Ok ( None )
437350 }
438351
439- /// Continue opening a parquet file once the CPU-only setup has completed.
440- #[ cfg( test) ]
441- async fn execute_prepared_open (
442- prepared : PreparedParquetOpen ,
443- ) -> Result < BoxStream < ' static , Result < RecordBatch > > > {
444- let Some ( prepared) = Self :: prune_prepared_file ( prepared) ? else {
445- return Ok ( futures:: stream:: empty ( ) . boxed ( ) ) ;
446- } ;
447- let loaded = Self :: load_prepared_metadata ( prepared) . await ?;
448- Self :: execute_metadata_loaded_open ( loaded) . await
449- }
450-
451352 /// CPU-only file pruning performed before metadata I/O begins.
452353 ///
453354 /// Returns `None` if the file was completely pruned
@@ -727,8 +628,7 @@ impl ParquetMorselizerState {
727628 }
728629
729630 // Prepare the access plan (extract row groups and row selection)
730- let mut prepared_plan =
731- PreparedAccessPlan :: from_access_plan ( access_plan, rg_metadata) ?;
631+ let mut prepared_plan = access_plan. prepare ( rg_metadata) ?;
732632
733633 // ----------------------------------------------------------
734634 // Step: potentially reverse the access plan for performance.
@@ -1953,7 +1853,7 @@ mod test {
19531853 let expr = col ( "a" ) . eq ( lit ( 1 ) ) ;
19541854 let predicate = logical2physical ( & expr, & schema) ;
19551855 let opener = make_opener ( predicate) ;
1956- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
1856+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
19571857 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
19581858 assert_eq ! ( num_batches, 1 ) ;
19591859 assert_eq ! ( num_rows, 3 ) ;
@@ -1962,7 +1862,7 @@ mod test {
19621862 let expr = col ( "b" ) . eq ( lit ( ScalarValue :: Float32 ( Some ( 5.0 ) ) ) ) ;
19631863 let predicate = logical2physical ( & expr, & schema) ;
19641864 let opener = make_opener ( predicate) ;
1965- let stream = opener . open_file ( file) . unwrap ( ) . await . unwrap ( ) ;
1865+ let stream = open_via_morselizer ( & opener , file) . await ;
19661866 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
19671867 assert_eq ! ( num_batches, 0 ) ;
19681868 assert_eq ! ( num_rows, 0 ) ;
@@ -2008,7 +1908,7 @@ mod test {
20081908 // Otherwise we assume it already happened at the planning stage and won't re-do the work here
20091909 let predicate = make_dynamic_expr ( logical2physical ( & expr, & table_schema) ) ;
20101910 let opener = make_opener ( predicate) ;
2011- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
1911+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
20121912 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
20131913 assert_eq ! ( num_batches, 1 ) ;
20141914 assert_eq ! ( num_rows, 3 ) ;
@@ -2019,7 +1919,7 @@ mod test {
20191919 // Otherwise we assume it already happened at the planning stage and won't re-do the work here
20201920 let predicate = make_dynamic_expr ( logical2physical ( & expr, & table_schema) ) ;
20211921 let opener = make_opener ( predicate) ;
2022- let stream = opener . open_file ( file) . unwrap ( ) . await . unwrap ( ) ;
1922+ let stream = open_via_morselizer ( & opener , file) . await ;
20231923 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
20241924 assert_eq ! ( num_batches, 0 ) ;
20251925 assert_eq ! ( num_rows, 0 ) ;
@@ -2075,7 +1975,7 @@ mod test {
20751975 let expr = col ( "part" ) . eq ( lit ( 1 ) ) . and ( col ( "b" ) . eq ( lit ( 1.0 ) ) ) ;
20761976 let predicate = logical2physical ( & expr, & table_schema) ;
20771977 let opener = make_opener ( predicate) ;
2078- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
1978+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
20791979 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
20801980 assert_eq ! ( num_batches, 1 ) ;
20811981 assert_eq ! ( num_rows, 3 ) ;
@@ -2084,7 +1984,7 @@ mod test {
20841984 let expr = col ( "part" ) . eq ( lit ( 2 ) ) . and ( col ( "b" ) . eq ( lit ( 1.0 ) ) ) ;
20851985 let predicate = logical2physical ( & expr, & table_schema) ;
20861986 let opener = make_opener ( predicate) ;
2087- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
1987+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
20881988 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
20891989 assert_eq ! ( num_batches, 0 ) ;
20901990 assert_eq ! ( num_rows, 0 ) ;
@@ -2093,7 +1993,7 @@ mod test {
20931993 let expr = col ( "part" ) . eq ( lit ( 1 ) ) . and ( col ( "b" ) . eq ( lit ( 7.0 ) ) ) ;
20941994 let predicate = logical2physical ( & expr, & table_schema) ;
20951995 let opener = make_opener ( predicate) ;
2096- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
1996+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
20971997 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
20981998 assert_eq ! ( num_batches, 0 ) ;
20991999 assert_eq ! ( num_rows, 0 ) ;
@@ -2102,7 +2002,7 @@ mod test {
21022002 let expr = col ( "part" ) . eq ( lit ( 2 ) ) . and ( col ( "b" ) . eq ( lit ( 7.0 ) ) ) ;
21032003 let predicate = logical2physical ( & expr, & table_schema) ;
21042004 let opener = make_opener ( predicate) ;
2105- let stream = opener . open_file ( file) . unwrap ( ) . await . unwrap ( ) ;
2005+ let stream = open_via_morselizer ( & opener , file) . await ;
21062006 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
21072007 assert_eq ! ( num_batches, 0 ) ;
21082008 assert_eq ! ( num_rows, 0 ) ;
@@ -2148,7 +2048,7 @@ mod test {
21482048 let expr = col ( "part" ) . eq ( lit ( 1 ) ) . or ( col ( "a" ) . eq ( lit ( 1 ) ) ) ;
21492049 let predicate = logical2physical ( & expr, & table_schema) ;
21502050 let opener = make_opener ( predicate) ;
2151- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2051+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
21522052 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
21532053 assert_eq ! ( num_batches, 1 ) ;
21542054 assert_eq ! ( num_rows, 3 ) ;
@@ -2157,7 +2057,7 @@ mod test {
21572057 let expr = col ( "part" ) . eq ( lit ( 1 ) ) . or ( col ( "a" ) . eq ( lit ( 3 ) ) ) ;
21582058 let predicate = logical2physical ( & expr, & table_schema) ;
21592059 let opener = make_opener ( predicate) ;
2160- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2060+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
21612061 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
21622062 assert_eq ! ( num_batches, 1 ) ;
21632063 assert_eq ! ( num_rows, 3 ) ;
@@ -2166,7 +2066,7 @@ mod test {
21662066 let expr = col ( "part" ) . eq ( lit ( 2 ) ) . or ( col ( "a" ) . eq ( lit ( 1 ) ) ) ;
21672067 let predicate = logical2physical ( & expr, & table_schema) ;
21682068 let opener = make_opener ( predicate) ;
2169- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2069+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
21702070 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
21712071 assert_eq ! ( num_batches, 1 ) ;
21722072 assert_eq ! ( num_rows, 1 ) ;
@@ -2175,7 +2075,7 @@ mod test {
21752075 let expr = col ( "part" ) . eq ( lit ( 2 ) ) . or ( col ( "a" ) . eq ( lit ( 3 ) ) ) ;
21762076 let predicate = logical2physical ( & expr, & table_schema) ;
21772077 let opener = make_opener ( predicate) ;
2178- let stream = opener . open_file ( file) . unwrap ( ) . await . unwrap ( ) ;
2078+ let stream = open_via_morselizer ( & opener , file) . await ;
21792079 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
21802080 assert_eq ! ( num_batches, 0 ) ;
21812081 assert_eq ! ( num_rows, 0 ) ;
@@ -2228,7 +2128,7 @@ mod test {
22282128 let expr = col ( "a" ) . eq ( lit ( 42 ) ) ;
22292129 let predicate = logical2physical ( & expr, & table_schema) ;
22302130 let opener = make_opener ( predicate) ;
2231- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2131+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
22322132 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
22332133 assert_eq ! ( num_batches, 0 ) ;
22342134 assert_eq ! ( num_rows, 0 ) ;
@@ -2237,7 +2137,7 @@ mod test {
22372137 // This allows dynamic filters to prune partitions/files even if they are populated late into execution.
22382138 let predicate = make_dynamic_expr ( logical2physical ( & expr, & table_schema) ) ;
22392139 let opener = make_opener ( predicate) ;
2240- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2140+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
22412141 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
22422142 assert_eq ! ( num_batches, 0 ) ;
22432143 assert_eq ! ( num_rows, 0 ) ;
@@ -2247,7 +2147,7 @@ mod test {
22472147 let expr = col ( "part" ) . eq ( lit ( 2 ) ) ;
22482148 let predicate = make_dynamic_expr ( logical2physical ( & expr, & table_schema) ) ;
22492149 let opener = make_opener ( predicate) ;
2250- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2150+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
22512151 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
22522152 assert_eq ! ( num_batches, 0 ) ;
22532153 assert_eq ! ( num_rows, 0 ) ;
@@ -2256,7 +2156,7 @@ mod test {
22562156 let expr = col ( "part" ) . eq ( lit ( 2 ) ) . and ( col ( "a" ) . eq ( lit ( 42 ) ) ) ;
22572157 let predicate = make_dynamic_expr ( logical2physical ( & expr, & table_schema) ) ;
22582158 let opener = make_opener ( predicate) ;
2259- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2159+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
22602160 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
22612161 assert_eq ! ( num_batches, 0 ) ;
22622162 assert_eq ! ( num_rows, 0 ) ;
@@ -2307,12 +2207,12 @@ mod test {
23072207
23082208 // Test normal scan (forward)
23092209 let opener = make_opener ( false ) ;
2310- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2210+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
23112211 let forward_values = collect_int32_values ( stream) . await ;
23122212
23132213 // Test reverse scan
23142214 let opener = make_opener ( true ) ;
2315- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2215+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
23162216 let reverse_values = collect_int32_values ( stream) . await ;
23172217
23182218 // The forward scan should return data in the order written
@@ -2352,7 +2252,7 @@ mod test {
23522252 ) ;
23532253
23542254 let opener_values =
2355- collect_int32_values ( opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) )
2255+ collect_int32_values ( open_via_morselizer ( & opener , file. clone ( ) ) . await )
23562256 . await ;
23572257 let morsel_values =
23582258 collect_int32_values ( open_via_morselizer ( & opener, file) . await ) . await ;
@@ -2387,15 +2287,11 @@ mod test {
23872287 // With a single row group, forward and reverse should be the same
23882288 // (only the row group order is reversed, not the rows within)
23892289 let opener_forward = make_opener ( false ) ;
2390- let stream_forward = opener_forward
2391- . open_file ( file. clone ( ) )
2392- . unwrap ( )
2393- . await
2394- . unwrap ( ) ;
2290+ let stream_forward = open_via_morselizer ( & opener_forward, file. clone ( ) ) . await ;
23952291 let ( batches_forward, _) = count_batches_and_rows ( stream_forward) . await ;
23962292
23972293 let opener_reverse = make_opener ( true ) ;
2398- let stream_reverse = opener_reverse . open_file ( file) . unwrap ( ) . await . unwrap ( ) ;
2294+ let stream_reverse = open_via_morselizer ( & opener_reverse , file) . await ;
23992295 let ( batches_reverse, _) = count_batches_and_rows ( stream_reverse) . await ;
24002296
24012297 // Both should have the same number of batches since there's only one row group
@@ -2466,7 +2362,7 @@ mod test {
24662362
24672363 // Forward scan: RG0(3,4), RG1(5,6,7,8), RG2(9,10)
24682364 let opener = make_opener ( false ) ;
2469- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2365+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
24702366 let forward_values = collect_int32_values ( stream) . await ;
24712367
24722368 // Forward scan should produce: RG0(3,4), RG1(5,6,7,8), RG2(9,10)
@@ -2482,7 +2378,7 @@ mod test {
24822378 // - RG1 is read second, WITH RG1's selection (select all) -> 5, 6, 7, 8
24832379 // - RG0 is read third, WITH RG0's selection (skip 2, select 2) -> 3, 4
24842380 let opener = make_opener ( true ) ;
2485- let stream = opener . open_file ( file) . unwrap ( ) . await . unwrap ( ) ;
2381+ let stream = open_via_morselizer ( & opener , file) . await ;
24862382 let reverse_values = collect_int32_values ( stream) . await ;
24872383
24882384 // Correct expected result: row groups reversed but each keeps its own selection
@@ -2568,7 +2464,7 @@ mod test {
25682464 // Forward scan: RG0(1), RG2(5), RG3(7)
25692465 // Note: RG1 is completely skipped
25702466 let opener = make_opener ( false ) ;
2571- let stream = opener . open_file ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
2467+ let stream = open_via_morselizer ( & opener , file. clone ( ) ) . await ;
25722468 let forward_values = collect_int32_values ( stream) . await ;
25732469
25742470 assert_eq ! (
@@ -2581,7 +2477,7 @@ mod test {
25812477 // WITHOUT the bug fix, this would return WRONG values
25822478 // because the RowSelection would be incorrectly mapped
25832479 let opener = make_opener ( true ) ;
2584- let stream = opener . open_file ( file) . unwrap ( ) . await . unwrap ( ) ;
2480+ let stream = open_via_morselizer ( & opener , file) . await ;
25852481 let reverse_values = collect_int32_values ( stream) . await ;
25862482
25872483 assert_eq ! (
0 commit comments