Skip to content

Commit 9ed8aa4

Browse files
committed
rework tests
1 parent 9c23a24 commit 9ed8aa4

2 files changed

Lines changed: 200 additions & 262 deletions

File tree

datafusion/datasource/src/file_stream.rs

Lines changed: 132 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -613,8 +613,8 @@ impl FileStreamMetrics {
613613
mod tests {
614614
use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
615615
use crate::morsel::test_utils::{
616-
BatchId, IoFutureId, MockMorselSpec, MockMorselizer, MockPlanner,
617-
MorselEventsBuilder, MorselId, MorselObserver, PlannerId, ReturnPlanBuilder,
616+
IoFutureId, MockMorselSpec, MockMorselizer, MockPlanner, MorselId,
617+
MorselObserver, PlannerId, ReturnPlanBuilder,
618618
};
619619
use crate::tests::make_partition;
620620
use crate::{PartitionedFile, TableSchema};
@@ -783,13 +783,15 @@ mod tests {
783783
struct MorselTest {
784784
morselizer: MockMorselizer,
785785
file_names: Vec<String>,
786+
observer: Option<MorselObserver>,
786787
}
787788

788789
impl MorselTest {
789790
fn new() -> Self {
790791
Self {
791792
morselizer: MockMorselizer::new(),
792793
file_names: vec![],
794+
observer: None,
793795
}
794796
}
795797

@@ -801,14 +803,25 @@ mod tests {
801803
}
802804

803805
fn with_observer(mut self, observer: MorselObserver) -> Self {
804-
self.morselizer = self.morselizer.with_observer(observer);
806+
self.morselizer = self.morselizer.with_observer(observer.clone());
807+
self.observer = Some(observer);
805808
self
806809
}
807810

808811
async fn run(self) -> Result<String> {
809812
let file_names = self.file_names.iter().map(String::as_str).collect();
810813
let config = test_config(file_names);
811-
run_stream(self.morselizer, config).await
814+
let output = run_stream(self.morselizer, config).await?;
815+
816+
// Snapshot both the produced output and the scheduler trace
817+
// together. This makes scheduler changes much easier to review than
818+
// maintaining long hand-written event assertions separately.
819+
let mut parts = vec!["----- Output Stream -----".to_string(), output];
820+
if let Some(observer) = self.observer {
821+
parts.push("----- File Stream Events -----".to_string());
822+
parts.push(observer.format_events());
823+
}
824+
Ok(parts.join("\n"))
812825
}
813826
}
814827

@@ -829,24 +842,20 @@ mod tests {
829842
.with_observer(observer.clone());
830843

831844
insta::assert_snapshot!(test.run().await.unwrap(), @r"
845+
----- Output Stream -----
832846
Batch: 42
833847
Done
848+
----- File Stream Events -----
849+
morselize_file: file1.parquet
850+
planner_created: PlannerId(0)
851+
planner_called: PlannerId(0)
852+
morsel_produced: PlannerId(0), MorselId(10)
853+
planner_called: PlannerId(0)
854+
morsel_stream_started: MorselId(10)
855+
morsel_stream_batch_produced: MorselId(10), BatchId(42)
856+
morsel_stream_finished: MorselId(10)
834857
");
835858

836-
assert_eq!(
837-
observer.events(),
838-
MorselEventsBuilder::new()
839-
.morselize_file("file1.parquet")
840-
.planner_created(PlannerId(0))
841-
.planner_called(PlannerId(0))
842-
.morsel_produced(PlannerId(0), MorselId(10))
843-
.planner_called(PlannerId(0))
844-
.morsel_stream_started(MorselId(10))
845-
.morsel_stream_batch_produced(MorselId(10), BatchId(42))
846-
.morsel_stream_finished(MorselId(10))
847-
.build()
848-
);
849-
850859
Ok(())
851860
}
852861

@@ -868,31 +877,25 @@ mod tests {
868877
.with_observer(observer.clone());
869878

870879
insta::assert_snapshot!(test.run().await.unwrap(), @r"
880+
----- Output Stream -----
871881
Batch: 42
872882
Done
883+
----- File Stream Events -----
884+
morselize_file: file1.parquet
885+
planner_created: PlannerId(0)
886+
planner_called: PlannerId(0)
887+
io_future_created: PlannerId(0), IoFutureId(100)
888+
io_future_polled: PlannerId(0), IoFutureId(100)
889+
io_future_polled: PlannerId(0), IoFutureId(100)
890+
io_future_resolved: PlannerId(0), IoFutureId(100)
891+
planner_called: PlannerId(0)
892+
morsel_produced: PlannerId(0), MorselId(10)
893+
planner_called: PlannerId(0)
894+
morsel_stream_started: MorselId(10)
895+
morsel_stream_batch_produced: MorselId(10), BatchId(42)
896+
morsel_stream_finished: MorselId(10)
873897
");
874898

875-
assert_eq!(
876-
observer.events(),
877-
MorselEventsBuilder::new()
878-
.morselize_file("file1.parquet")
879-
.planner_created(PlannerId(0))
880-
.planner_called(PlannerId(0))
881-
.io_future_created(PlannerId(0), IoFutureId(100))
882-
// `with_io(IoFutureId(100), 1)` means the first poll is pending.
883-
.io_future_polled(PlannerId(0), IoFutureId(100))
884-
// The second poll resolves the future.
885-
.io_future_polled(PlannerId(0), IoFutureId(100))
886-
.io_future_resolved(PlannerId(0), IoFutureId(100))
887-
.planner_called(PlannerId(0))
888-
.morsel_produced(PlannerId(0), MorselId(10))
889-
.planner_called(PlannerId(0))
890-
.morsel_stream_started(MorselId(10))
891-
.morsel_stream_batch_produced(MorselId(10), BatchId(42))
892-
.morsel_stream_finished(MorselId(10))
893-
.build()
894-
);
895-
896899
Ok(())
897900
}
898901

@@ -916,26 +919,22 @@ mod tests {
916919
.with_observer(observer.clone());
917920

918921
insta::assert_snapshot!(test.run().await.unwrap(), @r"
922+
----- Output Stream -----
919923
Batch: 42
920924
Done
925+
----- File Stream Events -----
926+
morselize_file: file1.parquet
927+
planner_created: PlannerId(0)
928+
planner_called: PlannerId(0)
929+
planner_called: PlannerId(0)
930+
planner_called: PlannerId(0)
931+
morsel_produced: PlannerId(0), MorselId(10)
932+
planner_called: PlannerId(0)
933+
morsel_stream_started: MorselId(10)
934+
morsel_stream_batch_produced: MorselId(10), BatchId(42)
935+
morsel_stream_finished: MorselId(10)
921936
");
922937

923-
assert_eq!(
924-
observer.events(),
925-
MorselEventsBuilder::new()
926-
.morselize_file("file1.parquet")
927-
.planner_created(PlannerId(0))
928-
.planner_called(PlannerId(0))
929-
.planner_called(PlannerId(0))
930-
.planner_called(PlannerId(0))
931-
.morsel_produced(PlannerId(0), MorselId(10))
932-
.planner_called(PlannerId(0))
933-
.morsel_stream_started(MorselId(10))
934-
.morsel_stream_batch_produced(MorselId(10), BatchId(42))
935-
.morsel_stream_finished(MorselId(10))
936-
.build()
937-
);
938-
939938
Ok(())
940939
}
941940

@@ -966,32 +965,28 @@ mod tests {
966965
.with_observer(observer.clone());
967966

968967
insta::assert_snapshot!(test.run().await.unwrap(), @r"
968+
----- Output Stream -----
969969
Batch: 42
970970
Batch: 43
971971
Done
972+
----- File Stream Events -----
973+
morselize_file: file1.parquet
974+
planner_created: PlannerId(0)
975+
planner_called: PlannerId(0)
976+
morsel_produced: PlannerId(0), MorselId(10)
977+
planner_produced_child: PlannerId(0) -> PlannerId(1)
978+
planner_called: PlannerId(0)
979+
planner_called: PlannerId(1)
980+
morsel_produced: PlannerId(1), MorselId(11)
981+
planner_called: PlannerId(1)
982+
morsel_stream_started: MorselId(10)
983+
morsel_stream_batch_produced: MorselId(10), BatchId(42)
984+
morsel_stream_finished: MorselId(10)
985+
morsel_stream_started: MorselId(11)
986+
morsel_stream_batch_produced: MorselId(11), BatchId(43)
987+
morsel_stream_finished: MorselId(11)
972988
");
973989

974-
assert_eq!(
975-
observer.events(),
976-
MorselEventsBuilder::new()
977-
.morselize_file("file1.parquet")
978-
.planner_created(PlannerId(0))
979-
.planner_called(PlannerId(0))
980-
.morsel_produced(PlannerId(0), MorselId(10))
981-
.planner_produced_child(PlannerId(0), PlannerId(1))
982-
.planner_called(PlannerId(0))
983-
.planner_called(PlannerId(1))
984-
.morsel_produced(PlannerId(1), MorselId(11))
985-
.planner_called(PlannerId(1))
986-
.morsel_stream_started(MorselId(10))
987-
.morsel_stream_batch_produced(MorselId(10), BatchId(42))
988-
.morsel_stream_finished(MorselId(10))
989-
.morsel_stream_started(MorselId(11))
990-
.morsel_stream_batch_produced(MorselId(11), BatchId(43))
991-
.morsel_stream_finished(MorselId(11))
992-
.build()
993-
);
994-
995990
Ok(())
996991
}
997992

@@ -1028,47 +1023,45 @@ mod tests {
10281023
.with_file("file1.parquet", parent_planner)
10291024
.with_observer(observer.clone());
10301025

1026+
// Expect both futures to be polled, but second planner (42) batch to be
1027+
// produced first
10311028
insta::assert_snapshot!(test.run().await.unwrap(), @r"
1029+
----- Output Stream -----
10321030
Batch: 42
10331031
Batch: 41
10341032
Done
1033+
----- File Stream Events -----
1034+
morselize_file: file1.parquet
1035+
planner_created: PlannerId(0)
1036+
planner_called: PlannerId(0)
1037+
planner_produced_child: PlannerId(0) -> PlannerId(1)
1038+
planner_produced_child: PlannerId(0) -> PlannerId(2)
1039+
planner_called: PlannerId(0)
1040+
planner_called: PlannerId(1)
1041+
io_future_created: PlannerId(1), IoFutureId(100)
1042+
planner_called: PlannerId(2)
1043+
io_future_created: PlannerId(2), IoFutureId(101)
1044+
io_future_polled: PlannerId(1), IoFutureId(100)
1045+
io_future_polled: PlannerId(2), IoFutureId(101)
1046+
io_future_polled: PlannerId(1), IoFutureId(100)
1047+
io_future_polled: PlannerId(2), IoFutureId(101)
1048+
io_future_resolved: PlannerId(2), IoFutureId(101)
1049+
planner_called: PlannerId(2)
1050+
morsel_produced: PlannerId(2), MorselId(12)
1051+
planner_called: PlannerId(2)
1052+
io_future_polled: PlannerId(1), IoFutureId(100)
1053+
io_future_resolved: PlannerId(1), IoFutureId(100)
1054+
planner_called: PlannerId(1)
1055+
morsel_produced: PlannerId(1), MorselId(11)
1056+
planner_called: PlannerId(1)
1057+
morsel_stream_started: MorselId(12)
1058+
morsel_stream_batch_produced: MorselId(12), BatchId(42)
1059+
morsel_stream_finished: MorselId(12)
1060+
morsel_stream_started: MorselId(11)
1061+
morsel_stream_batch_produced: MorselId(11), BatchId(41)
1062+
morsel_stream_finished: MorselId(11)
10351063
");
10361064

1037-
assert_eq!(
1038-
observer.events(),
1039-
MorselEventsBuilder::new()
1040-
.morselize_file("file1.parquet")
1041-
.planner_created(PlannerId(0))
1042-
.planner_called(PlannerId(0))
1043-
.planner_produced_child(PlannerId(0), PlannerId(1))
1044-
.planner_produced_child(PlannerId(0), PlannerId(2))
1045-
.planner_called(PlannerId(0))
1046-
.planner_called(PlannerId(1))
1047-
.io_future_created(PlannerId(1), IoFutureId(100))
1048-
.planner_called(PlannerId(2))
1049-
.io_future_created(PlannerId(2), IoFutureId(101))
1050-
.io_future_polled(PlannerId(1), IoFutureId(100))
1051-
.io_future_polled(PlannerId(2), IoFutureId(101))
1052-
.io_future_polled(PlannerId(1), IoFutureId(100))
1053-
.io_future_polled(PlannerId(2), IoFutureId(101))
1054-
.io_future_resolved(PlannerId(2), IoFutureId(101))
1055-
.planner_called(PlannerId(2))
1056-
.morsel_produced(PlannerId(2), MorselId(12))
1057-
.planner_called(PlannerId(2))
1058-
.io_future_polled(PlannerId(1), IoFutureId(100))
1059-
.io_future_resolved(PlannerId(1), IoFutureId(100))
1060-
.planner_called(PlannerId(1))
1061-
.morsel_produced(PlannerId(1), MorselId(11))
1062-
.planner_called(PlannerId(1))
1063-
.morsel_stream_started(MorselId(12))
1064-
.morsel_stream_batch_produced(MorselId(12), BatchId(42))
1065-
.morsel_stream_finished(MorselId(12))
1066-
.morsel_stream_started(MorselId(11))
1067-
.morsel_stream_batch_produced(MorselId(11), BatchId(41))
1068-
.morsel_stream_finished(MorselId(11))
1069-
.build()
1070-
);
1071-
10721065
Ok(())
10731066
}
10741067

@@ -1100,44 +1093,39 @@ mod tests {
11001093
.with_observer(observer.clone());
11011094

11021095
insta::assert_snapshot!(test.run().await.unwrap(), @r"
1096+
----- Output Stream -----
11031097
Batch: 42
11041098
Batch: 43
11051099
Done
1100+
----- File Stream Events -----
1101+
morselize_file: file1.parquet
1102+
planner_created: PlannerId(0)
1103+
planner_called: PlannerId(0)
1104+
io_future_created: PlannerId(0), IoFutureId(100)
1105+
morselize_file: file2.parquet
1106+
planner_created: PlannerId(1)
1107+
planner_called: PlannerId(1)
1108+
io_future_created: PlannerId(1), IoFutureId(101)
1109+
io_future_polled: PlannerId(0), IoFutureId(100)
1110+
io_future_polled: PlannerId(1), IoFutureId(101)
1111+
io_future_polled: PlannerId(0), IoFutureId(100)
1112+
io_future_resolved: PlannerId(0), IoFutureId(100)
1113+
io_future_polled: PlannerId(1), IoFutureId(101)
1114+
io_future_resolved: PlannerId(1), IoFutureId(101)
1115+
planner_called: PlannerId(0)
1116+
morsel_produced: PlannerId(0), MorselId(10)
1117+
planner_called: PlannerId(0)
1118+
planner_called: PlannerId(1)
1119+
morsel_produced: PlannerId(1), MorselId(11)
1120+
planner_called: PlannerId(1)
1121+
morsel_stream_started: MorselId(10)
1122+
morsel_stream_batch_produced: MorselId(10), BatchId(42)
1123+
morsel_stream_finished: MorselId(10)
1124+
morsel_stream_started: MorselId(11)
1125+
morsel_stream_batch_produced: MorselId(11), BatchId(43)
1126+
morsel_stream_finished: MorselId(11)
11061127
");
11071128

1108-
assert_eq!(
1109-
observer.events(),
1110-
MorselEventsBuilder::new()
1111-
.morselize_file("file1.parquet")
1112-
.planner_created(PlannerId(0))
1113-
.planner_called(PlannerId(0))
1114-
.io_future_created(PlannerId(0), IoFutureId(100))
1115-
.morselize_file("file2.parquet")
1116-
.planner_created(PlannerId(1))
1117-
.planner_called(PlannerId(1))
1118-
// note IO from both planners occurs before CPU for planner 0
1119-
.io_future_created(PlannerId(1), IoFutureId(101))
1120-
.io_future_polled(PlannerId(0), IoFutureId(100))
1121-
.io_future_polled(PlannerId(1), IoFutureId(101))
1122-
.io_future_polled(PlannerId(0), IoFutureId(100))
1123-
.io_future_resolved(PlannerId(0), IoFutureId(100))
1124-
.io_future_polled(PlannerId(1), IoFutureId(101))
1125-
.io_future_resolved(PlannerId(1), IoFutureId(101))
1126-
.planner_called(PlannerId(0))
1127-
.morsel_produced(PlannerId(0), MorselId(10))
1128-
.planner_called(PlannerId(0))
1129-
.planner_called(PlannerId(1))
1130-
.morsel_produced(PlannerId(1), MorselId(11))
1131-
.planner_called(PlannerId(1))
1132-
.morsel_stream_started(MorselId(10))
1133-
.morsel_stream_batch_produced(MorselId(10), BatchId(42))
1134-
.morsel_stream_finished(MorselId(10))
1135-
.morsel_stream_started(MorselId(11))
1136-
.morsel_stream_batch_produced(MorselId(11), BatchId(43))
1137-
.morsel_stream_finished(MorselId(11))
1138-
.build()
1139-
);
1140-
11411129
Ok(())
11421130
}
11431131

0 commit comments

Comments
 (0)