@@ -25,11 +25,18 @@ mod data_utils;
2525use crate :: criterion:: Criterion ;
2626use arrow:: array:: { ArrayRef , RecordBatch } ;
2727use arrow:: datatypes:: { DataType , Field , Fields , Schema } ;
28+ use arrow_schema:: TimeUnit :: Nanosecond ;
2829use criterion:: Bencher ;
2930use datafusion:: datasource:: MemTable ;
3031use datafusion:: execution:: context:: SessionContext ;
32+ use datafusion:: prelude:: DataFrame ;
3133use datafusion_common:: ScalarValue ;
32- use datafusion_expr:: col;
34+ use datafusion_expr:: Expr :: Literal ;
35+ use datafusion_expr:: { cast, col, lit, not, try_cast, when} ;
36+ use datafusion_functions:: expr_fn:: {
37+ btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
38+ } ;
39+ use std:: ops:: Rem ;
3340use std:: path:: PathBuf ;
3441use std:: sync:: Arc ;
3542use test_utils:: tpcds:: tpcds_schemas;
@@ -58,6 +65,150 @@ fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) {
5865 } ) ) ;
5966}
6067
68+ /// Build a dataframe for testing logical plan optimization
69+ fn build_test_data_frame ( ctx : & SessionContext , rt : & Runtime ) -> DataFrame {
70+ register_string_table ( ctx, 100 , 1000 ) ;
71+
72+ rt. block_on ( async {
73+ let mut df = ctx. table ( "t" ) . await . unwrap ( ) ;
74+ // add some columns in
75+ for i in 100 ..150 {
76+ df = df
77+ . with_column ( & format ! ( "c{i}" ) , Literal ( ScalarValue :: Utf8 ( None ) , None ) )
78+ . unwrap ( ) ;
79+ }
80+ // add in some columns with string encoded timestamps
81+ for i in 150 ..175 {
82+ df = df
83+ . with_column (
84+ & format ! ( "c{i}" ) ,
85+ Literal ( ScalarValue :: Utf8 ( Some ( "2025-08-21 09:43:17" . into ( ) ) ) , None ) ,
86+ )
87+ . unwrap ( ) ;
88+ }
89+ // do a bunch of ops on the columns
90+ for i in 0 ..175 {
91+ // trim the columns
92+ df = df
93+ . with_column ( & format ! ( "c{i}" ) , btrim ( vec ! [ col( format!( "c{i}" ) ) ] ) )
94+ . unwrap ( ) ;
95+ }
96+
97+ for i in 0 ..175 {
98+ let c_name = format ! ( "c{i}" ) ;
99+ let c = col ( & c_name) ;
100+
101+ // random ops
102+ if i % 5 == 0 && i < 150 {
103+ // the actual ops here are largely unimportant as they are just a sample
104+ // of ops that could occur on a dataframe
105+ df = df
106+ . with_column ( & c_name, cast ( c. clone ( ) , DataType :: Utf8 ) )
107+ . unwrap ( )
108+ . with_column (
109+ & c_name,
110+ when (
111+ cast ( c. clone ( ) , DataType :: Int32 ) . gt ( lit ( 135 ) ) ,
112+ cast (
113+ cast ( c. clone ( ) , DataType :: Int32 ) - lit ( i + 3 ) ,
114+ DataType :: Utf8 ,
115+ ) ,
116+ )
117+ . otherwise ( c. clone ( ) )
118+ . unwrap ( ) ,
119+ )
120+ . unwrap ( )
121+ . with_column (
122+ & c_name,
123+ when (
124+ c. clone ( ) . is_not_null ( ) . and (
125+ cast ( c. clone ( ) , DataType :: Int32 )
126+ . between ( lit ( 120 ) , lit ( 130 ) ) ,
127+ ) ,
128+ Literal ( ScalarValue :: Utf8 ( None ) , None ) ,
129+ )
130+ . otherwise (
131+ when (
132+ c. clone ( ) . is_not_null ( ) . and ( regexp_like (
133+ cast ( c. clone ( ) , DataType :: Utf8View ) ,
134+ lit ( "[0-9]*" ) ,
135+ None ,
136+ ) ) ,
137+ upper ( c. clone ( ) ) ,
138+ )
139+ . otherwise ( c. clone ( ) )
140+ . unwrap ( ) ,
141+ )
142+ . unwrap ( ) ,
143+ )
144+ . unwrap ( )
145+ . with_column (
146+ & c_name,
147+ when (
148+ c. clone ( ) . is_not_null ( ) . and (
149+ cast ( c. clone ( ) , DataType :: Int32 )
150+ . between ( lit ( 90 ) , lit ( 100 ) ) ,
151+ ) ,
152+ cast ( c. clone ( ) , DataType :: Utf8View ) ,
153+ )
154+ . otherwise ( Literal ( ScalarValue :: Date32 ( None ) , None ) )
155+ . unwrap ( ) ,
156+ )
157+ . unwrap ( )
158+ . with_column (
159+ & c_name,
160+ when (
161+ c. clone ( ) . is_not_null ( ) . and (
162+ cast ( c. clone ( ) , DataType :: Int32 ) . rem ( lit ( 10 ) ) . gt ( lit ( 7 ) ) ,
163+ ) ,
164+ regexp_replace (
165+ cast ( c. clone ( ) , DataType :: Utf8View ) ,
166+ lit ( "1" ) ,
167+ lit ( "a" ) ,
168+ None ,
169+ ) ,
170+ )
171+ . otherwise ( Literal ( ScalarValue :: Date32 ( None ) , None ) )
172+ . unwrap ( ) ,
173+ )
174+ . unwrap ( )
175+ }
176+ if i >= 150 {
177+ df = df
178+ . with_column (
179+ & c_name,
180+ try_cast (
181+ to_timestamp ( vec ! [ c. clone( ) , lit( "%Y-%m-%d %H:%M:%S" ) ] ) ,
182+ DataType :: Timestamp ( Nanosecond , Some ( "UTC" . into ( ) ) ) ,
183+ ) ,
184+ )
185+ . unwrap ( )
186+ . with_column ( & c_name, try_cast ( c. clone ( ) , DataType :: Date32 ) )
187+ . unwrap ( )
188+ }
189+
190+ // add in a few unions
191+ if i % 30 == 0 {
192+ let df1 = df
193+ . clone ( )
194+ . filter ( length ( c. clone ( ) ) . gt ( lit ( 2 ) ) )
195+ . unwrap ( )
196+ . with_column ( & format ! ( "c{i}_filtered" ) , lit ( true ) )
197+ . unwrap ( ) ;
198+ let df2 = df
199+ . filter ( not ( length ( c. clone ( ) ) . gt ( lit ( 2 ) ) ) )
200+ . unwrap ( )
201+ . with_column ( & format ! ( "c{i}_filtered" ) , lit ( false ) )
202+ . unwrap ( ) ;
203+
204+ df = df1. union_by_name ( df2) . unwrap ( )
205+ }
206+ }
207+
208+ df
209+ } )
210+ }
211+
61212/// Create schema with the specified number of columns
62213fn create_schema ( column_prefix : & str , num_columns : usize ) -> Schema {
63214 let fields: Fields = ( 0 ..num_columns)
@@ -180,13 +331,40 @@ fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows
180331 ctx. register_table ( "t" , Arc :: new ( table) ) . unwrap ( ) ;
181332}
182333
334+ /// Registers a table like this:
335+ /// c0,c1,c2...,c99
336+ /// "0","100"..."9900"
337+ /// "0","200"..."19800"
338+ /// "0","300"..."29700"
339+ fn register_string_table ( ctx : & SessionContext , num_columns : usize , num_rows : usize ) {
340+ // ("c0", ["0", "0", ...])
341+ // ("c1": ["100", "200", ...])
342+ // etc
343+ let iter = ( 0 ..num_columns) . map ( |i| i as u64 ) . map ( |i| {
344+ let array: ArrayRef = Arc :: new ( arrow:: array:: StringViewArray :: from_iter_values (
345+ ( 0 ..num_rows)
346+ . map ( |j| format ! ( "c{}" , j as u64 * 100 + i) )
347+ . collect :: < Vec < _ > > ( ) ,
348+ ) ) ;
349+ ( format ! ( "c{i}" ) , array)
350+ } ) ;
351+ let batch = RecordBatch :: try_from_iter ( iter) . unwrap ( ) ;
352+ let schema = batch. schema ( ) ;
353+ let partitions = vec ! [ vec![ batch] ] ;
354+
355+ // create the table
356+ let table = MemTable :: try_new ( schema, partitions) . unwrap ( ) ;
357+
358+ ctx. register_table ( "t" , Arc :: new ( table) ) . unwrap ( ) ;
359+ }
360+
183361/// return a query like
184362/// ```sql
185- /// select c1, null as c2, ... null as cn from t ORDER BY c1
363+ /// select c1, 2 as c2, ... n as cn from t ORDER BY c1
186364/// UNION ALL
187- /// select null as c1, c2, ... null as cn from t ORDER BY c2
365+ /// select 1 as c1, c2, ... n as cn from t ORDER BY c2
188366/// ...
189- /// select null as c1, null as c2, ... cn from t ORDER BY cn
367+ /// select 1 as c1, 2 as c2, ... cn from t ORDER BY cn
190368/// ORDER BY c1, c2 ... CN
191369/// ```
192370fn union_orderby_query ( n : usize ) -> String {
@@ -200,7 +378,7 @@ fn union_orderby_query(n: usize) -> String {
200378 if i == j {
201379 format ! ( "c{j}" )
202380 } else {
203- format ! ( "null as c{j}" )
381+ format ! ( "{j} as c{j}" )
204382 }
205383 } )
206384 . collect :: < Vec < _ > > ( )
@@ -370,16 +548,37 @@ fn criterion_benchmark(c: &mut Criterion) {
370548 } ) ;
371549
372550 // -- Sorted Queries --
373- register_union_order_table ( & ctx, 100 , 1000 ) ;
374-
375- // this query has many expressions in its sort order so stresses
376- // order equivalence validation
377- c. bench_function ( "physical_sorted_union_orderby" , |b| {
378- // SELECT ... UNION ALL ...
379- let query = union_orderby_query ( 20 ) ;
380- b. iter ( || physical_plan ( & ctx, & rt, & query) )
551+ for column_count in [ 10 , 50 , 100 , 200 , 300 ] {
552+ register_union_order_table ( & ctx, column_count, 1000 ) ;
553+
554+ // this query has many expressions in its sort order so stresses
555+ // order equivalence validation
556+ c. bench_function (
557+ & format ! ( "physical_sorted_union_order_by_{column_count}" ) ,
558+ |b| {
559+ // SELECT ... UNION ALL ...
560+ let query = union_orderby_query ( column_count) ;
561+ b. iter ( || physical_plan ( & ctx, & rt, & query) )
562+ } ,
563+ ) ;
564+
565+ let _ = ctx. deregister_table ( "t" ) ;
566+ }
567+
568+ // -- validate logical plan optimize performance
569+ let df = build_test_data_frame ( & ctx, & rt) ;
570+
571+ c. bench_function ( "logical_plan_optimize" , |b| {
572+ b. iter ( || {
573+ let df_clone = df. clone ( ) ;
574+ criterion:: black_box (
575+ rt. block_on ( async { df_clone. into_optimized_plan ( ) . unwrap ( ) } ) ,
576+ ) ;
577+ } )
381578 } ) ;
382579
580+ let _ = ctx. deregister_table ( "t" ) ;
581+
383582 // --- TPC-H ---
384583
385584 let tpch_ctx = register_defs ( SessionContext :: new ( ) , tpch_schemas ( ) ) ;
0 commit comments