Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 05497c6

Browse files
feat: Support mixed scalar-analytic expressions
1 parent 5663d2a commit 05497c6

6 files changed

Lines changed: 351 additions & 12 deletions

File tree

bigframes/core/agg_expressions.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import functools
2020
import itertools
2121
import typing
22-
from typing import Callable, Mapping, TypeVar
22+
from typing import Callable, Mapping, Tuple, TypeVar
2323

2424
from bigframes import dtypes
2525
from bigframes.core import expression, window_spec
@@ -63,6 +63,10 @@ def inputs(
6363
) -> typing.Tuple[expression.Expression, ...]:
6464
...
6565

66+
@property
67+
def children(self) -> Tuple[expression.Expression, ...]:
68+
return self.inputs
69+
6670
@property
6771
def free_variables(self) -> typing.Tuple[str, ...]:
6872
return tuple(
@@ -73,6 +77,10 @@ def free_variables(self) -> typing.Tuple[str, ...]:
7377
def is_const(self) -> bool:
7478
return all(child.is_const for child in self.inputs)
7579

80+
@functools.cached_property
81+
def is_scalar_expr(self) -> bool:
82+
return False
83+
7684
@abc.abstractmethod
7785
def replace_args(self: TExpression, *arg) -> TExpression:
7886
...
@@ -176,8 +184,13 @@ def output_type(self) -> dtypes.ExpressionType:
176184
def inputs(
177185
self,
178186
) -> typing.Tuple[expression.Expression, ...]:
187+
# TODO: Maybe make the window spec itself an expression?
179188
return (self.analytic_expr, *self.window.expressions)
180189

190+
@property
191+
def children(self) -> Tuple[expression.Expression, ...]:
192+
return self.inputs
193+
181194
@property
182195
def free_variables(self) -> typing.Tuple[str, ...]:
183196
return tuple(
@@ -188,6 +201,10 @@ def free_variables(self) -> typing.Tuple[str, ...]:
188201
def is_const(self) -> bool:
189202
return all(child.is_const for child in self.inputs)
190203

204+
@functools.cached_property
205+
def is_scalar_expr(self) -> bool:
206+
return False
207+
191208
def transform_children(
192209
self: WindowExpression,
193210
t: Callable[[expression.Expression], expression.Expression],

bigframes/core/array_value.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,24 @@
1616
from dataclasses import dataclass
1717
import datetime
1818
import functools
19+
import itertools
1920
import typing
2021
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
2122

2223
import google.cloud.bigquery
2324
import pandas
2425
import pyarrow as pa
2526

26-
from bigframes.core import agg_expressions, bq_data
27+
from bigframes.core import (
28+
agg_expressions,
29+
bq_data,
30+
expression_factoring,
31+
join_def,
32+
local_data,
33+
)
2734
import bigframes.core.expression as ex
2835
import bigframes.core.guid
2936
import bigframes.core.identifiers as ids
30-
import bigframes.core.join_def as join_def
31-
import bigframes.core.local_data as local_data
3237
import bigframes.core.nodes as nodes
3338
from bigframes.core.ordering import OrderingExpression
3439
import bigframes.core.ordering as orderings
@@ -261,6 +266,23 @@ def compute_values(self, assignments: Sequence[ex.Expression]):
261266
col_ids,
262267
)
263268

269+
def compute_general_expression(self, assignments: Sequence[ex.Expression]):
270+
named_exprs = [
271+
expression_factoring.NamedExpression(expr, ids.ColumnId.unique())
272+
for expr in assignments
273+
]
274+
# TODO: Push this to rewrite later to go from block expression to planning form
275+
# TODO: Jointly fragmentize expressions to more efficiently reuse common sub-expressions
276+
fragments = tuple(
277+
itertools.chain.from_iterable(
278+
expression_factoring.fragmentize_expression(expr)
279+
for expr in named_exprs
280+
)
281+
)
282+
target_ids = tuple(named_expr.name for named_expr in named_exprs)
283+
new_root = expression_factoring.push_into_tree(self.node, fragments, target_ids)
284+
return (ArrayValue(new_root), target_ids)
285+
264286
def project_to_id(self, expression: ex.Expression):
265287
array_val, ids = self.compute_values(
266288
[expression],

bigframes/core/block_transforms.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -399,15 +399,18 @@ def pct_change(block: blocks.Block, periods: int = 1) -> blocks.Block:
399399
window_spec = windows.unbound()
400400

401401
original_columns = block.value_columns
402-
block, shift_columns = block.multi_apply_window_op(
403-
original_columns, agg_ops.ShiftOp(periods), window_spec=window_spec
404-
)
405402
exprs = []
406-
for original_col, shifted_col in zip(original_columns, shift_columns):
407-
change_expr = ops.sub_op.as_expr(original_col, shifted_col)
408-
pct_change_expr = ops.div_op.as_expr(change_expr, shifted_col)
403+
for original_col in original_columns:
404+
shift_expr = agg_expressions.WindowExpression(
405+
agg_expressions.UnaryAggregation(
406+
agg_ops.ShiftOp(periods), ex.deref(original_col)
407+
),
408+
window_spec,
409+
)
410+
change_expr = ops.sub_op.as_expr(original_col, shift_expr)
411+
pct_change_expr = ops.div_op.as_expr(change_expr, shift_expr)
409412
exprs.append(pct_change_expr)
410-
return block.project_exprs(exprs, labels=column_labels, drop=True)
413+
return block.project_block_exprs(exprs, labels=column_labels, drop=True)
411414

412415

413416
def rank(

bigframes/core/blocks.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,6 +1154,26 @@ def project_exprs(
11541154
index_labels=self._index_labels,
11551155
)
11561156

1157+
# This is a new experimental version of the project_exprs that supports mixing analytic and scalar expressions
1158+
def project_block_exprs(
1159+
self,
1160+
exprs: Sequence[ex.Expression],
1161+
labels: Union[Sequence[Label], pd.Index],
1162+
drop=False,
1163+
) -> Block:
1164+
new_array, _ = self.expr.compute_general_expression(exprs)
1165+
if drop:
1166+
new_array = new_array.drop_columns(self.value_columns)
1167+
1168+
return Block(
1169+
new_array,
1170+
index_columns=self.index_columns,
1171+
column_labels=labels
1172+
if drop
1173+
else self.column_labels.append(pd.Index(labels)),
1174+
index_labels=self._index_labels,
1175+
)
1176+
11571177
def apply_window_op(
11581178
self,
11591179
column: str,

bigframes/core/expression.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
from __future__ import annotations
1616

1717
import abc
18+
import collections
1819
import dataclasses
1920
import functools
2021
import itertools
2122
import typing
22-
from typing import Callable, Generator, Mapping, TypeVar, Union
23+
from typing import Callable, Dict, Generator, Mapping, Tuple, TypeVar, Union
2324

2425
import pandas as pd
2526

@@ -43,6 +44,7 @@ def free_var(id: str) -> UnboundVariableExpression:
4344
return UnboundVariableExpression(id)
4445

4546

47+
T = TypeVar("T")
4648
TExpression = TypeVar("TExpression", bound="Expression")
4749

4850

@@ -136,6 +138,11 @@ def is_identity(self) -> bool:
136138
"""True for identity operation that does not transform input."""
137139
return False
138140

141+
@functools.cached_property
142+
def is_scalar_expr(self) -> bool:
143+
"""True if expression represents scalar value or expression over scalar values (no windows or aggregations)"""
144+
return all(expr.is_scalar_expr for expr in self.children)
145+
139146
@abc.abstractmethod
140147
def transform_children(self, t: Callable[[Expression], Expression]) -> Expression:
141148
...
@@ -150,6 +157,57 @@ def walk(self) -> Generator[Expression, None, None]:
150157
for child in self.children:
151158
yield from child.children
152159

160+
def unique_nodes(
161+
self: Expression,
162+
) -> Generator[Expression, None, None]:
163+
"""Walks the tree for unique nodes"""
164+
seen = set()
165+
stack: list[Expression] = [self]
166+
while stack:
167+
item = stack.pop()
168+
if item not in seen:
169+
yield item
170+
seen.add(item)
171+
stack.extend(item.children)
172+
173+
def iter_nodes_topo(
174+
self: Expression,
175+
) -> Generator[Expression, None, None]:
176+
"""Returns nodes in reverse topological order, using Kahn's algorithm."""
177+
child_to_parents: Dict[Expression, list[Expression]] = collections.defaultdict(
178+
list
179+
)
180+
out_degree: Dict[Expression, int] = collections.defaultdict(int)
181+
182+
queue: collections.deque["Expression"] = collections.deque()
183+
for node in list(self.unique_nodes()):
184+
num_children = len(node.children)
185+
out_degree[node] = num_children
186+
if num_children == 0:
187+
queue.append(node)
188+
for child in node.children:
189+
child_to_parents[child].append(node)
190+
191+
while queue:
192+
item = queue.popleft()
193+
yield item
194+
parents = child_to_parents.get(item, [])
195+
for parent in parents:
196+
out_degree[parent] -= 1
197+
if out_degree[parent] == 0:
198+
queue.append(parent)
199+
200+
def reduce_up(self, reduction: Callable[[Expression, Tuple[T, ...]], T]) -> T:
201+
"""Apply a bottom-up reduction to the tree."""
202+
results: dict[Expression, T] = {}
203+
for node in list(self.iter_nodes_topo()):
204+
# child nodes have already been transformed
205+
child_results = tuple(results[child] for child in node.children)
206+
result = reduction(node, child_results)
207+
results[node] = result
208+
209+
return results[self]
210+
153211

154212
@dataclasses.dataclass(frozen=True)
155213
class ScalarConstantExpression(Expression):

0 commit comments

Comments
 (0)