Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions datafusion/core/tests/parquet/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
//! select * from data limit 10;
//! ```

use std::sync::Arc;

use arrow::compute::concat_batches;
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::collect;
Expand Down Expand Up @@ -617,6 +619,91 @@ fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
}
}

/// Verifies that `struct_col IS NOT NULL` is pushed into the parquet scan
/// and produces correct results, including when the struct is non-null
/// but all leaf fields are null.
#[tokio::test]
async fn struct_is_not_null_pushdown() {
    use arrow::array::{Int32Array, StructArray};
    use arrow::datatypes::{DataType, Field, Fields, Schema};

    // Schema: a non-null `id` plus a nullable struct `s` with three nullable leaves.
    let leaf_fields = Fields::from(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]);
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("s", DataType::Struct(leaf_fields.clone()), true),
    ]));

    // Row 0: s={a:1, b:10, c:100} → struct NOT null, leaves NOT null
    // Row 1: s={a:NULL, b:NULL, c:NULL} → struct NOT null, ALL leaves null
    // Row 2: s=NULL → struct IS null
    // Row 3: s={a:3, b:NULL, c:300} → struct NOT null, mixed leaves
    let id_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let leaf_a = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(3)]));
    let leaf_b = Arc::new(Int32Array::from(vec![Some(10), None, None, None]));
    let leaf_c = Arc::new(Int32Array::from(vec![Some(100), None, None, Some(300)]));
    // Validity mask marks only row 2 (0-based) as a null struct.
    let validity = Some(vec![true, true, false, true].into());
    let struct_array =
        StructArray::try_new(leaf_fields, vec![leaf_a, leaf_b, leaf_c], validity).unwrap();
    let batch = RecordBatch::try_new(schema.clone(), vec![id_array, Arc::new(struct_array)])
        .unwrap();

    // Write the single batch out to a temporary parquet file.
    let tempdir = TempDir::new().unwrap();
    let test_file = TestParquetFile::try_new(
        tempdir.path().join("struct.parquet"),
        WriterProperties::builder().build(),
        vec![batch],
    )
    .unwrap();

    // Scan with row-level filter pushdown enabled and `s IS NOT NULL` as the filter.
    let ctx = SessionContext::new_with_config(
        ParquetScanOptions {
            pushdown_filters: true,
            enable_page_index: false,
            reorder_filters: false,
        }
        .config(),
    );
    let exec = test_file
        .create_scan(&ctx, Some(col("s").is_not_null()))
        .await
        .unwrap();
    let result = collect(exec.clone(), ctx.task_ctx()).await.unwrap();

    // Verify correct rows: 1, 2, 4 (row 2 has non-null struct with null leaves)
    let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
    assert_eq!(
        total_rows, 3,
        "Expected 3 rows (struct non-null), got {total_rows}"
    );

    let combined = concat_batches(&test_file.schema(), &result).unwrap();
    let id_col = combined
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    let ids: Vec<i32> = (0..id_col.len()).map(|row| id_col.value(row)).collect();
    assert_eq!(
        ids,
        vec![1, 2, 4],
        "Row 2 (null-leaves, non-null struct) must be included"
    );

    // Verify pushdown metrics: filter was evaluated at the row level
    let metrics = TestParquetFile::parquet_metrics(&exec).expect("found metrics");
    assert_eq!(
        get_value(&metrics, "pushdown_rows_matched"),
        3,
        "Expected 3 rows matched by pushdown"
    );
    assert_eq!(
        get_value(&metrics, "pushdown_rows_pruned"),
        1,
        "Expected 1 row pruned by pushdown"
    );
}

#[tokio::test]
async fn predicate_cache_default() -> datafusion_common::Result<()> {
let ctx = SessionContext::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,5 +349,224 @@ fn build_struct_batch(
)?)
}

criterion_group!(benches, parquet_struct_filter_pushdown);
// ---------------------------------------------------------------------------
// Benchmark: struct IS NOT NULL pushdown
//
// Uses a separate dataset with a NULLABLE struct containing many leaf fields.
// Compares scanning with vs without row-level pushdown for `s IS NOT NULL`.
//
// The key metric: with pushdown, only 1 leaf column is read for the null
// check (via definition levels); without pushdown, ALL leaf columns are
// decoded to materialize the struct and then check nullability post-scan.
// ---------------------------------------------------------------------------

