@@ -613,8 +613,8 @@ impl FileStreamMetrics {
613613mod tests {
614614 use crate :: file_scan_config:: { FileScanConfig , FileScanConfigBuilder } ;
615615 use crate :: morsel:: test_utils:: {
616- BatchId , IoFutureId , MockMorselizer , MockPlanner , MorselEventsBuilder , MorselId ,
617- MorselObserver , PlannerId , ReturnPlanBuilder ,
616+ BatchId , IoFutureId , MockMorselSpec , MockMorselizer , MockPlanner ,
617+ MorselEventsBuilder , MorselId , MorselObserver , PlannerId , ReturnPlanBuilder ,
618618 } ;
619619 use crate :: tests:: make_partition;
620620 use crate :: { PartitionedFile , TableSchema } ;
@@ -778,6 +778,8 @@ mod tests {
778778 . expect ( "error executing stream" )
779779 }
780780
781+ /// Verifies the simplest morsel-driven flow: one planner produces one
782+ /// morsel immediately, and the morsel is then scanned to completion.
781783 #[ tokio:: test]
782784 async fn morsel_framework_single_morsel_no_io ( ) -> Result < ( ) > {
783785 let observer = MorselObserver :: new ( ) ;
@@ -794,8 +796,8 @@ mod tests {
794796
795797 let config = test_config ( vec ! [ "file1.parquet" ] ) ;
796798 insta:: assert_snapshot!( run_stream( morselizer, config) . await . unwrap( ) , @r"
797- **** Batch: 42
798- **** Done
799+ Batch: 42
800+ Done
799801 " ) ;
800802
801803 assert_eq ! (
@@ -815,6 +817,8 @@ mod tests {
815817 Ok ( ( ) )
816818 }
817819
820+ /// Verifies that a planner can block on one I/O phase, resume, and only
821+ /// then produce its morsel.
818822 #[ tokio:: test]
819823 async fn morsel_framework_single_io ( ) -> Result < ( ) > {
820824 let observer = MorselObserver :: new ( ) ;
@@ -832,8 +836,8 @@ mod tests {
832836
833837 let config = test_config ( vec ! [ "file1.parquet" ] ) ;
834838 insta:: assert_snapshot!( run_stream( morselizer, config) . await . unwrap( ) , @r"
835- **** Batch: 42
836- **** Done
839+ Batch: 42
840+ Done
837841 " ) ;
838842
839843 assert_eq ! (
@@ -860,6 +864,9 @@ mod tests {
860864 Ok ( ( ) )
861865 }
862866
867+ /// Verifies that a planner can require multiple CPU-only `plan()` calls
868+ /// before it discovers any morsels or I/O, matching the staged behavior of
869+ /// the Parquet morsel planner.
863870 #[ tokio:: test]
864871 async fn morsel_framework_two_cpu_steps_before_morsel ( ) -> Result < ( ) > {
865872 let observer = MorselObserver :: new ( ) ;
@@ -878,8 +885,8 @@ mod tests {
878885
879886 let config = test_config ( vec ! [ "file1.parquet" ] ) ;
880887 insta:: assert_snapshot!( run_stream( morselizer, config) . await . unwrap( ) , @r"
881- **** Batch: 42
882- **** Done
888+ Batch: 42
889+ Done
883890 " ) ;
884891
885892 assert_eq ! (
@@ -901,6 +908,133 @@ mod tests {
901908 Ok ( ( ) )
902909 }
903910
911+ /// Verifies direct morsels returned from a planner are consumed before
912+ /// batches produced by any returned child planners.
913+ #[ tokio:: test]
914+ async fn morsel_framework_direct_morsel_before_child_planner ( ) -> Result < ( ) > {
915+ let observer = MorselObserver :: new ( ) ;
916+ let child_planner = MockPlanner :: builder ( )
917+ . with_id ( PlannerId ( 1 ) )
918+ . return_morsel ( MorselId ( 11 ) , 43 )
919+ . return_none ( )
920+ . build ( ) ;
921+
922+ // planner 0 returns batch 42
923+ let parent_planner = MockPlanner :: builder ( )
924+ . with_id ( PlannerId ( 0 ) )
925+ . return_plan (
926+ ReturnPlanBuilder :: new ( )
927+ . with_morsel ( MockMorselSpec :: single_batch ( MorselId ( 10 ) , 42 ) )
928+ . with_planner ( child_planner) ,
929+ )
930+ . return_none ( )
931+ . build ( ) ;
932+
933+ let morselizer = MockMorselizer :: new ( )
934+ . with_file ( "file1.parquet" , parent_planner)
935+ . with_observer ( observer. clone ( ) ) ;
936+
937+ let config = test_config ( vec ! [ "file1.parquet" ] ) ;
938+ insta:: assert_snapshot!( run_stream( morselizer, config) . await . unwrap( ) , @r"
939+ Batch: 42
940+ Batch: 43
941+ Done
942+ " ) ;
943+
944+ assert_eq ! (
945+ observer. events( ) ,
946+ MorselEventsBuilder :: new( )
947+ . morselize_file( "file1.parquet" )
948+ . planner_created( PlannerId ( 0 ) )
949+ . planner_called( PlannerId ( 0 ) )
950+ . morsel_produced( PlannerId ( 0 ) , MorselId ( 10 ) )
951+ . planner_produced_child( PlannerId ( 0 ) , PlannerId ( 1 ) )
952+ . planner_called( PlannerId ( 0 ) )
953+ . planner_called( PlannerId ( 1 ) )
954+ . morsel_produced( PlannerId ( 1 ) , MorselId ( 11 ) )
955+ . planner_called( PlannerId ( 1 ) )
956+ . morsel_stream_started( MorselId ( 10 ) )
957+ . morsel_stream_batch_produced( MorselId ( 10 ) , BatchId ( 42 ) )
958+ . morsel_stream_finished( MorselId ( 10 ) )
959+ . morsel_stream_started( MorselId ( 11 ) )
960+ . morsel_stream_batch_produced( MorselId ( 11 ) , BatchId ( 43 ) )
961+ . morsel_stream_finished( MorselId ( 11 ) )
962+ . build( )
963+ ) ;
964+
965+ Ok ( ( ) )
966+ }
967+
968+ /// Verifies that `FileStream` overlaps planner I/O across multiple files
969+ /// rather than waiting for the first file to finish before starting the
970+ /// second.
971+ #[ tokio:: test]
972+ async fn morsel_framework_two_files_overlapping_io ( ) -> Result < ( ) > {
973+ let observer = MorselObserver :: new ( ) ;
974+ let morselizer = MockMorselizer :: new ( )
975+ . with_file (
976+ "file1.parquet" ,
977+ MockPlanner :: builder ( )
978+ . with_id ( PlannerId ( 0 ) )
979+ . return_plan ( ReturnPlanBuilder :: new ( ) . with_io ( IoFutureId ( 100 ) , 1 ) )
980+ . return_morsel ( MorselId ( 10 ) , 42 )
981+ . return_none ( )
982+ . build ( ) ,
983+ )
984+ . with_file (
985+ "file2.parquet" ,
986+ MockPlanner :: builder ( )
987+ . with_id ( PlannerId ( 1 ) )
988+ . return_plan ( ReturnPlanBuilder :: new ( ) . with_io ( IoFutureId ( 101 ) , 1 ) )
989+ . return_morsel ( MorselId ( 11 ) , 43 )
990+ . return_none ( )
991+ . build ( ) ,
992+ )
993+ . with_observer ( observer. clone ( ) ) ;
994+
995+ let config = test_config ( vec ! [ "file1.parquet" , "file2.parquet" ] ) ;
996+ insta:: assert_snapshot!( run_stream( morselizer, config) . await . unwrap( ) , @r"
997+ Batch: 42
998+ Batch: 43
999+ Done
1000+ " ) ;
1001+
1002+ assert_eq ! (
1003+ observer. events( ) ,
1004+ MorselEventsBuilder :: new( )
1005+ . morselize_file( "file1.parquet" )
1006+ . planner_created( PlannerId ( 0 ) )
1007+ . planner_called( PlannerId ( 0 ) )
1008+ . io_future_created( PlannerId ( 0 ) , IoFutureId ( 100 ) )
1009+ . morselize_file( "file2.parquet" )
1010+ . planner_created( PlannerId ( 1 ) )
1011+ . planner_called( PlannerId ( 1 ) )
1012+ // note IO from both planners occurs before CPU for planner 0
1013+ . io_future_created( PlannerId ( 1 ) , IoFutureId ( 101 ) )
1014+ . io_future_polled( PlannerId ( 0 ) , IoFutureId ( 100 ) )
1015+ . io_future_polled( PlannerId ( 1 ) , IoFutureId ( 101 ) )
1016+ . io_future_polled( PlannerId ( 0 ) , IoFutureId ( 100 ) )
1017+ . io_future_resolved( PlannerId ( 0 ) , IoFutureId ( 100 ) )
1018+ . io_future_polled( PlannerId ( 1 ) , IoFutureId ( 101 ) )
1019+ . io_future_resolved( PlannerId ( 1 ) , IoFutureId ( 101 ) )
1020+ . planner_called( PlannerId ( 0 ) )
1021+ . morsel_produced( PlannerId ( 0 ) , MorselId ( 10 ) )
1022+ . planner_called( PlannerId ( 0 ) )
1023+ . planner_called( PlannerId ( 1 ) )
1024+ . morsel_produced( PlannerId ( 1 ) , MorselId ( 11 ) )
1025+ . planner_called( PlannerId ( 1 ) )
1026+ . morsel_stream_started( MorselId ( 10 ) )
1027+ . morsel_stream_batch_produced( MorselId ( 10 ) , BatchId ( 42 ) )
1028+ . morsel_stream_finished( MorselId ( 10 ) )
1029+ . morsel_stream_started( MorselId ( 11 ) )
1030+ . morsel_stream_batch_produced( MorselId ( 11 ) , BatchId ( 43 ) )
1031+ . morsel_stream_finished( MorselId ( 11 ) )
1032+ . build( )
1033+ ) ;
1034+
1035+ Ok ( ( ) )
1036+ }
1037+
9041038 fn test_config ( file_names : Vec < & str > ) -> FileScanConfig {
9051039 let file_group = file_names
9061040 . into_iter ( )
@@ -945,14 +1079,14 @@ mod tests {
9451079 assert_eq ! ( col. len( ) , 1 ) ;
9461080 assert ! ( col. is_valid( 0 ) ) ;
9471081 let batch_id = col. value ( 0 ) ;
948- stream_contents. push ( format ! ( "**** Batch: {batch_id}" ) ) ;
1082+ stream_contents. push ( format ! ( "Batch: {batch_id}" ) ) ;
9491083 }
9501084 Err ( e) => {
951- stream_contents. push ( format ! ( "**** Error: {e}" ) ) ;
1085+ stream_contents. push ( format ! ( "Error: {e}" ) ) ;
9521086 }
9531087 }
9541088 }
955- stream_contents. push ( "**** Done" . to_string ( ) ) ;
1089+ stream_contents. push ( "Done" . to_string ( ) ) ;
9561090 Ok ( stream_contents. join ( "\n " ) )
9571091 }
9581092
0 commit comments