Skip to content

Commit

Permalink
decouple transformation from odfvs
Browse files Browse the repository at this point in the history
Signed-off-by: tokoko <[email protected]>
  • Loading branch information
tokoko committed Feb 12, 2024
1 parent 4e450ad commit 5a5278d
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 50 deletions.
4 changes: 3 additions & 1 deletion protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ message OnDemandFeatureViewSpec {
// Map of sources for this feature view.
map<string, OnDemandSource> sources = 4;

UserDefinedFunction user_defined_function = 5;
oneof transformation {
UserDefinedFunction user_defined_function = 5;
}

// Description of the on demand feature view.
string description = 6;
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:

odfv_dict["spec"]["userDefinedFunction"][
"body"
] = on_demand_feature_view.udf_string
] = on_demand_feature_view.transformation.udf_string
registry_dict["onDemandFeatureViews"].append(odfv_dict)
for request_feature_view in sorted(
self.list_request_feature_views(project=project),
Expand Down
69 changes: 34 additions & 35 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import functools
import warnings
from datetime import datetime
from types import FunctionType
from typing import Any, Dict, List, Optional, Type, Union

import dill
Expand All @@ -16,6 +15,7 @@
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field, from_value_type
from feast.on_demand_pandas_transformation import OnDemandPandasTransformation
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
Expand All @@ -24,9 +24,6 @@
OnDemandFeatureViewSpec,
OnDemandSource,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
Expand All @@ -51,8 +48,7 @@ class OnDemandFeatureView(BaseFeatureView):
sources with type FeatureViewProjection.
source_request_sources: A map from input source names to the actual input
sources with type RequestSource.
udf: The user defined transformation function, which must take pandas dataframes
as inputs.
transformation: The user defined transformation.
description: A human-readable description.
tags: A dictionary of key-value pairs to store arbitrary metadata.
owner: The owner of the on demand feature view, typically the email of the primary
Expand All @@ -63,8 +59,7 @@ class OnDemandFeatureView(BaseFeatureView):
features: List[Field]
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_sources: Dict[str, RequestSource]
udf: FunctionType
udf_string: str
transformation: Union[OnDemandPandasTransformation]
description: str
tags: Dict[str, str]
owner: str
Expand All @@ -82,8 +77,7 @@ def __init__( # noqa: C901
FeatureViewProjection,
]
],
udf: FunctionType,
udf_string: str = "",
transformation: Union[OnDemandPandasTransformation],
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
Expand All @@ -98,9 +92,7 @@ def __init__( # noqa: C901
sources: A map from input source names to the actual input sources, which may be
feature views, or request data sources. These sources serve as inputs to the udf,
which will refer to them by name.
udf: The user defined transformation function, which must take pandas
dataframes as inputs.
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
transformation: The user defined transformation.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the on demand feature view, typically the email
Expand All @@ -126,8 +118,7 @@ def __init__( # noqa: C901
odfv_source.name
] = odfv_source.projection

self.udf = udf # type: ignore
self.udf_string = udf_string
self.transformation = transformation

@property
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
Expand All @@ -139,8 +130,7 @@ def __copy__(self):
schema=self.features,
sources=list(self.source_feature_view_projections.values())
+ list(self.source_request_sources.values()),
udf=self.udf,
udf_string=self.udf_string,
transformation=self.transformation,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand All @@ -161,8 +151,7 @@ def __eq__(self, other):
self.source_feature_view_projections
!= other.source_feature_view_projections
or self.source_request_sources != other.source_request_sources
or self.udf_string != other.udf_string
or self.udf.__code__.co_code != other.udf.__code__.co_code
or self.transformation != other.transformation
):
return False

Expand Down Expand Up @@ -200,11 +189,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:
name=self.name,
features=[feature.to_proto() for feature in self.features],
sources=sources,
user_defined_function=UserDefinedFunctionProto(
name=self.udf.__name__,
body=dill.dumps(self.udf, recurse=True),
body_text=self.udf_string,
),
user_defined_function=self.transformation.to_proto()
if type(self.transformation) == OnDemandPandasTransformation
else None,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand Down Expand Up @@ -243,6 +230,16 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
RequestSource.from_proto(on_demand_source.request_data_source)
)

