@@ -104,48 +104,109 @@ describing how to do this.
 When not to use a UDF
 ^^^^^^^^^^^^^^^^^^^^^
 
-A UDF is the right tool when the computation genuinely cannot be expressed
-with built-in functions. It is often the *wrong* tool for a compound
-predicate that happens to be easier to write in Python. The optimizer
-cannot push a UDF through joins or filters, so a Python-side predicate
-prevents otherwise obvious rewrites and forces a per-row Python callback.
+A UDF is the right tool when the per-row computation genuinely cannot be
+expressed with built-in functions. It is often the *wrong* tool for a
+predicate that happens to be easier to write in Python. A UDF is opaque
+to the optimizer, which means filters expressed as UDFs lose several
+rewrites that the engine applies to filters built from native
+expressions. The most visible of these is **Parquet predicate pushdown**:
+a native predicate can prune entire row groups using the min/max
+statistics in the Parquet footer, while a UDF predicate cannot.
+
+The following example writes a small Parquet file, then filters it two
+ways: first with a native expression, then with a UDF that computes the
+same result. The filter itself is simple on purpose so we can compare
+the plans side by side.
 
-Consider a filter that selects rows falling into one of three brand-specific
-buckets, each with its own containers, quantity range, and size range:
+.. ipython:: python
 
-.. code-block:: python
+    import tempfile, os
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+    from datafusion import SessionContext, col, lit, udf
+
+    tmpdir = tempfile.mkdtemp()
+    parquet_path = os.path.join(tmpdir, "items.parquet")
+    pq.write_table(
+        pa.table({
+            "id": list(range(100)),
+            "brand": ["A", "B", "C", "D"] * 25,
+            "qty": [i * 10 for i in range(100)],
+        }),
+        parquet_path,
+    )
+
+    ctx = SessionContext()
+    items = ctx.read_parquet(parquet_path)
+
+**Native-expression predicate.** The filter is a plain boolean tree
+over column references and literals, so the optimizer can analyze it:
 
-    # Anti-pattern: the predicate is a plain disjunction, but hidden inside a UDF.
-    def is_of_interest(brand, container, quantity, size):
-        result = []
-        for b, c, q, s in zip(brand, container, quantity, size):
-            b = b.as_py()
-            if b == "Brand#12":
-                result.append(c.as_py() in ("SM CASE", "SM BOX") and 1 <= q.as_py() <= 11 and 1 <= s.as_py() <= 5)
-            elif b == "Brand#23":
-                result.append(c.as_py() in ("MED BAG", "MED BOX") and 10 <= q.as_py() <= 20 and 1 <= s.as_py() <= 10)
-            else:
-                result.append(False)
-        return pa.array(result)
-
-    df = df.filter(udf_is_of_interest(col("brand"), col("container"), col("quantity"), col("size")))
-
-The native equivalent keeps the bucket definitions as plain Python data
-(a dict) and builds an ``Expr`` from them. The optimizer sees a disjunction
-of simple predicates it can analyze and push down:
+.. ipython:: python
+
+    native_filtered = items.filter(
+        (col("brand") == lit("A")) & (col("qty") >= lit(150))
+    )
+    print(native_filtered.execution_plan().display_indent())
+
+Notice the ``DataSourceExec`` line. It carries three annotations the
+optimizer computed from the predicate:
+
+- ``predicate=brand@1 = A AND qty@2 >= 150`` — the filter is pushed
+  into the Parquet scan itself, so the scan only reads matching rows.
+- ``pruning_predicate=... brand_min@0 <= A AND A <= brand_max@1 ...
+  qty_max@4 >= 150`` — the scan prunes whole row groups by consulting
+  the Parquet min/max statistics in the footer *before* reading any
+  column data.
+- ``required_guarantees=[brand in (A)]`` — the scan uses this when a
+  bloom filter or dictionary is available to skip pages.
+
+**UDF predicate.** Now wrap the same logic in a Python UDF:
+
+.. ipython:: python
+
+    def brand_qty_filter(brand_arr: pa.Array, qty_arr: pa.Array) -> pa.Array:
+        return pa.array([
+            b.as_py() == "A" and q.as_py() >= 150
+            for b, q in zip(brand_arr, qty_arr)
+        ])
+
+    pred_udf = udf(
+        brand_qty_filter, [pa.string(), pa.int64()], pa.bool_(), "stable",
+    )
+    udf_filtered = items.filter(pred_udf(col("brand"), col("qty")))
+    print(udf_filtered.execution_plan().display_indent())
+
+The ``DataSourceExec`` now carries only ``predicate=brand_qty_filter(...)``.
+There is no ``pruning_predicate`` and no ``required_guarantees``: the
+scan has to materialize every row group and hand each row to the
+Python callback just to decide whether to keep it.
+
+At small scale the cost difference is invisible; on a Parquet file with
+many row groups, or data whose min/max statistics line up well with
+the predicate, the native form can skip most of the file. The UDF form
+reads all of it.
+
+**Takeaway.** Reach for a UDF when the per-row computation is genuinely
+not expressible as a tree of built-in functions (custom numerical work,
+external lookups, complex business rules). When it *is* expressible —
+even if the native form is a little more verbose — build the ``Expr``
+tree directly so the optimizer can see through it. For disjunctive
+predicates the idiom is to produce one clause per bucket and combine
+them with ``|``:
 
 .. code-block:: python
 
     from functools import reduce
     from operator import or_
     from datafusion import col, lit, functions as f
 
-    items_of_interest = {
+    buckets = {
         "Brand#12": {"containers": ["SM CASE", "SM BOX"], "min_qty": 1, "max_size": 5},
         "Brand#23": {"containers": ["MED BAG", "MED BOX"], "min_qty": 10, "max_size": 10},
     }
 
-    def brand_clause(brand, spec):
+    def bucket_clause(brand, spec):
         return (
             (col("brand") == lit(brand))
             & f.in_list(col("container"), [lit(c) for c in spec["containers"]])
@@ -155,13 +216,9 @@ of simple predicates it can analyze and push down:
             & (col("size") <= lit(spec["max_size"]))
         )
 
-    predicate = reduce(or_, (brand_clause(b, s) for b, s in items_of_interest.items()))
+    predicate = reduce(or_, (bucket_clause(b, s) for b, s in buckets.items()))
     df = df.filter(predicate)
 
-Reach for a UDF when the per-row computation is not expressible as a tree
-of built-in functions. When it *is* expressible, build the ``Expr`` tree
-directly.
-
 Aggregate Functions
 -------------------
 