Skip to content

Commit a310eaf

Browse files
abhijeet-dhumal authored and ntkathole committed
fix(spark): BatchFeatureView with TransformationMode.PYTHON now reads all source columns
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
1 parent e1b1d2d commit a310eaf

2 files changed

Lines changed: 150 additions & 4 deletions

File tree

sdk/python/feast/infra/compute_engines/feature_builder.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,14 @@ def get_column_info(
158158
# we need to read ALL source columns, not just the output feature columns.
159159
# This is specifically for transformations that create new columns or need raw data.
160160
mode = getattr(getattr(view, "feature_transformation", None), "mode", None)
161-
if mode in ("ray", "pandas") or getattr(mode, "value", None) in (
161+
if mode in ("ray", "pandas", "python") or getattr(mode, "value", None) in (
162162
"ray",
163163
"pandas",
164+
"python",
164165
):
165-
# Signal to read all columns by passing empty list for feature_cols
166-
# The transformation will produce the output columns defined in the schema
166+
# Signal to read all columns by passing empty list for feature_cols.
167+
# "python" (BatchFeatureView) transformations need all raw source columns — the
168+
# UDF computes output features from raw input, not from pre-existing feature cols.
167169
feature_cols = []
168170

169171
return ColumnInfo(

sdk/python/tests/unit/infra/compute_engines/test_feature_builder.py

Lines changed: 145 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from unittest.mock import MagicMock
1+
from unittest.mock import MagicMock, patch
2+
3+
import pytest
24

35
from feast.data_source import DataSource
46
from feast.infra.compute_engines.dag.context import ExecutionContext
@@ -7,6 +9,7 @@
79
from feast.infra.compute_engines.dag.plan import ExecutionPlan
810
from feast.infra.compute_engines.dag.value import DAGValue
911
from feast.infra.compute_engines.feature_builder import FeatureBuilder
12+
from feast.transformation.mode import TransformationMode
1013

1114
# ---------------------------
1215
# Minimal Mock DAGNode for testing
@@ -143,3 +146,144 @@ def test_recursive_featureview_build():
143146
- Source(hourly_driver_stats)"""
144147

145148
assert execution_plan.to_dag() == expected_output
149+
150+
151+
# ---------------------------------------------------------------------------
152+
# Helpers for get_column_info tests
153+
# ---------------------------------------------------------------------------
154+
155+
# Stable return value for _get_column_names: (join_keys, feature_cols, ts_col, created_ts_col)
156+
_MOCK_COLUMN_NAMES = (
157+
["user_id"],
158+
["user_avg_rating", "user_review_count"],
159+
"event_timestamp",
160+
None,
161+
)
162+
163+
164+
def _make_transformation(mode):
165+
"""Return a minimal transformation stub with the given mode."""
166+
t = MagicMock()
167+
t.mode = mode
168+
return t
169+
170+
171+
def _make_builder_for_column_info(transformation):
172+
"""
173+
Build a MockFeatureBuilder whose task.feature_view carries the given
174+
transformation. registry.get_entity is stubbed out per entity name.
175+
"""
176+
view = MagicMock()
177+
view.entities = ["user"]
178+
view.feature_transformation = transformation
179+
view.batch_source = MagicMock()
180+
view.batch_source.field_mapping = {}
181+
view.stream_source = None
182+
183+
task = MagicMock()
184+
task.project = "test_project"
185+
task.feature_view = view
186+
task.only_latest = False
187+
188+
registry = MagicMock()
189+
registry.get_entity.return_value = MagicMock(join_key="user_id")
190+
191+
builder = MockFeatureBuilder.__new__(MockFeatureBuilder)
192+
builder.registry = registry
193+
builder.task = task
194+
builder.nodes = []
195+
return builder, view
196+
197+
198+
# ---------------------------------------------------------------------------
199+
# Bug fix: TransformationMode.PYTHON must set feature_cols=[]
200+
#
201+
# Previously only "ray" and "pandas" were handled. "python" (the default mode
202+
# for @batch_feature_view) was missing, causing get_column_info to forward
203+
# the BFV *output* feature names (e.g. user_avg_rating) to the offline store
204+
# read step — columns that don't exist in raw source data — resulting in
205+
# UNRESOLVED_COLUMN errors at Spark analysis time.
206+
# ---------------------------------------------------------------------------
207+
208+
209+
@pytest.mark.parametrize(
210+
"mode",
211+
[
212+
TransformationMode.PYTHON,
213+
TransformationMode.PANDAS,
214+
TransformationMode.RAY,
215+
# String forms (getattr(mode, "value", None) path)
216+
"python",
217+
"pandas",
218+
"ray",
219+
],
220+
)
221+
def test_get_column_info_clears_feature_cols_for_udf_modes(mode):
222+
"""
223+
For transformation modes that compute output features from raw input
224+
(python, pandas, ray), get_column_info must set feature_cols=[] so the
225+
offline store read step issues SELECT * instead of projecting the output
226+
feature names that don't exist in the raw source schema.
227+
"""
228+
builder, view = _make_builder_for_column_info(_make_transformation(mode))
229+
230+
with patch(
231+
"feast.infra.compute_engines.feature_builder._get_column_names",
232+
return_value=_MOCK_COLUMN_NAMES,
233+
):
234+
col_info = builder.get_column_info(view)
235+
236+
assert col_info.feature_cols == [], (
237+
f"Expected feature_cols=[] for TransformationMode {mode!r}, "
238+
f"got {col_info.feature_cols!r}. "
239+
"The offline store read step must not project output feature names "
240+
"that don't exist in the raw source schema."
241+
)
242+
assert col_info.join_keys == ["user_id"]
243+
assert col_info.ts_col == "event_timestamp"
244+
245+
246+
@pytest.mark.parametrize(
247+
"mode",
248+
[
249+
TransformationMode.SPARK_SQL,
250+
TransformationMode.SQL,
251+
TransformationMode.SPARK,
252+
"spark_sql",
253+
"sql",
254+
],
255+
)
256+
def test_get_column_info_preserves_feature_cols_for_non_udf_modes(mode):
257+
"""
258+
SQL/Spark-SQL transformations operate on already-projected columns and
259+
should NOT get feature_cols cleared — the source read must still select
260+
the named feature columns explicitly.
261+
"""
262+
builder, view = _make_builder_for_column_info(_make_transformation(mode))
263+
264+
with patch(
265+
"feast.infra.compute_engines.feature_builder._get_column_names",
266+
return_value=_MOCK_COLUMN_NAMES,
267+
):
268+
col_info = builder.get_column_info(view)
269+
270+
assert col_info.feature_cols == ["user_avg_rating", "user_review_count"], (
271+
f"Expected feature_cols to be preserved for mode {mode!r}, "
272+
f"got {col_info.feature_cols!r}."
273+
)
274+
275+
276+
def test_get_column_info_preserves_feature_cols_with_no_transformation():
277+
"""
278+
A plain FeatureView (no transformation) must retain its feature column
279+
names so the offline store read step selects only the required columns.
280+
"""
281+
builder, view = _make_builder_for_column_info(None)
282+
283+
with patch(
284+
"feast.infra.compute_engines.feature_builder._get_column_names",
285+
return_value=_MOCK_COLUMN_NAMES,
286+
):
287+
col_info = builder.get_column_info(view)
288+
289+
assert col_info.feature_cols == ["user_avg_rating", "user_review_count"]

0 commit comments

Comments (0)