Commit e3b5355

feat: add ndjson tests with multiple file ranges and object store chunk sizes
1 parent f686059 commit e3b5355

1 file changed

Lines changed: 328 additions & 0 deletions

File tree

datafusion/datasource-json/src/source.rs

@@ -667,10 +667,13 @@ pub async fn plan_to_json(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use arrow::array::{Int64Array, StringArray};
+    use arrow::compute;
     use arrow::datatypes::{DataType, Field, Schema};
     use bytes::Bytes;
     use datafusion_datasource::FileRange;
     use futures::TryStreamExt;
+    use object_store::chunked::ChunkedStore;
     use object_store::memory::InMemory;
     use object_store::path::Path;
     use object_store::{ObjectStoreExt, PutPayload};
@@ -683,6 +686,24 @@ mod tests {
         ]))
     }
 
+    fn get_chunked_stores() -> Vec<Arc<ChunkedStore>> {
+        let object_store = Arc::new(InMemory::new());
+        let mut chunked_stores = Vec::new();
+        // use usize::MAX as the chunk_size to effectively keep the original store
+        for chunk_size in [usize::MAX, 1, 2, 3, 4, 8, 13, 16] {
+            chunked_stores.push(Arc::new(ChunkedStore::new(
+                Arc::new(Box::new(object_store.clone()) as Box<dyn ObjectStore>),
+                chunk_size,
+            )));
+        }
+
+        chunked_stores
+    }
+
+    fn get_partition_splits() -> Vec<usize> {
+        vec![1usize, 2, 3, 5, 7, 10]
+    }
+
     #[tokio::test]
     async fn test_json_array_from_file() -> Result<()> {
         // Test reading JSON array format from a file
@@ -922,6 +943,313 @@ mod tests {
         Ok(())
     }
 
+    /// Opens each byte-range partition of `path` in `store` and collects all
+    /// record batches produced across every partition.
+    async fn collect_partitioned_batches(
+        store: Arc<dyn ObjectStore>,
+        path: &Path,
+        file_size: u64,
+        num_partitions: usize,
+    ) -> Result<Vec<RecordBatch>> {
+        let mut all_batches = Vec::new();
+        for p in 0..num_partitions {
+            let start = (p as u64 * file_size) / num_partitions as u64;
+            let end = ((p as u64 + 1) * file_size) / num_partitions as u64;
+
+            let meta = store.head(path).await?;
+            let mut file = PartitionedFile::new(path.to_string(), meta.size);
+            file.range = Some(FileRange {
+                start: start as i64,
+                end: end as i64,
+            });
+
+            let opener = JsonOpener::new(
+                1024,
+                test_schema(),
+                FileCompressionType::UNCOMPRESSED,
+                Arc::clone(&store),
+                true, // NDJSON
+            );
+
+            let stream = opener.open(file)?.await?;
+            let batches: Vec<_> = stream.try_collect().await?;
+            all_batches.extend(batches);
+        }
+        Ok(all_batches)
+    }
+
+    /// Concatenates `batches` and returns a single batch sorted ascending by
+    /// the first (id) column.
+    fn concat_and_sort_by_id(batches: &[RecordBatch]) -> Result<RecordBatch> {
+        let schema = test_schema();
+        let combined = compute::concat_batches(&schema, batches)?;
+        let indices = compute::sort_to_indices(combined.column(0), None, None)?;
+        let sorted_cols: Vec<_> = combined
+            .columns()
+            .iter()
+            .map(|col| compute::take(col.as_ref(), &indices, None))
+            .collect::<std::result::Result<_, _>>()?;
+        Ok(RecordBatch::try_new(schema, sorted_cols)?)
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_partitioned() -> Result<()> {
+        // Build an NDJSON file with a known number of rows.
+        let num_rows: usize = 20;
+        let mut ndjson = String::new();
+        for i in 0..num_rows {
+            ndjson.push_str(&format!("{{\"id\": {i}, \"name\": \"user{i}\"}}\n"));
+        }
+        let ndjson_bytes = Bytes::from(ndjson);
+        let file_size = ndjson_bytes.len() as u64;
+        let path = Path::from("test_partitioned.ndjson");
+
+        for store in get_chunked_stores() {
+            let store = Arc::clone(&store) as Arc<dyn ObjectStore>;
+            store
+                .put(&path, PutPayload::from(ndjson_bytes.clone()))
+                .await?;
+
+            for num_partitions in get_partition_splits() {
+                let batches = collect_partitioned_batches(
+                    Arc::clone(&store),
+                    &path,
+                    file_size,
+                    num_partitions,
+                )
+                .await?;
+
+                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+                assert_eq!(
+                    total, num_rows,
+                    "Expected {num_rows} rows with {num_partitions} partitions"
+                );
+
+                let result = concat_and_sort_by_id(&batches)?;
+                let ids = result
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<Int64Array>()
+                    .unwrap();
+                let names = result
+                    .column(1)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .unwrap();
+                for i in 0..num_rows {
+                    assert_eq!(
+                        ids.value(i),
+                        i as i64,
+                        "id mismatch at row {i} with {num_partitions} partitions"
+                    );
+                    assert_eq!(
+                        names.value(i),
+                        format!("user{i}"),
+                        "name mismatch at row {i} with {num_partitions} partitions"
+                    );
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_partitioned_uneven_lines() -> Result<()> {
+        // Lines of deliberately varying lengths so byte-range boundaries are
+        // more likely to land in the middle of a line.
+        let rows: &[(&str, &str)] = &[
+            ("1", "alice"),
+            ("2", "bob-with-a-longer-name"),
+            ("3", "charlie"),
+            ("4", "x"),
+            ("5", "diana-has-an-even-longer-name-here"),
+            ("6", "ed"),
+            ("7", "francesca"),
+            ("8", "g"),
+            ("9", "hector-the-magnificent"),
+            ("10", "isabella"),
+        ];
+        let num_rows = rows.len();
+
+        let mut ndjson = String::new();
+        for (id, name) in rows {
+            ndjson.push_str(&format!("{{\"id\": {id}, \"name\": \"{name}\"}}\n"));
+        }
+        let ndjson_bytes = Bytes::from(ndjson);
+        let file_size = ndjson_bytes.len() as u64;
+        let path = Path::from("test_partitioned_uneven.ndjson");
+
+        for store in get_chunked_stores() {
+            let store = Arc::clone(&store) as Arc<dyn ObjectStore>;
+            store
+                .put(&path, PutPayload::from(ndjson_bytes.clone()))
+                .await?;
+
+            for num_partitions in get_partition_splits() {
+                let batches = collect_partitioned_batches(
+                    Arc::clone(&store),
+                    &path,
+                    file_size,
+                    num_partitions,
+                )
+                .await?;
+
+                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+                assert_eq!(
+                    total, num_rows,
+                    "Expected {num_rows} rows with {num_partitions} partitions"
+                );
+
+                let result = concat_and_sort_by_id(&batches)?;
+                let ids = result
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<Int64Array>()
+                    .unwrap();
+                let names = result
+                    .column(1)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .unwrap();
+                for (i, (expected_id, expected_name)) in rows.iter().enumerate() {
+                    assert_eq!(
+                        ids.value(i),
+                        expected_id.parse::<i64>().unwrap(),
+                        "id mismatch at row {i} with {num_partitions} partitions"
+                    );
+                    assert_eq!(
+                        names.value(i),
+                        *expected_name,
+                        "name mismatch at row {i} with {num_partitions} partitions"
+                    );
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_partitioned_single_entry() -> Result<()> {
+        // A single JSON object with no trailing newline. No matter how many
+        // byte-range partitions the file is split into, exactly one row must
+        // be produced in total.
+        let ndjson = r#"{"id": 1, "name": "alice"}"#;
+        let ndjson_bytes = Bytes::from(ndjson);
+        let file_size = ndjson_bytes.len() as u64;
+        let path = Path::from("test_single_entry.ndjson");
+
+        for store in get_chunked_stores() {
+            let store = Arc::clone(&store) as Arc<dyn ObjectStore>;
+            store
+                .put(&path, PutPayload::from(ndjson_bytes.clone()))
+                .await?;
+
+            for num_partitions in get_partition_splits() {
+                let batches = collect_partitioned_batches(
+                    Arc::clone(&store),
+                    &path,
+                    file_size,
+                    num_partitions,
+                )
+                .await?;
+
+                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+                assert_eq!(
+                    total, 1,
+                    "Expected exactly 1 row with {num_partitions} partitions"
+                );
+
+                let result = concat_and_sort_by_id(&batches)?;
+                let ids = result
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<Int64Array>()
+                    .unwrap();
+                let names = result
+                    .column(1)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .unwrap();
+                assert_eq!(ids.value(0), 1);
+                assert_eq!(names.value(0), "alice");
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_partitioned_overflow_stream() -> Result<()> {
+        // Force the overflow_stream code path by making row 1's line longer
+        // than END_SCAN_LOOKAHEAD. When a partition boundary lands inside
+        // that line, the initial bounded fetch covers
+        //     [fetch_start, raw_end + END_SCAN_LOOKAHEAD)
+        // which does not reach the line's newline. overflow_stream then
+        // issues successive END_SCAN_LOOKAHEAD-sized GETs until the newline
+        // is found.
+        //
+        // With N=2 partitions:
+        //     raw_end           ≈ file_size / 2
+        //     initial_fetch_end = raw_end + END_SCAN_LOOKAHEAD
+        //     row-1 newline     ≈ 2 * END_SCAN_LOOKAHEAD + overhead
+        //                       > initial_fetch_end ✓
+        let long_name = "x".repeat(2 * END_SCAN_LOOKAHEAD as usize + 1000);
+        let mut ndjson = String::new();
+        ndjson.push_str(&format!("{{\"id\": 1, \"name\": \"{long_name}\"}}\n"));
+        for i in 2..=5 {
+            ndjson.push_str(&format!("{{\"id\": {i}, \"name\": \"short{i}\"}}\n"));
+        }
+        let ndjson_bytes = Bytes::from(ndjson);
+        let file_size = ndjson_bytes.len() as u64;
+        let path = Path::from("test_overflow.ndjson");
+        let num_rows = 5usize;
+
+        for store in get_chunked_stores() {
+            let store = Arc::clone(&store) as Arc<dyn ObjectStore>;
+            store
+                .put(&path, PutPayload::from(ndjson_bytes.clone()))
+                .await?;
+
+            for num_partitions in get_partition_splits() {
+                let batches = collect_partitioned_batches(
+                    Arc::clone(&store),
+                    &path,
+                    file_size,
+                    num_partitions,
+                )
+                .await?;
+
+                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+                assert_eq!(
+                    total, num_rows,
+                    "Expected {num_rows} rows with {num_partitions} partitions"
+                );
+
+                let result = concat_and_sort_by_id(&batches)?;
+                let ids = result
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<Int64Array>()
+                    .unwrap();
+                let names = result
+                    .column(1)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .unwrap();
+                assert_eq!(ids.value(0), 1);
+                assert_eq!(names.value(0), long_name);
+                for i in 1..num_rows {
+                    assert_eq!(ids.value(i), (i + 1) as i64);
+                    assert_eq!(names.value(i), format!("short{}", i + 1));
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_json_array_stream_cancellation() -> Result<()> {
         // Test that cancellation works correctly (tasks are aborted when stream is dropped)
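For reference, a minimal standalone sketch (not part of the commit; `partition_ranges` is a hypothetical helper named here only for illustration) of the byte-range split that `collect_partitioned_batches` applies: partition `p` of `N` covers `[p * file_size / N, (p + 1) * file_size / N)`, so adjacent ranges meet at shared boundaries without overlapping and together cover the whole file.

    // Hypothetical standalone example; mirrors the range math used in
    // `collect_partitioned_batches` above.
    fn partition_ranges(file_size: u64, num_partitions: u64) -> Vec<(u64, u64)> {
        (0..num_partitions)
            .map(|p| {
                let start = p * file_size / num_partitions;
                let end = (p + 1) * file_size / num_partitions;
                (start, end)
            })
            .collect()
    }

    fn main() {
        let ranges = partition_ranges(100, 3);
        assert_eq!(ranges, vec![(0, 33), (33, 66), (66, 100)]);
        // Every byte of the file falls into exactly one half-open range.
        assert_eq!(ranges.iter().map(|(s, e)| e - s).sum::<u64>(), 100);
    }

The tests above exercise these ranges for every chunk size produced by `get_chunked_stores` and every split count from `get_partition_splits`.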
