Skip to content

Commit ab6be02

Browse files
committed
move comparators initialization
1 parent f5bd681 commit ab6be02

11 files changed

Lines changed: 54 additions & 97 deletions

File tree

datafusion/core/tests/physical_optimizer/window_optimize.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ mod test {
5050
&[],
5151
Arc::new(frame),
5252
None,
53-
Arc::clone(&schema),
5453
);
5554

5655
let bounded_agg_exec = BoundedWindowAggExec::try_new(

datafusion/expr/src/window_frame.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,25 +57,26 @@ impl WindowFrameBoundsComparators {
5757
/// - `N FOLLOWING` (asc) → right_delta = `current + N` (AddWrapping)
5858
/// - `N FOLLOWING` (desc) → right_delta = `current - N` (SubWrapping)
5959
/// - `CURRENT ROW` → no delta
60-
pub fn new(frame: &WindowFrame, columns: &[(DataType, SortOptions)]) -> Option<Self> {
60+
pub fn new(
61+
frame: &WindowFrame,
62+
columns: &[(DataType, SortOptions)],
63+
) -> Result<Option<Self>> {
6164
// Comparators are only needed for RANGE frames.
6265
if frame.units != WindowFrameUnits::Range {
63-
return None;
66+
return Ok(None);
6467
}
6568

6669
let sort_descending = columns.first().map(|(_, o)| o.descending).unwrap_or(false);
6770

6871
let start_bound_comparators =
69-
Self::build_bound_comparators(&frame.start_bound, columns, sort_descending)
70-
.ok()?;
72+
Self::build_bound_comparators(&frame.start_bound, columns, sort_descending)?;
7173
let end_bound_comparators =
72-
Self::build_bound_comparators(&frame.end_bound, columns, sort_descending)
73-
.ok()?;
74+
Self::build_bound_comparators(&frame.end_bound, columns, sort_descending)?;
7475

75-
Some(Self {
76+
Ok(Some(Self {
7677
start_bound_comparators,
7778
end_bound_comparators,
78-
})
79+
}))
7980
}
8081

8182
/// Builds per-column `ArrayComparator`s for a single bound.

datafusion/expr/src/window_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ mod tests {
689689
expected_results: Vec<Range<usize>>,
690690
) -> Result<()> {
691691
let (range_columns, _) = get_test_data();
692-
let frame_comparators = WindowFrameBoundsComparators::new(window_frame, &[]);
692+
let frame_comparators = WindowFrameBoundsComparators::new(window_frame, &[])?;
693693
let mut window_frame_context =
694694
WindowFrameContext::new(Arc::clone(window_frame), frame_comparators);
695695
let n_row = range_columns[0].len();

datafusion/physical-expr/src/window/aggregate.rs

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use arrow::array::BooleanArray;
3434
use arrow::datatypes::{FieldRef, SchemaRef};
3535
use arrow::record_batch::RecordBatch;
3636
use datafusion_common::{Result, ScalarValue, exec_datafusion_err};
37-
use datafusion_expr::window_frame::WindowFrameBoundsComparators;
3837
use datafusion_expr::{Accumulator, WindowFrame, WindowFrameBound, WindowFrameUnits};
3938
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
4039

@@ -49,8 +48,6 @@ pub struct PlainAggregateWindowExpr {
4948
window_frame: Arc<WindowFrame>,
5049
is_constant_in_partition: bool,
5150
filter: Option<Arc<dyn PhysicalExpr>>,
52-
schema: SchemaRef,
53-
frame_comparators: Option<WindowFrameBoundsComparators>,
5451
}
5552

5653
impl PlainAggregateWindowExpr {
@@ -61,26 +58,16 @@ impl PlainAggregateWindowExpr {
6158
order_by: &[PhysicalSortExpr],
6259
window_frame: Arc<WindowFrame>,
6360
filter: Option<Arc<dyn PhysicalExpr>>,
64-
schema: SchemaRef,
6561
) -> Self {
6662
let is_constant_in_partition =
6763
Self::is_window_constant_in_partition(order_by, &window_frame);
68-
let frame_comparators = {
69-
let cols: Option<Vec<_>> = order_by
70-
.iter()
71-
.map(|o| o.expr.data_type(&schema).ok().map(|dt| (dt, o.options)))
72-
.collect();
73-
cols.and_then(|c| WindowFrameBoundsComparators::new(&window_frame, &c))
74-
};
7564
Self {
7665
aggregate,
7766
partition_by: partition_by.to_vec(),
7867
order_by: order_by.to_vec(),
7968
window_frame,
8069
is_constant_in_partition,
8170
filter,
82-
schema,
83-
frame_comparators,
8471
}
8572
}
8673

@@ -159,10 +146,15 @@ impl WindowExpr for PlainAggregateWindowExpr {
159146

160147
fn evaluate_stateful(
161148
&self,
149+
input_schema: &SchemaRef,
162150
partition_batches: &PartitionBatches,
163151
window_agg_state: &mut PartitionWindowAggStates,
164152
) -> Result<()> {
165-
self.aggregate_evaluate_stateful(partition_batches, window_agg_state)?;
153+
self.aggregate_evaluate_stateful(
154+
input_schema,
155+
partition_batches,
156+
window_agg_state,
157+
)?;
166158

167159
// Update window frame range for each partition. As we know that
168160
// non-sliding aggregations will never call `retract_batch`, this value
@@ -208,7 +200,6 @@ impl WindowExpr for PlainAggregateWindowExpr {
208200
.collect::<Vec<_>>(),
209201
Arc::new(self.window_frame.reverse()),
210202
self.filter.clone(),
211-
Arc::clone(&self.schema),
212203
)) as _
213204
} else {
214205
Arc::new(SlidingAggregateWindowExpr::new(
@@ -221,7 +212,6 @@ impl WindowExpr for PlainAggregateWindowExpr {
221212
.collect::<Vec<_>>(),
222213
Arc::new(self.window_frame.reverse()),
223214
self.filter.clone(),
224-
Arc::clone(&self.schema),
225215
)) as _
226216
}
227217
})
@@ -234,10 +224,6 @@ impl WindowExpr for PlainAggregateWindowExpr {
234224
fn create_window_fn(&self) -> Result<WindowFn> {
235225
Ok(WindowFn::Aggregate(self.get_accumulator()?))
236226
}
237-
238-
fn frame_bounds_comparators(&self) -> Option<WindowFrameBoundsComparators> {
239-
self.frame_comparators.clone()
240-
}
241227
}
242228

243229
impl AggregateWindowExpr for PlainAggregateWindowExpr {

datafusion/physical-expr/src/window/sliding_aggregate.rs

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use arrow::datatypes::{FieldRef, SchemaRef};
3333
use arrow::record_batch::RecordBatch;
3434

3535
use datafusion_common::{Result, ScalarValue};
36-
use datafusion_expr::window_frame::WindowFrameBoundsComparators;
3736
use datafusion_expr::{Accumulator, WindowFrame};
3837

3938
/// A window expr that takes the form of an aggregate function that
@@ -47,8 +46,6 @@ pub struct SlidingAggregateWindowExpr {
4746
order_by: Vec<PhysicalSortExpr>,
4847
window_frame: Arc<WindowFrame>,
4948
filter: Option<Arc<dyn PhysicalExpr>>,
50-
schema: SchemaRef,
51-
frame_comparators: Option<WindowFrameBoundsComparators>,
5249
}
5350

5451
impl SlidingAggregateWindowExpr {
@@ -59,23 +56,13 @@ impl SlidingAggregateWindowExpr {
5956
order_by: &[PhysicalSortExpr],
6057
window_frame: Arc<WindowFrame>,
6158
filter: Option<Arc<dyn PhysicalExpr>>,
62-
schema: SchemaRef,
6359
) -> Self {
64-
let frame_comparators = {
65-
let cols: Option<Vec<_>> = order_by
66-
.iter()
67-
.map(|o| o.expr.data_type(&schema).ok().map(|dt| (dt, o.options)))
68-
.collect();
69-
cols.and_then(|c| WindowFrameBoundsComparators::new(&window_frame, &c))
70-
};
7160
Self {
7261
aggregate,
7362
partition_by: partition_by.to_vec(),
7463
order_by: order_by.to_vec(),
7564
window_frame,
7665
filter,
77-
schema,
78-
frame_comparators,
7966
}
8067
}
8168

@@ -114,10 +101,15 @@ impl WindowExpr for SlidingAggregateWindowExpr {
114101

115102
fn evaluate_stateful(
116103
&self,
104+
input_schema: &SchemaRef,
117105
partition_batches: &PartitionBatches,
118106
window_agg_state: &mut PartitionWindowAggStates,
119107
) -> Result<()> {
120-
self.aggregate_evaluate_stateful(partition_batches, window_agg_state)
108+
self.aggregate_evaluate_stateful(
109+
input_schema,
110+
partition_batches,
111+
window_agg_state,
112+
)
121113
}
122114

123115
fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
@@ -144,7 +136,6 @@ impl WindowExpr for SlidingAggregateWindowExpr {
144136
&reversed_order_by,
145137
Arc::new(reverse_window_frame),
146138
self.filter.clone(),
147-
self.schema.clone(),
148139
)) as _
149140
} else {
150141
Arc::new(SlidingAggregateWindowExpr::new(
@@ -157,7 +148,6 @@ impl WindowExpr for SlidingAggregateWindowExpr {
157148
.collect::<Vec<_>>(),
158149
Arc::new(self.window_frame.reverse()),
159150
self.filter.clone(),
160-
self.schema.clone(),
161151
)) as _
162152
}
163153
})
@@ -193,17 +183,12 @@ impl WindowExpr for SlidingAggregateWindowExpr {
193183
&new_order_by,
194184
Arc::clone(&self.window_frame),
195185
self.filter.clone(),
196-
self.schema.clone(),
197186
)))
198187
}
199188

200189
fn create_window_fn(&self) -> Result<WindowFn> {
201190
Ok(WindowFn::Aggregate(self.get_accumulator()?))
202191
}
203-
204-
fn frame_bounds_comparators(&self) -> Option<WindowFrameBoundsComparators> {
205-
self.frame_comparators.clone()
206-
}
207192
}
208193