if (
on_demand_feature_view_proto.spec.WhichOneof("transformation")
== "user_defined_function"
):
transformation = OnDemandPandasTransformation.from_proto(
on_demand_feature_view_proto.spec.user_defined_function
)
else:
raise Exception("At least one transformation type needs to be provided")

on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
schema=[
Expand All @@ -253,10 +250,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
for feature in on_demand_feature_view_proto.spec.features
],
sources=sources,
udf=dill.loads(
on_demand_feature_view_proto.spec.user_defined_function.body
),
udf_string=on_demand_feature_view_proto.spec.user_defined_function.body_text,
transformation=transformation,
description=on_demand_feature_view_proto.spec.description,
tags=dict(on_demand_feature_view_proto.spec.tags),
owner=on_demand_feature_view_proto.spec.owner,
Expand Down Expand Up @@ -315,7 +309,8 @@ def get_transformed_features_df(
columns_to_cleanup.append(full_feature_ref)

# Compute transformed values and apply to each result row
df_with_transformed_features = self.udf.__call__(df_with_features)

df_with_transformed_features = self.transformation.transform(df_with_features)

# Work out whether the correct columns names are used.
rename_columns: Dict[str, str] = {}
Expand All @@ -335,7 +330,7 @@ def get_transformed_features_df(
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
return df_with_transformed_features.rename(columns=rename_columns)

def infer_features(self):
def infer_features(self) -> None:
"""
Infers the set of features associated to this feature view from the input source.
Expand Down Expand Up @@ -365,7 +360,7 @@ def infer_features(self):
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
sample_val = rand_df_value[dtype] if dtype in rand_df_value else None
df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype)
output_df: pd.DataFrame = self.udf.__call__(df)
output_df: pd.DataFrame = self.transformation.transform(df)
inferred_features = []
for f, dt in zip(output_df.columns, output_df.dtypes):
inferred_features.append(
Expand Down Expand Up @@ -396,7 +391,9 @@ def infer_features(self):
)

@staticmethod
def get_requested_odfvs(feature_refs, project, registry):
def get_requested_odfvs(
feature_refs, project, registry
) -> List["OnDemandFeatureView"]:
all_on_demand_feature_views = registry.list_on_demand_feature_views(
project, allow_cache=True
)
Expand Down Expand Up @@ -438,7 +435,7 @@ def on_demand_feature_view(
of the primary maintainer.
"""

def mainify(obj):
def mainify(obj) -> None:
# Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same
# name as the original file defining the ODFV.
if obj.__module__ != "__main__":
Expand All @@ -447,15 +444,17 @@ def mainify(obj):
def decorator(user_function):
udf_string = dill.source.getsource(user_function)
mainify(user_function)

transformation = OnDemandPandasTransformation(user_function, udf_string)

on_demand_feature_view_obj = OnDemandFeatureView(
name=user_function.__name__,
sources=sources,
schema=schema,
udf=user_function,
transformation=transformation,
description=description,
tags=tags,
owner=owner,
udf_string=udf_string,
)
functools.update_wrapper(
wrapper=on_demand_feature_view_obj, wrapped=user_function
Expand Down
56 changes: 56 additions & 0 deletions sdk/python/feast/on_demand_pandas_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from types import FunctionType

import dill
import pandas as pd

from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)


class OnDemandPandasTransformation:
def __init__(self, udf: FunctionType, udf_string: str = ""):
"""
Creates an OnDemandPandasTransformation object.
Args:
udf: The user defined transformation function, which must take pandas
dataframes as inputs.
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
"""
self.udf = udf
self.udf_string = udf_string

def transform(self, df: pd.DataFrame) -> pd.DataFrame:
return self.udf.__call__(df)

def __eq__(self, other):
if not isinstance(other, OnDemandPandasTransformation):
raise TypeError(
"Comparisons should only involve OnDemandPandasTransformation class objects."
)

if not super().__eq__(other):
return False

if (
self.udf_string != other.udf_string
or self.udf.__code__.co_code != other.udf.__code__.co_code
):
return False

return True

def to_proto(self) -> UserDefinedFunctionProto:
return UserDefinedFunctionProto(
name=self.udf.__name__,
body=dill.dumps(self.udf, recurse=True),
body_text=self.udf_string,
)

@classmethod
def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto):
return OnDemandPandasTransformation(
udf=dill.loads(user_defined_function_proto.body),
udf_string=user_defined_function_proto.body_text,
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
StreamFeatureView,
)
from feast.data_source import DataSource, RequestSource
from feast.on_demand_feature_view import OnDemandPandasTransformation
from feast.types import Array, FeastType, Float32, Float64, Int32, Int64
from tests.integration.feature_repos.universal.entities import (
customer,
Expand Down Expand Up @@ -69,8 +70,9 @@ def conv_rate_plus_100_feature_view(
name=conv_rate_plus_100.__name__,
schema=[] if infer_features else _features,
sources=sources,
udf=conv_rate_plus_100,
udf_string="raw udf source",
transformation=OnDemandPandasTransformation(
udf=conv_rate_plus_100, udf_string="raw udf source"
),
)


Expand Down Expand Up @@ -107,8 +109,9 @@ def similarity_feature_view(
name=similarity.__name__,
sources=sources,
schema=[] if infer_features else _fields,
udf=similarity,
udf_string="similarity raw udf",
transformation=OnDemandPandasTransformation(
udf=similarity, udf_string="similarity raw udf"
),
)


Expand Down
37 changes: 28 additions & 9 deletions sdk/python/tests/unit/test_on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
from feast.feature_view import FeatureView
from feast.field import Field
from feast.infra.offline_stores.file_source import FileSource
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.on_demand_feature_view import (
OnDemandFeatureView,
OnDemandPandasTransformation,
)
from feast.types import Float32


Expand Down Expand Up @@ -54,8 +57,9 @@ def test_hash():
Field(name="output1", dtype=Float32),
Field(name="output2", dtype=Float32),
],
udf=udf1,
udf_string="udf1 source code",
transformation=OnDemandPandasTransformation(
udf=udf1, udf_string="udf1 source code"
),
)
on_demand_feature_view_2 = OnDemandFeatureView(
name="my-on-demand-feature-view",
Expand All @@ -64,8 +68,9 @@ def test_hash():
Field(name="output1", dtype=Float32),
Field(name="output2", dtype=Float32),
],
udf=udf1,
udf_string="udf1 source code",
transformation=OnDemandPandasTransformation(
udf=udf1, udf_string="udf1 source code"
),
)
on_demand_feature_view_3 = OnDemandFeatureView(
name="my-on-demand-feature-view",
Expand All @@ -74,8 +79,9 @@ def test_hash():
Field(name="output1", dtype=Float32),
Field(name="output2", dtype=Float32),
],
udf=udf2,
udf_string="udf2 source code",
transformation=OnDemandPandasTransformation(
udf=udf2, udf_string="udf2 source code"
),
)
on_demand_feature_view_4 = OnDemandFeatureView(
name="my-on-demand-feature-view",
Expand All @@ -84,8 +90,21 @@ def test_hash():
Field(name="output1", dtype=Float32),
Field(name="output2", dtype=Float32),
],
udf=udf2,
udf_string="udf2 source code",
transformation=OnDemandPandasTransformation(
udf=udf2, udf_string="udf2 source code"
),
description="test",
)
on_demand_feature_view_4 = OnDemandFeatureView(
name="my-on-demand-feature-view",
sources=sources,
schema=[
Field(name="output1", dtype=Float32),
Field(name="output2", dtype=Float32),
],
transformation=OnDemandPandasTransformation(
udf=udf2, udf_string="udf2 source code"
),
description="test",
)

Expand Down

0 comments on commit 5a5278d

Please sign in to comment.