/// Name of the nullable struct column in the benchmark schema.
const NULLABLE_STRUCT_COLUMN_NAME: &str = "s";
/// Number of leaf fields inside the nullable struct.
/// More fields = bigger difference between pushdown and no-pushdown.
const NUM_STRUCT_FIELDS: usize = 12;
/// Null-struct divisor: within each batch, rows where
/// `row % NULL_FRACTION == 0` have a null struct (1/10 of rows, ~10%).
const NULL_FRACTION: usize = 10;

/// Handle for the temporary parquet dataset used by the nullable-struct
/// benchmarks.
struct NullableBenchmarkDataset {
    // Held only to keep the temp directory (and the file inside it) alive
    // for the lifetime of the dataset; dropping it deletes the file.
    _tempdir: TempDir,
    // Full path to the parquet file inside `_tempdir`.
    file_path: PathBuf,
}

impl NullableBenchmarkDataset {
    /// Path of the benchmark parquet file on disk.
    fn path(&self) -> &Path {
        &self.file_path
    }
}

/// Shared benchmark dataset: written once on first access, then reused by
/// every scenario in the group so file creation is not part of any timing.
static NULLABLE_DATASET: LazyLock<NullableBenchmarkDataset> = LazyLock::new(|| {
    create_nullable_dataset()
        .expect("failed to prepare nullable struct benchmark dataset")
});

/// Schema for the nullable-struct benchmark file: a non-null `id` column
/// plus a nullable struct with `NUM_STRUCT_FIELDS` nullable Utf8 leaves
/// named `f0..fN`.
fn nullable_struct_schema() -> SchemaRef {
    let leaves = Fields::from(
        (0..NUM_STRUCT_FIELDS)
            .map(|i| Field::new(format!("f{i}"), DataType::Utf8, true))
            .collect::<Vec<_>>(),
    );
    let id_field = Field::new(ID_COLUMN_NAME, DataType::Int32, false);
    let struct_field =
        Field::new(NULLABLE_STRUCT_COLUMN_NAME, DataType::Struct(leaves), true);
    Arc::new(Schema::new(vec![id_field, struct_field]))
}

/// Writes the nullable-struct benchmark parquet file into a fresh temp
/// directory and returns the dataset handle that owns both.
fn create_nullable_dataset() -> datafusion_common::Result<NullableBenchmarkDataset> {
    let tempdir = TempDir::new()?;
    let file_path = tempdir.path().join("struct_nullable_filter.parquet");

    let schema = nullable_struct_schema();
    // Cap rows per row group so the file ends up with TOTAL_ROW_GROUPS groups
    // of ROW_GROUP_ROW_COUNT rows each.
    // NOTE(review): upstream parquet names this builder method
    // `set_max_row_group_size(usize)` — confirm `set_max_row_group_row_count`
    // exists in the pinned parquet version.
    let writer_props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT))
        .build();

    let mut writer = ArrowWriter::try_new(
        std::fs::File::create(&file_path)?,
        Arc::clone(&schema),
        Some(writer_props),
    )?;

    // One batch per row group; each batch repeats the same ~10% null pattern.
    for rg_idx in 0..TOTAL_ROW_GROUPS {
        let batch = build_nullable_struct_batch(&schema, rg_idx, ROW_GROUP_ROW_COUNT)?;
        writer.write(&batch)?;
    }

    // close() flushes buffered row groups and writes the parquet footer.
    writer.close()?;
    Ok(NullableBenchmarkDataset {
        _tempdir: tempdir,
        file_path,
    })
}

/// Builds one `len`-row batch for the nullable-struct dataset.
///
/// Every leaf field carries the same large string on every row; the struct
/// itself is null on rows where `row % NULL_FRACTION == 0` (~10% of rows).
fn build_nullable_struct_batch(
    schema: &SchemaRef,
    _rg_idx: usize,
    len: usize,
) -> datafusion_common::Result<RecordBatch> {
    use arrow::array::NullBufferBuilder;

    let payload: String = "x".repeat(LARGE_STRING_LEN);
    let id_array = Arc::new(Int32Array::from_iter_values(0..len as i32));

    // Leaf fields and their value arrays, built as parallel vectors.
    let mut leaf_fields: Vec<Arc<Field>> = Vec::with_capacity(NUM_STRUCT_FIELDS);
    let mut leaf_arrays: Vec<Arc<dyn arrow::array::Array>> =
        Vec::with_capacity(NUM_STRUCT_FIELDS);
    for i in 0..NUM_STRUCT_FIELDS {
        let mut builder = StringBuilder::new();
        for _ in 0..len {
            builder.append_value(&payload);
        }
        leaf_fields.push(Arc::new(Field::new(format!("f{i}"), DataType::Utf8, true)));
        leaf_arrays.push(Arc::new(builder.finish()));
    }

    // Struct validity: every NULL_FRACTION-th row is a null struct.
    let mut validity = NullBufferBuilder::new(len);
    for row in 0..len {
        validity.append(row % NULL_FRACTION != 0);
    }

    let struct_array =
        StructArray::try_new(Fields::from(leaf_fields), leaf_arrays, validity.finish())?;

    Ok(RecordBatch::try_new(
        Arc::clone(schema),
        vec![id_array, Arc::new(struct_array)],
    )?)
}