209194
impl AggregateWindowExpr for SlidingAggregateWindowExpr {

datafusion/physical-expr/src/window/standard.rs

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use arrow::record_batch::RecordBatch;
3232
use datafusion_common::utils::evaluate_partition_ranges;
3333
use datafusion_common::{Result, ScalarValue};
3434
use datafusion_expr::WindowFrame;
35-
use datafusion_expr::window_frame::WindowFrameBoundsComparators;
3635
use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
3736
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3837

@@ -43,8 +42,6 @@ pub struct StandardWindowExpr {
4342
partition_by: Vec<Arc<dyn PhysicalExpr>>,
4443
order_by: Vec<PhysicalSortExpr>,
4544
window_frame: Arc<WindowFrame>,
46-
schema: SchemaRef,
47-
frame_comparators: Option<WindowFrameBoundsComparators>,
4845
}
4946

5047
impl StandardWindowExpr {
@@ -54,22 +51,12 @@ impl StandardWindowExpr {
5451
partition_by: &[Arc<dyn PhysicalExpr>],
5552
order_by: &[PhysicalSortExpr],
5653
window_frame: Arc<WindowFrame>,
57-
schema: SchemaRef,
5854
) -> Self {
59-
let frame_comparators = {
60-
let cols: Option<Vec<_>> = order_by
61-
.iter()
62-
.map(|o| o.expr.data_type(&schema).ok().map(|dt| (dt, o.options)))
63-
.collect();
64-
cols.and_then(|c| WindowFrameBoundsComparators::new(&window_frame, &c))
65-
};
6655
Self {
6756
expr,
6857
partition_by: partition_by.to_vec(),
6958
order_by: order_by.to_vec(),
7059
window_frame,
71-
schema,
72-
frame_comparators,
7360
}
7461
}
7562

@@ -139,7 +126,7 @@ impl WindowExpr for StandardWindowExpr {
139126

140127
let mut window_frame_ctx = WindowFrameContext::new(
141128
Arc::clone(&self.window_frame),
142-
self.frame_comparators.clone(),
129+
self.build_frame_bounds_comparators(batch.schema_ref())?,
143130
);
144131
let mut last_range = Range { start: 0, end: 0 };
145132

@@ -170,6 +157,7 @@ impl WindowExpr for StandardWindowExpr {
170157
/// stateful, bounded-memory implementations.
171158
fn evaluate_stateful(
172159
&self,
160+
input_schema: &SchemaRef,
173161
partition_batches: &PartitionBatches,
174162
window_agg_state: &mut PartitionWindowAggStates,
175163
) -> Result<()> {
@@ -178,6 +166,10 @@ impl WindowExpr for StandardWindowExpr {
178166
// create a WindowAggState to clone when `window_agg_state` does not contain the respective
179167
// group, which is faster than potentially creating a new one at every iteration
180168
let new_state = WindowAggState::new(out_type)?;
169+
170+
let frame_bounds_comparators =
171+
self.build_frame_bounds_comparators(input_schema)?;
172+
181173
for (partition_row, partition_batch_state) in partition_batches.iter() {
182174
let window_state =
183175
if let Some(window_state) = window_agg_state.get_mut(partition_row) {
@@ -225,7 +217,7 @@ impl WindowExpr for StandardWindowExpr {
225217
.get_or_insert_with(|| {
226218
WindowFrameContext::new(
227219
Arc::clone(&self.window_frame),
228-
self.frame_comparators.clone(),
220+
frame_bounds_comparators.clone(),
229221
)
230222
})
231223
.calculate_range(
@@ -283,7 +275,6 @@ impl WindowExpr for StandardWindowExpr {
283275
.map(|e| e.reverse())
284276
.collect::<Vec<_>>(),
285277
Arc::new(self.window_frame.reverse()),
286-
Arc::clone(&self.schema),
287278
)) as _
288279
})
289280
}
@@ -301,10 +292,6 @@ impl WindowExpr for StandardWindowExpr {
301292
fn create_window_fn(&self) -> Result<WindowFn> {
302293
Ok(WindowFn::Builtin(self.expr.create_evaluator()?))
303294
}
304-
305-
fn frame_bounds_comparators(&self) -> Option<WindowFrameBoundsComparators> {
306-
self.frame_comparators.clone()
307-
}
308295
}
309296

310297
/// Adds a new ordering expression into existing ordering equivalence class(es) based on

datafusion/physical-expr/src/window/window_expr.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use arrow::array::{Array, ArrayRef, new_empty_array};
2727
use arrow::compute::SortOptions;
2828
use arrow::compute::filter as arrow_filter;
2929
use arrow::compute::kernels::sort::SortColumn;
30-
use arrow::datatypes::FieldRef;
30+
use arrow::datatypes::{FieldRef, SchemaRef};
3131
use arrow::record_batch::RecordBatch;
3232
use datafusion_common::cast::as_boolean_array;
3333

@@ -71,7 +71,17 @@ use indexmap::IndexMap;
7171
/// [`PlainAggregateWindowExpr`]: crate::window::PlainAggregateWindowExpr
7272
/// [`SlidingAggregateWindowExpr`]: crate::window::SlidingAggregateWindowExpr
7373
pub trait WindowExpr: Send + Sync + Debug {
74-
fn frame_bounds_comparators(&self) -> Option<WindowFrameBoundsComparators>;
74+
fn build_frame_bounds_comparators(
75+
&self,
76+
input_schema: &SchemaRef,
77+
) -> Result<Option<WindowFrameBoundsComparators>> {
78+
let cols: Result<Vec<_>> = self
79+
.order_by()
80+
.iter()
81+
.map(|o| o.expr.data_type(input_schema).map(|dt| (dt, o.options)))
82+
.collect();
83+
cols.and_then(|c| WindowFrameBoundsComparators::new(self.get_window_frame(), &c))
84+
}
7585

7686
/// Returns the window expression as [`Any`] so that it can be
7787
/// downcast to a specific implementation.
@@ -104,6 +114,7 @@ pub trait WindowExpr: Send + Sync + Debug {
104114
/// stateful, bounded-memory implementations.
105115
fn evaluate_stateful(
106116
&self,
117+
_input_schema: &SchemaRef,
107118
_partition_batches: &PartitionBatches,
108119
_window_agg_state: &mut PartitionWindowAggStates,
109120
) -> Result<()> {
@@ -211,7 +222,7 @@ pub trait AggregateWindowExpr: WindowExpr {
211222
let mut last_range = Range { start: 0, end: 0 };
212223
let mut window_frame_ctx = WindowFrameContext::new(
213224
Arc::clone(self.get_window_frame()),
214-
self.frame_bounds_comparators(),
225+
self.build_frame_bounds_comparators(batch.schema_ref())?,
215226
);
216227
self.get_result_column(
217228
&mut accumulator,
@@ -228,11 +239,16 @@ pub trait AggregateWindowExpr: WindowExpr {
228239
/// state so that it can work incrementally over multiple chunks.
229240
fn aggregate_evaluate_stateful(
230241
&self,
242+
input_schema: &SchemaRef,
231243
partition_batches: &PartitionBatches,
232244
window_agg_state: &mut PartitionWindowAggStates,
233245
) -> Result<()> {
234246
let field = self.field()?;
235247
let out_type = field.data_type();
248+
249+
let frame_bounds_comparators =
250+
self.build_frame_bounds_comparators(input_schema)?;
251+
236252
for (partition_row, partition_batch_state) in partition_batches.iter() {
237253
if !window_agg_state.contains_key(partition_row) {
238254
let accumulator = self.get_accumulator()?;
@@ -259,7 +275,7 @@ pub trait AggregateWindowExpr: WindowExpr {
259275
let window_frame_ctx = state.window_frame_ctx.get_or_insert_with(|| {
260276
WindowFrameContext::new(
261277
Arc::clone(self.get_window_frame()),
262-
self.frame_bounds_comparators(),
278+
frame_bounds_comparators.clone(),
263279
)
264280
});
265281
let out_col = self.get_result_column(

0 commit comments

Comments
 (0)