Skip to content

Commit

Permalink
on-demand transformations working
Browse files Browse the repository at this point in the history
  • Loading branch information
manu-sj committed Jul 1, 2024
1 parent 6ebd9f4 commit c020210
Show file tree
Hide file tree
Showing 17 changed files with 589 additions and 141 deletions.
10 changes: 5 additions & 5 deletions python/hsfs/builtin_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,26 @@
feature_statistics = TransformationStatistics("feature")


@udf(float)
@udf(float, drop=["feature"])
def min_max_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
return (feature - statistics.feature.min) / (
statistics.feature.max - statistics.feature.min
)


@udf(float)
@udf(float, drop=["feature"])
def standard_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
return (feature - statistics.feature.mean) / statistics.feature.stddev


@udf(float)
@udf(float, drop=["feature"])
def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
return (feature - statistics.feature.percentiles[49]) / (
statistics.feature.percentiles[74] - statistics.feature.percentiles[24]
)


@udf(int)
@udf(int, drop=["feature"])
def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
unique_data = sorted(
[value for value in statistics.feature.extended_statistics["unique_values"]]
Expand All @@ -53,7 +53,7 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Serie
)


@udf(bool)
@udf(bool, drop=["feature"])
def one_hot_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
unique_data = [
value for value in statistics.feature.extended_statistics["unique_values"]
Expand Down
16 changes: 13 additions & 3 deletions python/hsfs/core/feature_group_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ def save(
feature_group_instance.feature_store_id,
"featuregroups",
]
query_params = {
"expand": ["features", "expectationsuite", "transformationfunctions"]
}
headers = {"content-type": "application/json"}
feature_group_object = feature_group_instance.update_from_response_json(
_client._send_request(
"POST",
path_params,
headers=headers,
data=feature_group_instance.json(),
query_params=query_params,
),
)
return feature_group_object
Expand Down Expand Up @@ -93,7 +97,11 @@ def get(
"featuregroups",
name,
]
query_params = None if version is None else {"version": version}
query_params = {
"expand": ["features", "expectationsuite", "transformationfunctions"]
}
if version is not None:
query_params["version"] = version

fg_objs = []
# In principle unique names are enforced across fg type and this should therefore
Expand Down Expand Up @@ -157,8 +165,10 @@ def get_by_id(
"featuregroups",
feature_group_id,
]

fg_json = _client._send_request("GET", path_params)
query_params = {
"expand": ["features", "expectationsuite", "transformationfunctions"]
}
fg_json = _client._send_request("GET", path_params, query_params)
if (
fg_json["type"] == FeatureGroupApi.BACKEND_FG_STREAM
or fg_json["type"] == FeatureGroupApi.BACKEND_FG_BATCH
Expand Down
8 changes: 6 additions & 2 deletions python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ def insert(
validation_options: dict = None,
):
dataframe_features = engine.get_instance().parse_schema_feature_group(
feature_dataframe, feature_group.time_travel_format
feature_dataframe,
feature_group.time_travel_format,
feature_group.transformation_functions,
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
Expand Down Expand Up @@ -281,7 +283,9 @@ def insert_stream(
)

dataframe_features = engine.get_instance().parse_schema_feature_group(
dataframe, feature_group.time_travel_format
dataframe,
feature_group.time_travel_format,
feature_group.transformation_functions,
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
Expand Down
24 changes: 1 addition & 23 deletions python/hsfs/core/feature_view_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from typing import List, Optional, Union

from hsfs import client, feature_view, training_dataset, transformation_function
from hsfs import client, feature_view, training_dataset
from hsfs.client.exceptions import RestAPIError
from hsfs.constructor import query, serving_prepared_statement
from hsfs.core import explicit_provenance, job, training_dataset_job_conf
Expand Down Expand Up @@ -206,28 +206,6 @@ def get_serving_prepared_statement(
self._client._send_request("GET", path, query_params, headers=headers)
)

def get_attached_transformation_fn(
self, name: str, version: int
) -> List["transformation_function.TransformationFunction"]:
"""
Get transformation functions attached to a feature view from the backend
# Arguments
name `str`: Name of feature view.
version `int`: Version of feature view.
# Returns
`List[TransformationFunction]` : List of transformation functions attached to the feature view.
# Raises
`RestAPIError`: If the feature view cannot be found from the backend.
`ValueError`: If the feature group associated with the feature view cannot be found.
"""
path = self._base_path + [name, self._VERSION, version, self._TRANSFORMATION]
return transformation_function.TransformationFunction.from_response_json(
self._client._send_request("GET", path)
)

def create_training_dataset(
self,
name: str,
Expand Down
23 changes: 0 additions & 23 deletions python/hsfs/core/feature_view_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
feature_group,
feature_view,
training_dataset_feature,
transformation_function,
util,
)
from hsfs.client import exceptions
Expand Down Expand Up @@ -265,28 +264,6 @@ def get_batch_query_string(
return fs_query.pit_query
return fs_query.query

def get_attached_transformation_fn(
self, name: str, version: int
) -> List[transformation_function.TransformationFunction]:
"""
Get transformation functions attached to a feature view from the backend
# Arguments
name `str`: Name of feature view.
version `int`: Version of feature view.
# Returns
`List[TransformationFunction]` : List of transformation functions attached to the feature view.
# Raises
`RestAPIError`: If the feature view cannot be found from the backend.
`ValueError`: If the feature group associated with the feature view cannot be found.
"""
transformation_functions = (
self._feature_view_api.get_attached_transformation_fn(name, version)
)
return transformation_functions

def create_training_dataset(
self,
feature_view_obj,
Expand Down
23 changes: 7 additions & 16 deletions python/hsfs/core/transformation_function_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,12 @@ def get_ready_to_use_transformation_fns(
feature_view: feature_view.FeatureView,
training_dataset_version: Optional[int] = None,
) -> List[transformation_function.TransformationFunction]:
# get attached transformation functions
transformation_functions = (
feature_view._feature_view_engine.get_attached_transformation_fn(
feature_view.name, feature_view.version
)
)

transformation_functions = (
[transformation_functions]
if not isinstance(transformation_functions, list)
else transformation_functions
)

# check if transformation functions require statistics
is_stat_required = any(
[tf.hopsworks_udf.statistics_required for tf in transformation_functions]
[
tf.hopsworks_udf.statistics_required
for tf in feature_view.transformation_functions
]
)
if not is_stat_required:
td_tffn_stats = None
Expand All @@ -188,11 +179,11 @@ def get_ready_to_use_transformation_fns(
)

if is_stat_required:
for transformation_function in transformation_functions:
for transformation_function in feature_view.transformation_functions:
transformation_function.hopsworks_udf.transformation_statistics = (
td_tffn_stats.feature_descriptive_statistics
)
return feature_view._sort_transformation_functions(transformation_functions)
return feature_view.transformation_functions

@staticmethod
def compute_and_set_feature_statistics(
Expand Down
Loading

0 comments on commit c020210

Please sign in to comment.