/// Logical predicate `s IS NOT NULL` over the benchmark struct column.
fn struct_is_not_null_expr() -> Expr {
    let struct_col = col(NULLABLE_STRUCT_COLUMN_NAME);
    struct_col.is_not_null()
}

/// Logical predicate `s IS NULL` over the benchmark struct column.
fn struct_is_null_expr() -> Expr {
    let struct_col = col(NULLABLE_STRUCT_COLUMN_NAME);
    struct_col.is_null()
}

fn expected_non_null_rows() -> usize {
// rows where row % NULL_FRACTION != 0
TOTAL_ROWS - TOTAL_ROWS / NULL_FRACTION
}

fn expected_null_rows() -> usize {
TOTAL_ROWS / NULL_FRACTION
}

fn parquet_struct_null_check_pushdown(c: &mut Criterion) {
let dataset_path = NULLABLE_DATASET.path().to_owned();
let mut group = c.benchmark_group("parquet_struct_null_check_pushdown");
group.throughput(Throughput::Elements(TOTAL_ROWS as u64));

// Scenario 1: SELECT * FROM t WHERE s IS NOT NULL — no pushdown
// Without pushdown, ALL 12 leaf columns of the struct are decoded
// to materialize the struct, then IS NOT NULL is checked post-scan.
group.bench_function("select_star/no_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema);
b.iter(|| {
let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all())
.expect("scan succeeded");
assert_eq!(matched, expected_non_null_rows());
});
});

// Scenario 2: SELECT * FROM t WHERE s IS NOT NULL — with pushdown
// With pushdown, only 1 leaf column is read for the null check.
// Remaining leaves are read only for matched rows.
group.bench_function("select_star/with_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema);
b.iter(|| {
let matched = scan(&dataset_path, &predicate, true, ProjectionMask::all())
.expect("scan succeeded");
assert_eq!(matched, expected_non_null_rows());
});
});

// Scenario 3: SELECT id FROM t WHERE s IS NOT NULL — no pushdown
// Without pushdown we must read all columns to materialize the struct
// for post-scan IS NOT NULL evaluation, so ProjectionMask::all() is
// correct here even though the query only needs `id` in the output.
group.bench_function("select_id/no_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema);
b.iter(|| {
let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all())
.expect("scan succeeded");
assert_eq!(matched, expected_non_null_rows());
});
});

// Scenario 4: SELECT id FROM t WHERE s IS NOT NULL — with pushdown
// Best case: pushdown reads 1 leaf for null check, output reads only `id`.
// The 12 struct leaves are never decoded at all.
group.bench_function("select_id/with_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema);
let id_only = id_projection(&dataset_path);
b.iter(|| {
let matched = scan(&dataset_path, &predicate, true, id_only.clone())
.expect("scan succeeded");
assert_eq!(matched, expected_non_null_rows());
});
});

// Scenario 5: SELECT id FROM t WHERE s IS NULL — with pushdown
// Verify IS NULL pushdown works symmetrically with IS NOT NULL.
group.bench_function("select_id_is_null/with_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_is_null_expr(), &file_schema);
let id_only = id_projection(&dataset_path);
b.iter(|| {
let matched = scan(&dataset_path, &predicate, true, id_only.clone())
.expect("scan succeeded");
assert_eq!(matched, expected_null_rows());
});
});

group.finish();
}

// Register both benchmark groups and generate the benchmark binary's main().
criterion_group!(
    benches,
    parquet_struct_filter_pushdown,
    parquet_struct_null_check_pushdown
);
criterion_main!(benches);
Loading