Skip to content

Commit 9c23a24

Browse files
committed
more tests
1 parent 8d91cf9 commit 9c23a24

1 file changed

Lines changed: 123 additions & 17 deletions

File tree

datafusion/datasource/src/file_stream.rs

Lines changed: 123 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -778,12 +778,46 @@ mod tests {
778778
.expect("error executing stream")
779779
}
780780

781+
/// Helper for morsel-driven `FileStream` tests that bundles the mock
782+
/// `Morselizer` setup with the corresponding `FileScanConfig`.
783+
struct MorselTest {
784+
morselizer: MockMorselizer,
785+
file_names: Vec<String>,
786+
}
787+
788+
impl MorselTest {
789+
fn new() -> Self {
790+
Self {
791+
morselizer: MockMorselizer::new(),
792+
file_names: vec![],
793+
}
794+
}
795+
796+
fn with_file(mut self, path: impl Into<String>, planner: MockPlanner) -> Self {
797+
let path = path.into();
798+
self.morselizer = self.morselizer.with_file(path.clone(), planner);
799+
self.file_names.push(path);
800+
self
801+
}
802+
803+
fn with_observer(mut self, observer: MorselObserver) -> Self {
804+
self.morselizer = self.morselizer.with_observer(observer);
805+
self
806+
}
807+
808+
async fn run(self) -> Result<String> {
809+
let file_names = self.file_names.iter().map(String::as_str).collect();
810+
let config = test_config(file_names);
811+
run_stream(self.morselizer, config).await
812+
}
813+
}
814+
781815
/// Verifies the simplest morsel-driven flow: one planner produces one
782816
/// morsel immediately, and the morsel is then scanned to completion.
783817
#[tokio::test]
784818
async fn morsel_framework_single_morsel_no_io() -> Result<()> {
785819
let observer = MorselObserver::new();
786-
let morselizer = MockMorselizer::new()
820+
let test = MorselTest::new()
787821
.with_file(
788822
"file1.parquet",
789823
MockPlanner::builder()
@@ -794,8 +828,7 @@ mod tests {
794828
)
795829
.with_observer(observer.clone());
796830

797-
let config = test_config(vec!["file1.parquet"]);
798-
insta::assert_snapshot!(run_stream(morselizer, config).await.unwrap(), @r"
831+
insta::assert_snapshot!(test.run().await.unwrap(), @r"
799832
Batch: 42
800833
Done
801834
");
@@ -820,9 +853,9 @@ mod tests {
820853
/// Verifies that a planner can block on one I/O phase, resume, and only
821854
/// then produce its morsel.
822855
#[tokio::test]
823-
async fn morsel_framework_single_io() -> Result<()> {
856+
async fn morsel_framework_single_morsel_io() -> Result<()> {
824857
let observer = MorselObserver::new();
825-
let morselizer = MockMorselizer::new()
858+
let test = MorselTest::new()
826859
.with_file(
827860
"file1.parquet",
828861
MockPlanner::builder()
@@ -834,8 +867,7 @@ mod tests {
834867
)
835868
.with_observer(observer.clone());
836869

837-
let config = test_config(vec!["file1.parquet"]);
838-
insta::assert_snapshot!(run_stream(morselizer, config).await.unwrap(), @r"
870+
insta::assert_snapshot!(test.run().await.unwrap(), @r"
839871
Batch: 42
840872
Done
841873
");
@@ -870,7 +902,7 @@ mod tests {
870902
#[tokio::test]
871903
async fn morsel_framework_two_cpu_steps_before_morsel() -> Result<()> {
872904
let observer = MorselObserver::new();
873-
let morselizer = MockMorselizer::new()
905+
let test = MorselTest::new()
874906
.with_file(
875907
"file1.parquet",
876908
MockPlanner::builder()
@@ -883,8 +915,7 @@ mod tests {
883915
)
884916
.with_observer(observer.clone());
885917

886-
let config = test_config(vec!["file1.parquet"]);
887-
insta::assert_snapshot!(run_stream(morselizer, config).await.unwrap(), @r"
918+
insta::assert_snapshot!(test.run().await.unwrap(), @r"
888919
Batch: 42
889920
Done
890921
");
@@ -911,7 +942,7 @@ mod tests {
911942
/// Verifies direct morsels returned from a planner are consumed before
912943
/// batches produced by any returned child planners.
913944
#[tokio::test]
914-
async fn morsel_framework_direct_morsel_before_child_planner() -> Result<()> {
945+
async fn morsel_framework_morsels_before_child_planner() -> Result<()> {
915946
let observer = MorselObserver::new();
916947
let child_planner = MockPlanner::builder()
917948
.with_id(PlannerId(1))
@@ -930,12 +961,11 @@ mod tests {
930961
.return_none()
931962
.build();
932963

933-
let morselizer = MockMorselizer::new()
964+
let test = MorselTest::new()
934965
.with_file("file1.parquet", parent_planner)
935966
.with_observer(observer.clone());
936967

937-
let config = test_config(vec!["file1.parquet"]);
938-
insta::assert_snapshot!(run_stream(morselizer, config).await.unwrap(), @r"
968+
insta::assert_snapshot!(test.run().await.unwrap(), @r"
939969
Batch: 42
940970
Batch: 43
941971
Done
@@ -965,13 +995,90 @@ mod tests {
965995
Ok(())
966996
}
967997

998+
/// Verifies the non-ordered behavior for child planners: if the first child
999+
/// planner blocks on I/O and the second can make progress immediately, the
1000+
/// second planner's batches are emitted first.
1001+
#[tokio::test]
1002+
async fn morsel_framework_child_planner_reorder() -> Result<()> {
1003+
let observer = MorselObserver::new();
1004+
let planner_1 = MockPlanner::builder()
1005+
.with_id(PlannerId(1))
1006+
.return_plan(ReturnPlanBuilder::new().with_io(IoFutureId(100), 2))
1007+
.return_morsel(MorselId(11), 41)
1008+
.return_none()
1009+
.build();
1010+
let planner_2 = MockPlanner::builder()
1011+
.with_id(PlannerId(2))
1012+
.return_plan(ReturnPlanBuilder::new().with_io(IoFutureId(101), 1)) // IO returns after 1 poll
1013+
.return_morsel(MorselId(12), 42)
1014+
.return_none()
1015+
.build();
1016+
1017+
let parent_planner = MockPlanner::builder()
1018+
.with_id(PlannerId(0))
1019+
.return_plan(
1020+
ReturnPlanBuilder::new()
1021+
.with_planner(planner_1)
1022+
.with_planner(planner_2),
1023+
)
1024+
.return_none()
1025+
.build();
1026+
1027+
let test = MorselTest::new()
1028+
.with_file("file1.parquet", parent_planner)
1029+
.with_observer(observer.clone());
1030+
1031+
insta::assert_snapshot!(test.run().await.unwrap(), @r"
1032+
Batch: 42
1033+
Batch: 41
1034+
Done
1035+
");
1036+
1037+
assert_eq!(
1038+
observer.events(),
1039+
MorselEventsBuilder::new()
1040+
.morselize_file("file1.parquet")
1041+
.planner_created(PlannerId(0))
1042+
.planner_called(PlannerId(0))
1043+
.planner_produced_child(PlannerId(0), PlannerId(1))
1044+
.planner_produced_child(PlannerId(0), PlannerId(2))
1045+
.planner_called(PlannerId(0))
1046+
.planner_called(PlannerId(1))
1047+
.io_future_created(PlannerId(1), IoFutureId(100))
1048+
.planner_called(PlannerId(2))
1049+
.io_future_created(PlannerId(2), IoFutureId(101))
1050+
.io_future_polled(PlannerId(1), IoFutureId(100))
1051+
.io_future_polled(PlannerId(2), IoFutureId(101))
1052+
.io_future_polled(PlannerId(1), IoFutureId(100))
1053+
.io_future_polled(PlannerId(2), IoFutureId(101))
1054+
.io_future_resolved(PlannerId(2), IoFutureId(101))
1055+
.planner_called(PlannerId(2))
1056+
.morsel_produced(PlannerId(2), MorselId(12))
1057+
.planner_called(PlannerId(2))
1058+
.io_future_polled(PlannerId(1), IoFutureId(100))
1059+
.io_future_resolved(PlannerId(1), IoFutureId(100))
1060+
.planner_called(PlannerId(1))
1061+
.morsel_produced(PlannerId(1), MorselId(11))
1062+
.planner_called(PlannerId(1))
1063+
.morsel_stream_started(MorselId(12))
1064+
.morsel_stream_batch_produced(MorselId(12), BatchId(42))
1065+
.morsel_stream_finished(MorselId(12))
1066+
.morsel_stream_started(MorselId(11))
1067+
.morsel_stream_batch_produced(MorselId(11), BatchId(41))
1068+
.morsel_stream_finished(MorselId(11))
1069+
.build()
1070+
);
1071+
1072+
Ok(())
1073+
}
1074+
9681075
/// Verifies that `FileStream` overlaps planner I/O across multiple files
9691076
/// rather than waiting for the first file to finish before starting the
9701077
/// second.
9711078
#[tokio::test]
9721079
async fn morsel_framework_two_files_overlapping_io() -> Result<()> {
9731080
let observer = MorselObserver::new();
974-
let morselizer = MockMorselizer::new()
1081+
let test = MorselTest::new()
9751082
.with_file(
9761083
"file1.parquet",
9771084
MockPlanner::builder()
@@ -992,8 +1099,7 @@ mod tests {
9921099
)
9931100
.with_observer(observer.clone());
9941101

995-
let config = test_config(vec!["file1.parquet", "file2.parquet"]);
996-
insta::assert_snapshot!(run_stream(morselizer, config).await.unwrap(), @r"
1102+
insta::assert_snapshot!(test.run().await.unwrap(), @r"
9971103
Batch: 42
9981104
Batch: 43
9991105
Done

0 commit comments

Comments
 (0)