Skip to content

Commit 835cda8

Browse files
committed
fix: Fix issue with apply feature view
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
1 parent 8fc01a4 commit 835cda8

4 files changed

Lines changed: 67 additions & 37 deletions

File tree

sdk/python/feast/feature_view.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -520,14 +520,17 @@ def get_ttl_duration(self):
520520
return ttl_duration
521521

522522
@classmethod
523-
def from_proto(cls, feature_view_proto: FeatureViewProto) -> "FeatureView":
524-
return cls._from_proto_internal(feature_view_proto, seen={})
523+
def from_proto(
524+
cls, feature_view_proto: FeatureViewProto, skip_udf: bool = False
525+
) -> "FeatureView":
526+
return cls._from_proto_internal(feature_view_proto, seen={}, skip_udf=skip_udf)
525527

526528
@classmethod
527529
def _from_proto_internal(
528530
cls,
529531
feature_view_proto: FeatureViewProto,
530532
seen: Dict[str, Union[None, "FeatureView"]],
533+
skip_udf: bool = False,
531534
) -> "FeatureView":
532535
"""
533536
Creates a feature view from a protobuf representation of a feature view.
@@ -561,7 +564,7 @@ def _from_proto_internal(
561564
)
562565
source_views = [
563566
FeatureView._from_proto_internal(
564-
FeatureViewProto(spec=view_spec, meta=None), seen
567+
FeatureViewProto(spec=view_spec, meta=None), seen, skip_udf=skip_udf
565568
)
566569
for view_spec in feature_view_proto.spec.source_views
567570
]
@@ -581,22 +584,23 @@ def _from_proto_internal(
581584
)
582585
transformation = None
583586

584-
if feature_transformation_proto.HasField("user_defined_function"):
585-
udf_proto = feature_transformation_proto.user_defined_function
586-
if udf_proto.mode:
587-
try:
588-
transformation_class = get_transformation_class_from_type(
589-
udf_proto.mode
590-
)
591-
transformation = transformation_class.from_proto(udf_proto)
592-
except (ValueError, KeyError):
587+
if not skip_udf:
588+
if feature_transformation_proto.HasField("user_defined_function"):
589+
udf_proto = feature_transformation_proto.user_defined_function
590+
if udf_proto.mode:
591+
try:
592+
transformation_class = get_transformation_class_from_type(
593+
udf_proto.mode
594+
)
595+
transformation = transformation_class.from_proto(udf_proto)
596+
except (ValueError, KeyError):
597+
transformation = PythonTransformation.from_proto(udf_proto)
598+
else:
593599
transformation = PythonTransformation.from_proto(udf_proto)
594-
else:
595-
transformation = PythonTransformation.from_proto(udf_proto)
596-
elif feature_transformation_proto.HasField("substrait_transformation"):
597-
transformation = SubstraitTransformation.from_proto(
598-
feature_transformation_proto.substrait_transformation
599-
)
600+
elif feature_transformation_proto.HasField("substrait_transformation"):
601+
transformation = SubstraitTransformation.from_proto(
602+
feature_transformation_proto.substrait_transformation
603+
)
600604

601605
mode: Union[TransformationMode, str]
602606
if feature_view_proto.spec.mode:

sdk/python/feast/on_demand_feature_view.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ class OnDemandFeatureView(BaseFeatureView):
140140
features: List[Field]
141141
source_feature_view_projections: dict[str, FeatureViewProjection]
142142
source_request_sources: dict[str, RequestSource]
143-
feature_transformation: Transformation
143+
feature_transformation: Optional[Transformation]
144144
mode: str
145145
description: str
146146
tags: dict[str, str]
@@ -259,9 +259,12 @@ def __init__( # noqa: C901
259259
features.append(field)
260260

261261
self.features = features
262-
self.feature_transformation = (
263-
feature_transformation or self.get_feature_transformation()
264-
)
262+
if feature_transformation is not None:
263+
self.feature_transformation = feature_transformation
264+
elif self.udf is not None:
265+
self.feature_transformation = self.get_feature_transformation()
266+
else:
267+
self.feature_transformation = None
265268
self.write_to_online_store = write_to_online_store
266269
self.singleton = singleton
267270
if self.singleton and self.mode != "python":
@@ -578,11 +581,13 @@ def from_proto(
578581
A OnDemandFeatureView object based on the on-demand feature view protobuf.
579582
"""
580583
# Parse sources from proto
581-
sources = cls._parse_sources_from_proto(on_demand_feature_view_proto)
584+
sources = cls._parse_sources_from_proto(
585+
on_demand_feature_view_proto, skip_udf=skip_udf
586+
)
582587

583-
# Parse transformation from proto
588+
# Parse transformation from proto (skip UDF deserialization if requested)
584589
transformation = cls._parse_transformation_from_proto(
585-
on_demand_feature_view_proto
590+
on_demand_feature_view_proto, skip_udf=skip_udf
586591
)
587592

588593
# Parse optional fields with defaults
@@ -643,7 +648,7 @@ def from_proto(
643648

644649
@classmethod
645650
def _parse_sources_from_proto(
646-
cls, proto: OnDemandFeatureViewProto
651+
cls, proto: OnDemandFeatureViewProto, skip_udf: bool = False
647652
) -> List[OnDemandSourceType]:
648653
"""Parse and convert sources from the protobuf representation."""
649654
sources: List[OnDemandSourceType] = []
@@ -652,7 +657,9 @@ def _parse_sources_from_proto(
652657

653658
if source_type == "feature_view":
654659
sources.append(
655-
FeatureView.from_proto(on_demand_source.feature_view).projection
660+
FeatureView.from_proto(
661+
on_demand_source.feature_view, skip_udf=skip_udf
662+
).projection
656663
)
657664
elif source_type == "feature_view_projection":
658665
sources.append(
@@ -673,9 +680,12 @@ def _parse_sources_from_proto(
673680

674681
@classmethod
675682
def _parse_transformation_from_proto(
676-
cls, proto: OnDemandFeatureViewProto
677-
) -> Transformation:
683+
cls, proto: OnDemandFeatureViewProto, skip_udf: bool = False
684+
) -> Optional[Transformation]:
678685
"""Parse and convert the transformation from the protobuf representation."""
686+
if skip_udf:
687+
return None
688+
679689
feature_transformation = proto.spec.feature_transformation
680690
transformation_type = feature_transformation.WhichOneof("transformation")
681691
mode = proto.spec.mode
@@ -898,6 +908,7 @@ def transform_arrow(
898908
pa_table, columns_to_cleanup = self._preprocess_arrow_table(pa_table)
899909

900910
# Apply the transformation
911+
assert self.feature_transformation is not None
901912
transformed_table = self.feature_transformation.transform_arrow(
902913
pa_table, self.features
903914
)
@@ -983,6 +994,7 @@ def transform_dict(
983994
)
984995

985996
# Apply the appropriate transformation based on mode
997+
assert self.feature_transformation is not None
986998
if self.singleton and self.mode == "python":
987999
output_dict = self.feature_transformation.transform_singleton(
9881000
preprocessed_dict
@@ -1024,6 +1036,7 @@ def _preprocess_feature_dict(
10241036
return preprocessed_dict, columns_to_cleanup
10251037

10261038
def infer_features(self) -> None:
1039+
assert self.feature_transformation is not None
10271040
random_input = self._construct_random_input(singleton=self.singleton)
10281041
inferred_features = self.feature_transformation.infer_features(
10291042
random_input=random_input, singleton=self.singleton

sdk/python/feast/registry_server.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -342,22 +342,35 @@ def ApplyFeatureView(
342342
self, request: RegistryServer_pb2.ApplyFeatureViewRequest, context
343343
):
344344
feature_view_type = request.WhichOneof("base_feature_view")
345+
345346
if feature_view_type == "feature_view":
346-
feature_view = FeatureView.from_proto(request.feature_view)
347+
feature_view_meta = FeatureView.from_proto(
348+
request.feature_view, skip_udf=True
349+
)
347350
elif feature_view_type == "on_demand_feature_view":
348-
feature_view = OnDemandFeatureView.from_proto(
349-
request.on_demand_feature_view
351+
feature_view_meta = OnDemandFeatureView.from_proto(
352+
request.on_demand_feature_view, skip_udf=True
350353
)
351354
elif feature_view_type == "stream_feature_view":
352-
feature_view = StreamFeatureView.from_proto(request.stream_feature_view)
355+
feature_view_meta = StreamFeatureView.from_proto(
356+
request.stream_feature_view, skip_udf=True
357+
)
353358

354359
assert_permissions_to_update(
355-
resource=feature_view,
356-
# Will replace with the new get_any_feature_view method later
360+
resource=feature_view_meta,
357361
getter=self.proxied_registry.get_feature_view,
358362
project=request.project,
359363
)
360364

365+
if feature_view_type == "feature_view":
366+
feature_view = FeatureView.from_proto(request.feature_view)
367+
elif feature_view_type == "on_demand_feature_view":
368+
feature_view = OnDemandFeatureView.from_proto(
369+
request.on_demand_feature_view
370+
)
371+
elif feature_view_type == "stream_feature_view":
372+
feature_view = StreamFeatureView.from_proto(request.stream_feature_view)
373+
361374
(
362375
self.proxied_registry.apply_feature_view(
363376
feature_view=feature_view,

sdk/python/feast/stream_feature_view.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ def to_proto(self):
320320
return StreamFeatureViewProto(spec=spec, meta=meta)
321321

322322
@classmethod
323-
def from_proto(cls, sfv_proto):
323+
def from_proto(cls, sfv_proto, skip_udf: bool = False):
324324
batch_source = (
325325
DataSource.from_proto(sfv_proto.spec.batch_source)
326326
if sfv_proto.spec.HasField("batch_source")
@@ -333,7 +333,7 @@ def from_proto(cls, sfv_proto):
333333
)
334334
udf = (
335335
dill.loads(sfv_proto.spec.user_defined_function.body)
336-
if sfv_proto.spec.HasField("user_defined_function")
336+
if sfv_proto.spec.HasField("user_defined_function") and not skip_udf
337337
else None
338338
)
339339
udf_string = (

0 commit comments

Comments
 (0)