diff --git a/python/hsfs/builtin_transformations.py b/python/hsfs/builtin_transformations.py index 9e2daa0d24..ae24cd4274 100644 --- a/python/hsfs/builtin_transformations.py +++ b/python/hsfs/builtin_transformations.py @@ -23,26 +23,26 @@ feature_statistics = TransformationStatistics("feature") -@udf(float) +@udf(float, drop=["feature"]) def min_max_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series: return (feature - statistics.feature.min) / ( statistics.feature.max - statistics.feature.min ) -@udf(float) +@udf(float, drop=["feature"]) def standard_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series: return (feature - statistics.feature.mean) / statistics.feature.stddev -@udf(float) +@udf(float, drop=["feature"]) def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series: return (feature - statistics.feature.percentiles[49]) / ( statistics.feature.percentiles[74] - statistics.feature.percentiles[24] ) -@udf(int) +@udf(int, drop=["feature"]) def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series: unique_data = sorted( [value for value in statistics.feature.extended_statistics["unique_values"]] @@ -53,7 +53,7 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Serie ) -@udf(bool) +@udf(bool, drop=["feature"]) def one_hot_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series: unique_data = [ value for value in statistics.feature.extended_statistics["unique_values"] diff --git a/python/hsfs/core/feature_group_api.py b/python/hsfs/core/feature_group_api.py index 11fdbbbdc6..c6b0a1a70f 100644 --- a/python/hsfs/core/feature_group_api.py +++ b/python/hsfs/core/feature_group_api.py @@ -51,6 +51,9 @@ def save( feature_group_instance.feature_store_id, "featuregroups", ] + query_params = { + "expand": ["features", "expectationsuite", "transformationfunctions"] + } headers = {"content-type": "application/json"} feature_group_object = feature_group_instance.update_from_response_json( _client._send_request( @@ -58,6 +61,7 @@ def save( path_params, headers=headers, data=feature_group_instance.json(), + query_params=query_params, ), ) return feature_group_object @@ -93,7 +97,11 @@ def get( "featuregroups", name, ] - query_params = None if version is None else {"version": version} + query_params = { + "expand": ["features", "expectationsuite", "transformationfunctions"] + } + if version is not None: + query_params["version"] = version fg_objs = [] # In principle unique names are enforced across fg type and this should therefore @@ -157,8 +165,10 @@ def get_by_id( "featuregroups", feature_group_id, ] - - fg_json = _client._send_request("GET", path_params) + query_params = { + "expand": ["features", "expectationsuite", "transformationfunctions"] + } + fg_json = _client._send_request("GET", path_params, query_params) if ( fg_json["type"] == FeatureGroupApi.BACKEND_FG_STREAM or fg_json["type"] == FeatureGroupApi.BACKEND_FG_BATCH diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index 3e88805eda..010810f6cc 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -88,7 +88,9 @@ def insert( validation_options: dict = None, ): dataframe_features = engine.get_instance().parse_schema_feature_group( - feature_dataframe, feature_group.time_travel_format + feature_dataframe, + feature_group.time_travel_format, + feature_group.transformation_functions, ) util.validate_embedding_feature_type( 
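With `drop=["feature"]`, each built-in scaler now removes its raw input column from the stored output once the transformed value is computed. A minimal sketch of what `min_max_scaler` evaluates, with plain numbers standing in for the `TransformationStatistics` object:

```python
import pandas as pd

# Illustrative stand-ins for the training-time statistics that hsfs injects
# via TransformationStatistics("feature").
stat_min, stat_max = 0.0, 200.0

feature = pd.Series([20.0, 100.0, 180.0])
scaled = (feature - stat_min) / (stat_max - stat_min)
print(scaled.tolist())  # [0.1, 0.5, 0.9]
```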
feature_group.embedding_index, dataframe_features @@ -281,7 +283,9 @@ def insert_stream( ) dataframe_features = engine.get_instance().parse_schema_feature_group( - dataframe, feature_group.time_travel_format + dataframe, + feature_group.time_travel_format, + feature_group.transformation_functions, ) util.validate_embedding_feature_type( feature_group.embedding_index, dataframe_features diff --git a/python/hsfs/core/feature_view_api.py b/python/hsfs/core/feature_view_api.py index 1bc6b46115..50355f3d5f 100644 --- a/python/hsfs/core/feature_view_api.py +++ b/python/hsfs/core/feature_view_api.py @@ -17,7 +17,7 @@ from typing import List, Optional, Union -from hsfs import client, feature_view, training_dataset, transformation_function +from hsfs import client, feature_view, training_dataset from hsfs.client.exceptions import RestAPIError from hsfs.constructor import query, serving_prepared_statement from hsfs.core import explicit_provenance, job, training_dataset_job_conf @@ -206,28 +206,6 @@ def get_serving_prepared_statement( self._client._send_request("GET", path, query_params, headers=headers) ) - def get_attached_transformation_fn( - self, name: str, version: int - ) -> List["transformation_function.TransformationFunction"]: - """ - Get transformation functions attached to a feature view form the backend - - # Arguments - name `str`: Name of feature view. - version `ìnt`: Version of feature view. - - # Returns - `List[TransformationFunction]` : List of transformation functions attached to the feature view. - - # Raises - `RestAPIError`: If the feature view cannot be found from the backend. - `ValueError`: If the feature group associated with the feature view cannot be found. - """ - path = self._base_path + [name, self._VERSION, version, self._TRANSFORMATION] - return transformation_function.TransformationFunction.from_response_json( - self._client._send_request("GET", path) - ) - def create_training_dataset( self, name: str, diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 070be9b821..f85529163f 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -25,7 +25,6 @@ feature_group, feature_view, training_dataset_feature, - transformation_function, util, ) from hsfs.client import exceptions @@ -265,28 +264,6 @@ def get_batch_query_string( return fs_query.pit_query return fs_query.query - def get_attached_transformation_fn( - self, name: str, version: int - ) -> List[transformation_function.TransformationFunction]: - """ - Get transformation functions attached to a feature view form the backend - - # Arguments - name `str`: Name of feature view. - version `ìnt`: Version of feature view. - - # Returns - `List[TransformationFunction]` : List of transformation functions attached to the feature view. - - # Raises - `RestAPIError`: If the feature view cannot be found from the backend. - `ValueError`: If the feature group associated with the feature view cannot be found. 
- """ - transformation_functions = ( - self._feature_view_api.get_attached_transformation_fn(name, version) - ) - return transformation_functions - def create_training_dataset( self, feature_view_obj, diff --git a/python/hsfs/core/transformation_function_engine.py b/python/hsfs/core/transformation_function_engine.py index ec5de0810b..6bdbff13c9 100644 --- a/python/hsfs/core/transformation_function_engine.py +++ b/python/hsfs/core/transformation_function_engine.py @@ -147,21 +147,12 @@ def get_ready_to_use_transformation_fns( feature_view: feature_view.FeatureView, training_dataset_version: Optional[int] = None, ) -> List[transformation_function.TransformationFunction]: - # get attached transformation functions - transformation_functions = ( - feature_view._feature_view_engine.get_attached_transformation_fn( - feature_view.name, feature_view.version - ) - ) - - transformation_functions = ( - [transformation_functions] - if not isinstance(transformation_functions, list) - else transformation_functions - ) - + # check if transformation functions require statistics is_stat_required = any( - [tf.hopsworks_udf.statistics_required for tf in transformation_functions] + [ + tf.hopsworks_udf.statistics_required + for tf in feature_view.transformation_functions + ] ) if not is_stat_required: td_tffn_stats = None @@ -188,11 +179,11 @@ def get_ready_to_use_transformation_fns( ) if is_stat_required: - for transformation_function in transformation_functions: + for transformation_function in feature_view.transformation_functions: transformation_function.hopsworks_udf.transformation_statistics = ( td_tffn_stats.feature_descriptive_statistics ) - return feature_view._sort_transformation_functions(transformation_functions) + return feature_view.transformation_functions @staticmethod def compute_and_set_feature_statistics( diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 9d39d81e09..403cbb2522 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -107,7 +107,10 @@ def __init__( self._transformation_function_engine = ( tf_engine_mod.TransformationFunctionEngine(feature_store_id) ) - self._transformation_functions: List[ + self._model_dependent_transformation_functions: List[ + transformation_function.TransformationFunction + ] = [] + self._on_demand_transformation_functions: List[ transformation_function.TransformationFunction ] = [] self._sql_client = None @@ -183,13 +186,23 @@ def init_batch_scoring( def init_transformation( self, - entity: Union[feature_view.FeatureView], + entity: feature_view.FeatureView, ): # attach transformation functions - self._transformation_functions = tf_engine_mod.TransformationFunctionEngine.get_ready_to_use_transformation_fns( + self._model_dependent_transformation_functions = tf_engine_mod.TransformationFunctionEngine.get_ready_to_use_transformation_fns( entity, self._training_dataset_version, ) + self._on_demand_transformation_functions = [ + feature.on_demand_transformation_function + for feature in entity.features + if feature.on_demand_transformation_function + ] + self._on_demand_feature_names = [ + feature.name + for feature in entity.features + if feature.on_demand_transformation_function + ] def setup_sql_client( self, @@ -242,6 +255,7 @@ def get_feature_vector( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, + request_parameters: Optional[Dict[str, Any]] = None, ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[Any], Dict[str, Any]]: 
"""Assembles serving vector from online feature store.""" online_client_choice = self.which_client_and_ensure_initialised( @@ -273,8 +287,8 @@ def get_feature_vector( vector_db_result=vector_db_features or {}, allow_missing=allow_missing, client=online_client_choice, + request_parameters=request_parameters, ) - return self.handle_feature_vector_return_type( vector, batch=False, inference_helper=False, return_type=return_type ) @@ -287,6 +301,7 @@ def get_feature_vectors( ] = None, passed_features: Optional[List[Dict[str, Any]]] = None, vector_db_features: Optional[List[Dict[str, Any]]] = None, + request_parameters: Optional[List[Dict[str, Any]]] = None, allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, @@ -305,6 +320,12 @@ def get_feature_vectors( or len(vector_db_features) == 0 or len(vector_db_features) == len(entries) ), "Vector DB features should be None, empty or have the same length as the entries" + assert ( + request_parameters is None + or len(request_parameters) == 0 + or isinstance(request_parameters, dict) + or len(request_parameters) == len(entries) + ), "Request Parameters should be a Dictionary, None, empty or have the same length as the entries" online_client_choice = self.which_client_and_ensure_initialised( force_rest_client=force_rest_client, force_sql_client=force_sql_client @@ -347,14 +368,23 @@ def get_feature_vectors( skipped_empty_entries.pop(0) if len(skipped_empty_entries) > 0 else None ) vectors = [] + + # If request parameter is a dictionary then copy it to list with the same length as that of entires + request_parameters = ( + [request_parameters] * len(entries) + if isinstance(request_parameters, dict) + else request_parameters + ) for ( idx, passed_values, vector_db_result, + request_parameter, ) in itertools.zip_longest( range(len(entries)), passed_features or [], vector_db_features or [], + request_parameters or [], fillvalue=None, ): if next_skipped == idx: @@ -374,6 +404,7 @@ def get_feature_vectors( vector_db_result=vector_db_result, allow_missing=allow_missing, client=online_client_choice, + request_parameters=request_parameter, ) if vector is not None: @@ -390,6 +421,7 @@ def assemble_feature_vector( vector_db_result: Optional[Dict[str, Any]], allow_missing: bool, client: Literal["rest", "sql"], + request_parameters: Optional[Dict[str, Any]] = None, ) -> Optional[List[Any]]: """Assembles serving vector from online feature store.""" # Errors in batch requests are returned as None values @@ -404,9 +436,52 @@ def assemble_feature_vector( _logger.debug("Updating with passed features: %s", passed_values) result_dict.update(passed_values) - missing_features = set(self.feature_vector_col_name).difference( - result_dict.keys() + missing_features = ( + set(self.feature_vector_col_name) + .difference(result_dict.keys()) + .difference(self._on_demand_feature_names) ) + + # TODO : Optimize this + request_parameters = {} if not request_parameters else request_parameters + available_parameters = set((result_dict | request_parameters).keys()) + missing_request_parameters_features = {} + + for on_demand_feature, on_demand_transformation in zip( + self._on_demand_feature_names, self._on_demand_transformation_functions + ): + missing_request_parameter = ( + set(on_demand_transformation.hopsworks_udf.transformation_features) + - available_parameters + ) + if missing_request_parameter: + missing_request_parameters_features[on_demand_feature] = sorted( + list( + set( + 
on_demand_transformation.hopsworks_udf.transformation_features ) - available_parameters ) ) + + if missing_request_parameters_features: + error = "Missing request parameters to compute the following on-demand features:\n" + for ( + feature, + missing_request_parameter, + ) in missing_request_parameters_features.items(): + missing_request_parameter = "', '".join(missing_request_parameter) + error += f"On-demand feature '{feature}' requires features '{missing_request_parameter}'\n" + error += ( + "Possible reasons: " + "1. There is no match in the given entry." + " Please check if the entry exists in the online feature store" + " or provide the feature as passed_feature. " + f"2. Required entries [{', '.join(self.required_serving_keys)}] or " + f"[{', '.join(set(sk.feature_name for sk in self._serving_keys))}] are not provided." + ) + raise exceptions.FeatureStoreException(error) + # for backward compatibility, before 3.4, if result is empty, # instead of throwing error, it skips the result # Maybe we drop this behaviour for 4.0 @@ -426,8 +501,11 @@ def assemble_feature_vector( if len(self.return_feature_value_handlers) > 0: self.apply_return_value_handlers(result_dict, client=client) - if len(self.transformation_functions) > 0: - self.apply_transformation(result_dict) + if ( + len(self.model_dependent_transformation_functions) > 0 + or len(self.on_demand_transformation_functions) > 0 + ): + self.apply_transformation(result_dict, request_parameters) _logger.debug("Assembled and transformed dict feature vector: %s", result_dict) @@ -473,17 +551,19 @@ def handle_feature_vector_return_type( return pd.DataFrame([feature_vectorz]) elif batch: return pd.DataFrame( - feature_vectorz, columns=self._feature_vector_col_name + feature_vectorz, columns=self.transformed_feature_vector_col_name ) else: pandas_df = pd.DataFrame(feature_vectorz).transpose() - pandas_df.columns = self._feature_vector_col_name + pandas_df.columns = self.transformed_feature_vector_col_name return pandas_df elif return_type.lower() == "polars": _logger.debug("Returning feature vector as polars dataframe") return pl.DataFrame( feature_vectorz if batch else [feature_vectorz], - schema=self._feature_vector_col_name if not inference_helper else None, + schema=self.transformed_feature_vector_col_name + if not inference_helper + else None, orient="row", ) else: @@ -630,9 +710,24 @@ def _set_default_client( self.default_client = self.DEFAULT_SQL_CLIENT self._init_sql_client = True - def apply_transformation(self, row_dict: dict): - _logger.debug("Applying transformation functions.") - for tf in self.transformation_functions: + def apply_transformation(self, row_dict: dict, request_parameter: Dict[str, Any]): + _logger.debug("Applying On-Demand transformation functions.") + for tf in self._on_demand_transformation_functions: + # Use the feature from the request parameters if provided; otherwise fall back to the retrieved feature vector.
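The availability check that raises above, reduced to its core: an on-demand feature is computable only when every argument of its UDF appears either in the retrieved row or in the request parameters. Names are illustrative.

```python
udf_arguments = {"amount", "current_time"}  # tf.hopsworks_udf.transformation_features
result_dict = {"amount": 120.0}             # retrieved from the online store
request_parameters = {}                     # nothing supplied at request time

available = set(result_dict) | set(request_parameters)
missing = udf_arguments - available
print(sorted(missing))  # ['current_time'] -> FeatureStoreException at serving time
```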
+ features = [ + pd.Series(request_parameter[feature]) + if feature in request_parameter.keys() + else pd.Series(row_dict[feature]) + for feature in tf.hopsworks_udf.transformation_features + ] + on_demand_feature = tf.hopsworks_udf.get_udf(force_python_udf=True)( + *features + ) # Get only python compatible UDF irrespective of engine + + row_dict[on_demand_feature.name] = on_demand_feature.values[0] + + _logger.debug("Applying Model-Dependent transformation functions.") + for tf in self.model_dependent_transformation_functions: features = [ pd.Series(row_dict[feature]) for feature in tf.hopsworks_udf.transformation_features @@ -995,10 +1090,16 @@ def per_serving_key_features(self) -> Dict[str, set[str]]: return self._per_serving_key_features @property - def transformation_functions( + def model_dependent_transformation_functions( + self, + ) -> Optional[List[transformation_function.TransformationFunction]]: + return self._model_dependent_transformation_functions + + @property + def on_demand_transformation_functions( self, - ) -> Optional[List[transformation_functions.TransformationFunction]]: - return self._transformation_functions + ) -> Optional[List[transformation_function.TransformationFunction]]: + return self._on_demand_transformation_functions @property def return_feature_value_handlers(self) -> Dict[str, Callable]: @@ -1070,7 +1171,9 @@ def transformed_feature_vector_col_name(self): if self._transformed_feature_vector_col_name is None: transformation_features = [] output_column_names = [] - for transformation_function in self._transformation_functions: + for ( + transformation_function + ) in self._model_dependent_transformation_functions: transformation_features += ( transformation_function.hopsworks_udf.transformation_features ) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index cc50428632..fea3dd0301 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -804,6 +804,9 @@ def parse_schema_feature_group( self, dataframe: Union[pd.DataFrame, pl.DataFrame], time_travel_format: Optional[str] = None, + transformation_functions: Optional[ + List[transformation_function.TransformationFunction] + ] = None, ) -> List[feature.Feature]: if isinstance(dataframe, pd.DataFrame): arrow_schema = pa.Schema.from_pandas(dataframe, preserve_index=False) @@ -812,6 +815,19 @@ def parse_schema_feature_group( ): arrow_schema = dataframe.to_arrow().schema features = [] + transformed_features = [] + dropped_features = [] + + if transformation_functions: + for tf in transformation_functions: + transformed_features.append( + feature.Feature( + tf.hopsworks_udf.output_column_names[0], + tf.hopsworks_udf.return_types[0], + on_demand=True, + ) + ) + dropped_features.extend(tf.hopsworks_udf.dropped_features) for feat_name in arrow_schema.names: name = util.autofix_feature_name(feat_name) try: @@ -820,8 +836,10 @@ def parse_schema_feature_group( ) except ValueError as e: raise FeatureStoreException(f"Feature '{name}': {str(e)}") from e - features.append(feature.Feature(name, converted_type)) - return features + if name not in dropped_features: + features.append(feature.Feature(name, converted_type)) + + return features + transformed_features def parse_schema_training_dataset( self, dataframe: Union[pd.DataFrame, pl.DataFrame] @@ -842,6 +860,11 @@ def save_dataframe( online_write_options: Dict[str, Any], validation_id: Optional[int] = None, ) -> Optional[job.Job]: + if feature_group.transformation_functions: + dataframe = self._apply_transformation_function( 
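Note the ordering in `apply_transformation`: on-demand outputs are written into the row first, so the model-dependent pass can consume them. A minimal sketch of that two-phase flow, with an illustrative UDF body and statistics:

```python
import pandas as pd

row = {"amount": 120.0}
request_parameter = {"rate": 0.5}

# Phase 1: on-demand feature, preferring the request parameter over the row.
row["amount_eur"] = (pd.Series(row["amount"]) * request_parameter["rate"]).values[0]

# Phase 2: model-dependent scaling of the derived feature (stats are made up).
row["amount_eur"] = (row["amount_eur"] - 0.0) / (100.0 - 0.0)
print(row)  # {'amount': 120.0, 'amount_eur': 0.6}
```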
+ feature_group.transformation_functions, dataframe + ) + if ( isinstance(feature_group, ExternalFeatureGroup) and feature_group.online_enabled @@ -1319,7 +1342,7 @@ def _apply_transformation_function( # Raises `FeatureStoreException`: If any of the features mentioned in the transformation function is not present in the Feature View. """ - transformed_features = set() + dropped_features = set() if isinstance(dataset, pl.DataFrame) or isinstance( dataset, pl.dataframe.frame.DataFrame @@ -1342,7 +1365,7 @@ def _apply_transformation_function( f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly." ) - transformed_features.update(tf.hopsworks_udf.transformation_features) + dropped_features.update(tf.hopsworks_udf.dropped_features) dataset = pd.concat( [ dataset, @@ -1357,7 +1380,7 @@ def _apply_transformation_function( ], axis=1, ) - dataset = dataset.drop(transformed_features, axis=1) + dataset = dataset.drop(dropped_features, axis=1) return dataset @staticmethod @@ -1536,8 +1559,11 @@ def acked(err: Exception, msg: Any) -> None: elif not isinstance( feature_group, ExternalFeatureGroup ) and self._start_offline_materialization(offline_write_options): - if (not offline_write_options.get("skip_offsets", False) - and self._job_api.last_execution(feature_group.materialization_job)): # always skip offsets if executing job for the first time + if not offline_write_options.get( + "skip_offsets", False + ) and self._job_api.last_execution( + feature_group.materialization_job + ): # always skip offsets if executing job for the first time # don't provide the current offsets (read from where the job last left off) initial_check_point = "" # provide the initial_check_point as it will reduce the read amplification of materialization job diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index a22be38cc0..60f5f14854 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -351,6 +351,10 @@ def save_dataframe( validation_id=None, ): try: + if feature_group.transformation_functions: + dataframe = self._apply_transformation_function( + feature_group.transformation_functions, dataframe + ) if ( isinstance(feature_group, fg_mod.ExternalFeatureGroup) and feature_group.online_enabled @@ -395,6 +399,11 @@ def save_stream_dataframe( checkpoint_dir, write_options, ): + if feature_group.transformation_functions: + dataframe = self._apply_transformation_function( + feature_group.transformation_functions, dataframe + ) + write_options = self._get_kafka_config( feature_group.feature_store_id, write_options ) @@ -1115,8 +1124,29 @@ def read_options(self, data_format, provided_options): options.update(provided_options) return options - def parse_schema_feature_group(self, dataframe, time_travel_format=None): + def parse_schema_feature_group( + self, + dataframe, + time_travel_format=None, + transformation_functions: Optional[ + List[transformation_function.TransformationFunction] + ] = None, + ): features = [] + transformed_features = [] + dropped_features = [] + + if transformation_functions: + for tf in transformation_functions: + transformed_features.append( + feature.Feature( + tf.hopsworks_udf.output_column_names[0], + tf.hopsworks_udf.return_types[0], + on_demand=True, + ) + ) + dropped_features.extend(tf.hopsworks_udf.dropped_features) + using_hudi = time_travel_format == "HUDI" for feat in dataframe.schema: name = 
util.autofix_feature_name(feat.name) @@ -1126,12 +1156,13 @@ def parse_schema_feature_group(self, dataframe, time_travel_format=None): ) except ValueError as e: raise FeatureStoreException(f"Feature '{feat.name}': {str(e)}") from e - features.append( - feature.Feature( - name, converted_type, feat.metadata.get("description", None) + if name not in dropped_features: + features.append( + feature.Feature( + name, converted_type, feat.metadata.get("description", None) + ) ) - ) - return features + return features + transformed_features def parse_schema_training_dataset(self, dataframe): return [ @@ -1244,7 +1275,7 @@ def _apply_transformation_function( # Raises `FeatureStoreException`: If any of the features mentioned in the transformation function is not present in the Feature View. """ - transformed_features = set() + dropped_features = set() transformations = [] transformation_features = [] output_col_names = [] @@ -1260,7 +1291,7 @@ def _apply_transformation_function( f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly." ) - transformed_features.update(tf.hopsworks_udf.transformation_features) + dropped_features.update(tf.hopsworks_udf.dropped_features) pandas_udf = hopsworks_udf.get_udf() output_col_name = hopsworks_udf.output_column_names[0] @@ -1276,7 +1307,7 @@ def _apply_transformation_function( untransformed_columns = [] # Untransformed column maintained as a list since order is imported while selecting features. for column in dataset.columns: - if column not in transformed_features: + if column not in dropped_features: untransformed_columns.append(column) # Applying transformations transformed_dataset = dataset.select( diff --git a/python/hsfs/feature.py b/python/hsfs/feature.py index 89f19b060d..412929a75e 100644 --- a/python/hsfs/feature.py +++ b/python/hsfs/feature.py @@ -53,6 +53,7 @@ def __init__( "hsfs.feature_group.SpineGroup", ] ] = None, + on_demand: bool = False, **kwargs, ) -> None: self._name = util.autofix_feature_name(name) @@ -67,6 +68,7 @@ def __init__( self._feature_group_id = feature_group.id else: self._feature_group_id = feature_group_id + self._on_demand = on_demand def to_dict(self) -> Dict[str, Any]: """Get structured info about specific Feature in python dictionary format. 
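Both engines now derive the stored schema the same way: inputs listed in `dropped_features` are excluded, and each on-demand output is appended as a `Feature` flagged `on_demand=True`. The rule in isolation, with illustrative column names:

```python
input_columns = ["tx_id", "amount", "rate"]
dropped_features = ["amount"]                   # tf.hopsworks_udf.dropped_features
on_demand_outputs = [("amount_eur", "double")]  # (output name, return type)

schema = [name for name in input_columns if name not in dropped_features]
schema += [name for name, _ in on_demand_outputs]
print(schema)  # ['tx_id', 'rate', 'amount_eur']
```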
@@ -93,6 +95,7 @@ def to_dict(self) -> Dict[str, Any]: "onlineType": self._online_type, "defaultValue": self._default_value, "featureGroupId": self._feature_group_id, + "onDemand": self.on_demand, } def json(self) -> str: @@ -206,6 +209,15 @@ def default_value(self, default_value: Optional[str]) -> None: def feature_group_id(self) -> Optional[int]: return self._feature_group_id + @property + def on_demand(self) -> bool: + """Whether the feature is an on-demand feature computed using on-demand transformation functions""" + return self._on_demand + + @on_demand.setter + def on_demand(self, on_demand) -> None: + self._on_demand = on_demand + def __lt__(self, other: Any) -> "filter.Filter": return filter.Filter(self, filter.Filter.LT, other) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index de5577417c..0bbeb26552 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -73,8 +73,10 @@ from hsfs.embedding import EmbeddingIndex from hsfs.expectation_suite import ExpectationSuite from hsfs.ge_validation_result import ValidationResult +from hsfs.hopsworks_udf import HopsworksUdf, UDFType from hsfs.statistics import Statistics from hsfs.statistics_config import StatisticsConfig +from hsfs.transformation_function import TransformationFunction from hsfs.validation_report import ValidationReport @@ -543,8 +545,13 @@ def get_storage_connector(self): """ storage_connector_provenance = self.get_storage_connector_provenance() - if storage_connector_provenance.inaccessible or storage_connector_provenance.deleted: - _logger.info("The parent storage connector is deleted or inaccessible. For more details access `get_storage_connector_provenance`") + if ( + storage_connector_provenance.inaccessible + or storage_connector_provenance.deleted + ): + _logger.info( + "The parent storage connector is deleted or inaccessible. For more details access `get_storage_connector_provenance`" + ) if storage_connector_provenance.accessible: return storage_connector_provenance.accessible[0] @@ -2022,6 +2029,9 @@ def __init__( Union[Dict[str, Any], "deltastreamer_jobconf.DeltaStreamerJobConf"] ] = None, deprecated: bool = False, + transformation_functions: Optional[ + List[Union[TransformationFunction, HopsworksUdf]] + ] = None, **kwargs, ) -> None: super().__init__( @@ -2124,6 +2134,44 @@ def __init__( self._feature_writers: Optional[Dict[str, callable]] = None self._writer: Optional[callable] = None + # On-Demand Transformation Functions + self._transformation_functions: List[TransformationFunction] = ( + [ + TransformationFunction( + featurestore_id, + hopsworks_udf=transformation_function, + version=1, + transformation_type=UDFType.ON_DEMAND, + ) + if not isinstance(transformation_function, TransformationFunction) + else transformation_function + for transformation_function in transformation_functions + ] + if transformation_functions + else [] + ) + + if self._transformation_functions: + self._transformation_functions = ( + FeatureGroup._sort_transformation_functions( + self._transformation_functions + ) + ) + + @staticmethod + def _sort_transformation_functions( + transformation_functions: List[TransformationFunction], + ) -> List[TransformationFunction]: + """ + Function that sorts transformation functions in the order of the output column names. + The list of transformation functions is sorted based on the output column names to maintain consistent ordering. + # Arguments + transformation_functions: `List[TransformationFunction]`.
List of transformation functions to be sorted + # Returns + `List[TransformationFunction]`: List of transformation functions to be sorted + """ + return sorted(transformation_functions, key=lambda x: x.output_column_names[0]) + def read( self, wallclock_time: Optional[Union[str, int, datetime, date]] = None, @@ -3204,6 +3252,17 @@ def from_response_json( json_decamelized["embedding_index"] = EmbeddingIndex.from_response_json( json_decamelized["embedding_index"] ) + if "transformation_functions" in json_decamelized: + transformation_functions = json_decamelized["transformation_functions"] + json_decamelized["transformation_functions"] = [ + TransformationFunction.from_response_json( + { + **transformation_function, + "transformation_type": UDFType.ON_DEMAND, + } + ) + for transformation_function in transformation_functions + ] return cls(**json_decamelized) for fg in json_decamelized: if "type" in fg: @@ -3214,6 +3273,17 @@ def from_response_json( fg["embedding_index"] = EmbeddingIndex.from_response_json( fg["embedding_index"] ) + if "transformation_functions" in fg: + transformation_functions = fg["transformation_functions"] + fg["transformation_functions"] = [ + TransformationFunction.from_response_json( + { + **transformation_function, + "transformation_type": UDFType.ON_DEMAND, + } + ) + for transformation_function in transformation_functions + ] return [cls(**fg) for fg in json_decamelized] def update_from_response_json(self, json_dict: Dict[str, Any]) -> "FeatureGroup": @@ -3224,6 +3294,17 @@ def update_from_response_json(self, json_dict: Dict[str, Any]) -> "FeatureGroup" json_decamelized["embedding_index"] = EmbeddingIndex.from_response_json( json_decamelized["embedding_index"] ) + if "transformation_functions" in json_decamelized: + transformation_functions = json_decamelized["transformation_functions"] + json_decamelized["transformation_functions"] = [ + TransformationFunction.from_response_json( + { + **transformation_function, + "transformation_type": UDFType.ON_DEMAND, + } + ) + for transformation_function in transformation_functions + ] self.__init__(**json_decamelized) return self @@ -3270,6 +3351,7 @@ def to_dict(self) -> Dict[str, Any]: "topicName": self.topic_name, "notificationTopicName": self.notification_topic_name, "deprecated": self.deprecated, + "transformationFunctions": self._transformation_functions, } if self.embedding_index: fg_meta_dict["embeddingIndex"] = self.embedding_index.to_dict() @@ -3376,6 +3458,13 @@ def statistics(self) -> "Statistics": ) return super().statistics + @property + def transformation_functions( + self, + ) -> List[TransformationFunction]: + """Get transformation functions.""" + return self._transformation_functions + @description.setter def description(self, new_description: Optional[str]) -> None: self._description = new_description @@ -3402,6 +3491,13 @@ def stream(self, stream: bool) -> None: def parents(self, new_parents: "explicit_provenance.Links") -> None: self._parents = new_parents + @transformation_functions.setter + def transformation_functions( + self, + transformation_functions: List[TransformationFunction], + ) -> None: + self._transformation_functions = transformation_functions + @typechecked class ExternalFeatureGroup(FeatureGroupBase): diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 11eeac1983..4da096d80c 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -510,6 +510,9 @@ def create_feature_group( parents: Optional[List[feature_group.FeatureGroup]] = None, 
topic_name: Optional[str] = None, notification_topic_name: Optional[str] = None, + transformation_functions: Optional[ + List[Union[TransformationFunction, HopsworksUdf]] + ] = None, ) -> "feature_group.FeatureGroup": """Create a feature group metadata object. @@ -592,6 +595,7 @@ def create_feature_group( defaults to using project topic. notification_topic_name: Optionally, define the name of the topic used for sending notifications when entries are inserted or updated on the online feature store. If left undefined no notifications are sent. + transformation_functions: A list of Hopsworks UDF's. Defaults to `None`, no transformations. # Returns `FeatureGroup`. The feature group metadata object. @@ -616,6 +620,7 @@ def create_feature_group( parents=parents or [], topic_name=topic_name, notification_topic_name=notification_topic_name, + transformation_functions=transformation_functions, ) feature_group_object.feature_store = self return feature_group_object @@ -642,6 +647,9 @@ def get_or_create_feature_group( parents: Optional[List[feature_group.FeatureGroup]] = None, topic_name: Optional[str] = None, notification_topic_name: Optional[str] = None, + transformation_functions: Optional[ + List[Union[TransformationFunction, HopsworksUdf]] + ] = None, ) -> Union[ "feature_group.FeatureGroup", "feature_group.ExternalFeatureGroup", @@ -726,6 +734,7 @@ def get_or_create_feature_group( defaults to using project topic. notification_topic_name: Optionally, define the name of the topic used for sending notifications when entries are inserted or updated on the online feature store. If left undefined no notifications are sent. + transformation_functions: A list of Hopsworks UDF's. Defaults to `None`, no transformations. # Returns `FeatureGroup`. The feature group metadata object. @@ -759,6 +768,7 @@ def get_or_create_feature_group( parents=parents or [], topic_name=topic_name, notification_topic_name=notification_topic_name, + transformation_functions=transformation_functions, ) feature_group_object.feature_store = self return feature_group_object diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index 9ca317a473..f2f5019160 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -54,7 +54,7 @@ from hsfs.core.vector_db_client import VectorDbClient from hsfs.decorators import typechecked from hsfs.feature import Feature -from hsfs.hopsworks_udf import HopsworksUdf +from hsfs.hopsworks_udf import HopsworksUdf, UDFType from hsfs.statistics import Statistics from hsfs.statistics_config import StatisticsConfig from hsfs.training_dataset_split import TrainingDatasetSplit @@ -126,6 +126,7 @@ def __init__( self.featurestore_id, hopsworks_udf=transformation_function, version=1, + transformation_type=UDFType.MODEL_DEPENDENT, ) if not isinstance(transformation_function, TransformationFunction) else transformation_function @@ -493,6 +494,7 @@ def get_feature_vector( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, + request_parameters: Optional[Dict[str, Any]] = None, ) -> Union[List[Any], pd.DataFrame, np.ndarray, pl.DataFrame]: """Returns assembled feature vector from online feature store. Call [`feature_view.init_serving`](#init_serving) before this method if the following configurations are needed. @@ -566,6 +568,7 @@ def get_feature_vector( force_sql_client: boolean, defaults to False. If set to True, reads from online feature store using the SQL client if initialised. 
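Putting the feature-store changes together, attaching an on-demand transformation at creation time looks roughly like this; a sketch assuming a live Hopsworks connection where `fs` is a feature store handle and `amount_eur` is a hypothetical UDF:

```python
import pandas as pd
from hsfs.hopsworks_udf import udf

@udf(float, drop=["amount"])
def amount_eur(amount: pd.Series, rate: pd.Series) -> pd.Series:
    return amount * rate

fg = fs.create_feature_group(
    name="transactions",          # illustrative names throughout
    version=1,
    primary_key=["tx_id"],
    online_enabled=True,
    transformation_functions=[amount_eur],  # applied on every insert
)
```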
allow_missing: Setting to `True` returns feature vectors with missing values. + request_parameters: Request parameters required by on-demand transformation functions. # Returns `list`, `pd.DataFrame`, `polars.DataFrame` or `np.ndarray` if `return type` is set to `"list"`, `"pandas"`, `"polars"` or `"numpy"` @@ -591,6 +594,7 @@ def get_feature_vector( vector_db_features=vector_db_features, force_rest_client=force_rest_client, force_sql_client=force_sql_client, + request_parameters=request_parameters, ) def get_feature_vectors( @@ -602,6 +606,7 @@ def get_feature_vectors( allow_missing: bool = False, force_rest_client: bool = False, force_sql_client: bool = False, + request_parameters: Optional[List[Dict[str, Any]]] = None, ) -> Union[List[List[Any]], pd.DataFrame, np.ndarray, pl.DataFrame]: """Returns assembled feature vectors in batches from online feature store. Call [`feature_view.init_serving`](#init_serving) before this method if the following configurations are needed. @@ -700,6 +705,7 @@ def get_feature_vectors( vector_db_features=vector_db_features, force_rest_client=force_rest_client, force_sql_client=force_sql_client, + request_parameters=request_parameters, ) def get_inference_helper( @@ -853,6 +859,10 @@ def find_neighbors( the number of results returned may be less than k. Try using a large value of k and extract the top k items from the results if needed. + !!! warning "Duplicate column error in Polars" + If the feature view has duplicate column names, attempting to create a polars DataFrame + will raise an error. To avoid this, set `return_type` to `"list"` or `"pandas"`. + # Arguments embedding: The target embedding for which neighbors are to be found. feature: The feature used to compute similarity score. Required only if there @@ -1024,7 +1034,7 @@ def get_batch_data( start_time, end_time, self._batch_scoring_server.training_dataset_version, - self._batch_scoring_server._transformation_functions, + self._batch_scoring_server._model_dependent_transformation_functions, read_options, spine, primary_keys, @@ -3442,7 +3452,12 @@ def from_response_json(cls, json_dict: Dict[str, Any]) -> "FeatureView": featurestore_name=json_decamelized.get("featurestore_name", None), serving_keys=serving_keys, transformation_functions=[ - TransformationFunction.from_response_json(transformation_function) + TransformationFunction.from_response_json( + { + **transformation_function, + "transformation_type": UDFType.MODEL_DEPENDENT, + } + ) for transformation_function in transformation_functions ] if transformation_functions diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index b9f8bde5bb..0a005134a6 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -21,6 +21,7 @@ import warnings from dataclasses import dataclass from datetime import date, datetime, time +from enum import Enum from typing import Any, Callable, Dict, List, Optional, Tuple, Union import humps @@ -31,7 +32,14 @@ from hsfs.transformation_statistics import TransformationStatistics -def udf(return_type: Union[List[type], type]) -> "HopsworksUdf": +class UDFType(Enum): + MODEL_DEPENDENT = "model_dependent" + ON_DEMAND = "on_demand" + + +def udf( + return_type: Union[List[type], type], drop: Optional[Union[str, List[str]]] = None +) -> "HopsworksUdf": """ Create an User Defined Function that can be and used within the Hopsworks Feature Store. 
@@ -46,13 +54,14 @@ def udf(return_type: Union[List[type], type]) -> "HopsworksUdf": ```python from hopsworks import udf - @udf(float) + @udf(float) def add_one(data1 : pd.Series): return data1 + 1 ``` # Arguments return_type: `list`. The output types of the defined UDF + drop: `List[str]`. The features to be dropped after application of transformation functions # Returns `HopsworksUdf`: The metadata object for hopsworks UDF's. @@ -62,7 +71,7 @@ def add_one(data1 : pd.Series): """ def wrapper(func: Callable) -> HopsworksUdf: - udf = HopsworksUdf(func=func, return_types=return_type) + udf = HopsworksUdf(func=func, return_types=return_type, dropped_features=drop) return udf return wrapper @@ -127,11 +136,17 @@ def __init__( return_types: Union[List[type], type, List[str], str], name: Optional[str] = None, transformation_features: Optional[List[TransformationFeature]] = None, + dropped_features: Optional[List[str]] = None, + feature_name_prefix: Optional[str] = None, ): self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types( return_types ) + self._feature_name_prefix: Optional[str] = ( + feature_name_prefix # Prefix to be added to feature names + ) + self._function_name: str = func.__name__ if name is None else name self._function_source: str = ( @@ -152,9 +167,55 @@ def __init__( HopsworksUdf._format_source_code(self._function_source) ) + self._dropped_features: List[str] = ( + HopsworksUdf._validate_and_convert_drop_features( + dropped_features, self.transformation_features, feature_name_prefix + ) + ) + self._statistics: Optional[TransformationStatistics] = None - self._output_column_names: List[str] = self._get_output_column_names() + self._udf_type: UDFType = None + + self._output_column_names: List[str] = [] + + @staticmethod + def _validate_and_convert_drop_features( + dropped_features: Union[str, List[str]], + transformation_feature: List[str], + feature_name_prefix: str, + ) -> List[str]: + """ + Function that converts dropped features to a list and validates if the dropped feature is present in the transformation function + # Arguments + dropped_features: `Union[str, List[str]]`. Features of be dropped. + transformation_feature: `List[str]`. Features to be transformed in the UDF + # Returns + `List[str]`: A list of features to be dropped. + """ + if not dropped_features: + return [] + + dropped_features = ( + [dropped_features] + if not isinstance(dropped_features, list) + else dropped_features + ) + + feature_name_prefix = feature_name_prefix if feature_name_prefix else "" + + missing_drop_features = [] + for dropped_feature in dropped_features: + if feature_name_prefix + dropped_feature not in transformation_feature: + missing_drop_features.append(dropped_feature) + + if missing_drop_features: + missing_drop_features = "', '".join(missing_drop_features) + raise FeatureStoreException( + f"Cannot drop features '{missing_drop_features}' as they are not features given as arguments in the defined UDF." 
+ ) + + return dropped_features @staticmethod def _validate_and_convert_output_types( @@ -365,13 +426,18 @@ def _get_output_column_names(self) -> str: # Returns `List[str]`: List of feature names for the transformed columns """ - _BASE_COLUMN_NAME = ( - f'{self.function_name}_{"_".join(self.transformation_features)}_' - ) - if len(self.return_types) > 1: - return [f"{_BASE_COLUMN_NAME}{i}" for i in range(len(self.return_types))] - else: - return [f"{_BASE_COLUMN_NAME}"] + if self._udf_type == UDFType.MODEL_DEPENDENT: + _BASE_COLUMN_NAME = ( + f'{self.function_name}_{"-".join(self.transformation_features)}_' + ) + if len(self.return_types) > 1: + return [ + f"{_BASE_COLUMN_NAME}{i}" for i in range(len(self.return_types)) + ] + else: + return [f"{_BASE_COLUMN_NAME}"] + elif self._udf_type == UDFType.ON_DEMAND: + return [self.function_name] def _create_pandas_udf_return_schema_from_list(self) -> str: """ @@ -479,6 +545,13 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf": raise FeatureStoreException( f'Feature names provided must be string "{arg}" is not string' ) + transformation_feature_name = self.transformation_features + index_dropped_features = [ + transformation_feature_name.index(dropped_feature) + for dropped_feature in self.dropped_features + ] + updated_dropped_features = [features[index] for index in index_dropped_features] + # Create a copy of the UDF to associate it with new feature names. udf = copy.deepcopy(self) @@ -491,6 +564,7 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf": ) ] udf.output_column_names = udf._get_output_column_names() + udf.dropped_features = updated_dropped_features return udf def update_return_type_one_hot(self): @@ -541,10 +615,12 @@ def to_dict(self) -> Dict[str, Any]: "sourceCode": self._function_source, "outputTypes": self.return_types, "transformationFeatures": self.transformation_features, + "droppedFeatures": self.dropped_features, "statisticsArgumentNames": self._statistics_argument_names if self.statistics_required else None, "name": self._function_name, + "featureNamePrefix": self._feature_name_prefix, } def json(self) -> str: @@ -572,12 +648,17 @@ def from_response_json( json_decamelized = humps.decamelize(json_dict) function_source_code = json_decamelized["source_code"] function_name = json_decamelized["name"] + feature_name_prefix = json_decamelized.get("feature_name_prefix", None) output_types = [ output_type.strip() for output_type in json_decamelized["output_types"] ] transformation_features = [ feature.strip() for feature in json_decamelized["transformation_features"] ] + dropped_features = [ + dropped_feature.strip() + for dropped_feature in json_decamelized["dropped_features"] + ] statistics_features = ( [ feature.strip() @@ -590,10 +671,6 @@ def from_response_json( # Reconstructing statistics arguments. arg_list, _, _, _ = HopsworksUdf._parse_function_signature(function_source_code) - transformation_features = ( - arg_list if not transformation_features else transformation_features - ) - if statistics_features: transformation_features = [ TransformationFeature( @@ -615,11 +692,28 @@ def from_response_json( return_types=output_types, name=function_name, transformation_features=transformation_features, + dropped_features=dropped_features, + feature_name_prefix=feature_name_prefix, ) # Set transformation features if already set. 
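Two behaviours from this hunk, reduced to plain Python: drop-list validation (every dropped name must be a UDF argument) and the positional remapping of dropped features when a UDF is re-bound via `__call__`. Names are illustrative.

```python
# Validation: dropping a name that is not a UDF argument is rejected.
udf_arguments = ["col1", "col2"]
drop = ["col2", "col3"]
print([d for d in drop if d not in udf_arguments])  # ['col3'] -> FeatureStoreException

# Remapping: the UDF drops its second default argument, so re-binding it as
# udf("feature_a", "feature_b") drops "feature_b" instead.
new_features = ["feature_a", "feature_b"]
dropped_indices = [udf_arguments.index(d) for d in ["col2"]]
print([new_features[i] for i in dropped_indices])  # ['feature_b']
```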
return hopsworks_udf + def _validate_udf_type(self): + if self.udf_type is None: + raise FeatureStoreException("UDF Type cannot be None") + + if self._udf_type == UDFType.ON_DEMAND: + if len(self.return_types) > 1: + raise FeatureStoreException( + "On-Demand Transformation functions can only return one column as output" + ) + + if self.statistics_required: + raise FeatureStoreException( + "On-Demand Transformation functions cannot use statistics, please remove statistics parameters from the functions" + ) + @property def return_types(self) -> List[str]: """Get the output types of the UDF""" @@ -648,17 +742,30 @@ def transformation_statistics( @property def output_column_names(self) -> List[str]: """Output columns names of the transformation function""" - return self._output_column_names + if self._feature_name_prefix: + return [ + self._feature_name_prefix + output_col_name + for output_col_name in self._output_column_names + ] + else: + return self._output_column_names @property def transformation_features(self) -> List[str]: """ List of feature names to be used in the User Defined Function. """ - return [ - transformation_feature.feature_name - for transformation_feature in self._transformation_features - ] + if self._feature_name_prefix: + return [ + self._feature_name_prefix + transformation_feature.feature_name + for transformation_feature in self._transformation_features + ] + + else: + return [ + transformation_feature.feature_name + for transformation_feature in self._transformation_features + ] @property def statistics_features(self) -> List[str]: @@ -692,6 +799,33 @@ def _statistics_argument_names(self) -> List[str]: if transformation_feature.statistic_argument_name is not None ] + @property + def udf_type(self) -> UDFType: + """Type of the UDF : Can be \"model dependent\" or \"on-demand\" """ + return self._udf_type + + @udf_type.setter + def udf_type(self, udf_type: UDFType) -> None: + self._udf_type = udf_type + self._validate_udf_type() + self._output_column_names = self._get_output_column_names() + + @property + def dropped_features(self) -> List[str]: + if self._feature_name_prefix: + return [ + self._feature_name_prefix + dropped_feature + for dropped_feature in self._dropped_features + ] + else: + return self._dropped_features + + @dropped_features.setter + def dropped_features(self, features: List[str]) -> None: + self._dropped_features = HopsworksUdf._validate_and_convert_drop_features( + features, self.transformation_features, self._feature_name_prefix + ) + @transformation_statistics.setter def transformation_statistics( self, statistics: List[FeatureDescriptiveStatistics] @@ -713,3 +847,6 @@ def output_column_names(self, output_col_names: Union[str, List[str]]) -> None: ) else: self._output_column_names = output_col_names + + def __repr__(self): + return f'{self.function_name}({", ".join(self.transformation_features)})' diff --git a/python/hsfs/training_dataset_feature.py b/python/hsfs/training_dataset_feature.py index a06637abe2..3aa3f6a81f 100644 --- a/python/hsfs/training_dataset_feature.py +++ b/python/hsfs/training_dataset_feature.py @@ -15,10 +15,14 @@ # from __future__ import annotations +from typing import Optional + import humps from hsfs import feature as feature_mod from hsfs import feature_group as feature_group_mod from hsfs import util +from hsfs.hopsworks_udf import UDFType +from hsfs.transformation_function import TransformationFunction class TrainingDatasetFeature: @@ -32,6 +36,7 @@ def __init__( label=False, inference_helper_column=False, 
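`_validate_udf_type` enforces that an on-demand UDF returns exactly one column and uses no statistics. For instance, a multi-output UDF like the sketch below remains valid as a model-dependent transformation but would raise a `FeatureStoreException` if attached to a feature group as an on-demand one:

```python
import pandas as pd
from hsfs.hopsworks_udf import udf

@udf([int, float])  # two outputs: fine for model-dependent, rejected for on-demand
def two_outputs(col1: pd.Series) -> pd.DataFrame:
    return pd.DataFrame({"a": col1 + 1, "b": col1 * 0.5})
```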
training_helper_column=False, + transformation_function: Optional[TransformationFunction] = None, **kwargs, ): self._name = util.autofix_feature_name(name) @@ -47,6 +52,10 @@ def __init__( self._inference_helper_column = inference_helper_column self._training_helper_column = training_helper_column + + self._on_demand_transformation_function: Optional[TransformationFunction] = ( + transformation_function if transformation_function else None + ) + def to_dict(self): return { "name": self._name, @@ -57,11 +66,21 @@ def to_dict(self): "trainingHelperColumn": self._training_helper_column, "featureGroupFeatureName": self._feature_group_feature_name, "featuregroup": self._feature_group, + "transformation_function": self._on_demand_transformation_function, } @classmethod def from_response_json(cls, json_dict): json_decamelized = humps.decamelize(json_dict) + if json_decamelized.get("transformation_function", False): + json_decamelized["transformation_function"]["transformation_type"] = ( + UDFType.ON_DEMAND + ) + json_decamelized["transformation_function"] = ( + TransformationFunction.from_response_json( + json_decamelized.get("transformation_function") + ) + ) return cls(**json_decamelized) def is_complex(self): @@ -110,6 +129,11 @@ def inference_helper_column(self): def inference_helper_column(self, inference_helper_column): self._inference_helper_column = inference_helper_column + @property + def on_demand_transformation_function(self) -> TransformationFunction: + """On-demand transformation function that computes this feature, if any""" + return self._on_demand_transformation_function + @property def training_helper_column(self): """Indicator if it is feature.""" @@ -128,4 +152,4 @@ def feature_group_feature_name(self): return self._feature_group_feature_name def __repr__(self): - return f"Training Dataset Feature({self._name!r}, {self._type!r}, {self._index!r}, {self._label}, {self._feature_group_feature_name}, {self._feature_group.id!r})" + return f"Training Dataset Feature({self._name!r}, {self._type!r}, {self._index!r}, {self._label}, {self._feature_group_feature_name}, {self._feature_group.id!r}, {self.on_demand_transformation_function})" diff --git a/python/hsfs/transformation_function.py b/python/hsfs/transformation_function.py index a3f6a295d7..65535aa539 100644 --- a/python/hsfs/transformation_function.py +++ b/python/hsfs/transformation_function.py @@ -23,7 +23,7 @@ from hsfs.client.exceptions import FeatureStoreException from hsfs.core import transformation_function_engine from hsfs.decorators import typechecked -from hsfs.hopsworks_udf import HopsworksUdf +from hsfs.hopsworks_udf import HopsworksUdf, UDFType @typechecked @@ -44,6 +44,7 @@ def __init__( hopsworks_udf: HopsworksUdf, version: Optional[int] = None, id: Optional[int] = None, + transformation_type: Optional[UDFType] = None, type=None, items=None, count=None, @@ -65,6 +66,7 @@ def __init__( ) self._hopsworks_udf: HopsworksUdf = hopsworks_udf + self._hopsworks_udf.udf_type = transformation_type def save(self) -> None: """Save a transformation function into the backend.
@@ -233,3 +235,11 @@ def hopsworks_udf(self) -> HopsworksUdf: def output_column_names(self) -> List[str]: """Output column names of transformation functions""" return self._hopsworks_udf._output_column_names + + def __repr__(self): + if self.hopsworks_udf._udf_type == UDFType.MODEL_DEPENDENT: + return ( + f"Model-Dependent Transformation Function : {repr(self.hopsworks_udf)}" + ) + else: + return f"On-Demand Transformation Function : {repr(self.hopsworks_udf)}" diff --git a/python/tests/test_hopswork_udf.py b/python/tests/test_hopswork_udf.py index 8494d018f1..6595207ed3 100644 --- a/python/tests/test_hopswork_udf.py +++ b/python/tests/test_hopswork_udf.py @@ -19,7 +19,7 @@ import pandas as pd import pytest from hsfs.client.exceptions import FeatureStoreException -from hsfs.hopsworks_udf import HopsworksUdf, TransformationFeature, udf +from hsfs.hopsworks_udf import HopsworksUdf, TransformationFeature, UDFType, udf class TestHopsworksUdf: @@ -330,14 +330,21 @@ def test_generate_output_column_names_one_argument_one_output_type(self): def test_func(col1): return col1 + 1 + test_func.udf_type = UDFType.MODEL_DEPENDENT assert test_func._get_output_column_names() == ["test_func_col1_"] + test_func.udf_type = UDFType.ON_DEMAND + assert test_func._get_output_column_names() == ["test_func"] + def test_generate_output_column_names_multiple_argument_one_output_type(self): @udf(int) def test_func(col1, col2, col3): return col1 + 1 - assert test_func._get_output_column_names() == ["test_func_col1_col2_col3_"] + test_func.udf_type = UDFType.MODEL_DEPENDENT + assert test_func._get_output_column_names() == ["test_func_col1-col2-col3_"] + test_func.udf_type = UDFType.ON_DEMAND + assert test_func._get_output_column_names() == ["test_func"] def test_generate_output_column_names_single_argument_multiple_output_type(self): @udf([int, float, int]) @@ -346,6 +353,7 @@ def test_func(col1): {"col1": [col1 + 1], "col2": [col1 + 1], "col3": [col1 + 1]} ) + test_func.udf_type = UDFType.MODEL_DEPENDENT assert test_func._get_output_column_names() == [ "test_func_col1_0", "test_func_col1_1", @@ -359,10 +367,11 @@ def test_func(col1, col2, col3): {"col1": [col1 + 1], "col2": [col2 + 1], "col3": [col3 + 1]} ) + test_func.udf_type = UDFType.MODEL_DEPENDENT assert test_func._get_output_column_names() == [ - "test_func_col1_col2_col3_0", - "test_func_col1_col2_col3_1", - "test_func_col1_col2_col3_2", + "test_func_col1-col2-col3_0", + "test_func_col1-col2-col3_1", + "test_func_col1-col2-col3_2", ] def test_create_pandas_udf_return_schema_from_list_one_output_type(self): @@ -388,30 +397,45 @@ def test_func(col1): } ) + test_func.udf_type = UDFType.MODEL_DEPENDENT + assert ( test_func._create_pandas_udf_return_schema_from_list() == "`test_func_col1_0` bigint, `test_func_col1_1` double, `test_func_col1_2` string, `test_func_col1_3` date, `test_func_col1_4` timestamp, `test_func_col1_5` timestamp, `test_func_col1_6` boolean" ) def test_hopsworks_wrapper_single_output(self): + test_dataframe = pd.DataFrame({"col1": [1, 2, 3, 4]}) + @udf(int) def test_func(col1): return col1 + 1 - renaming_wrapper_function = test_func.hopsworksUdf_wrapper() + test_func.udf_type = UDFType.MODEL_DEPENDENT - test_dataframe = pd.DataFrame({"col1": [1, 2, 3, 4]}) + renaming_wrapper_function = test_func.hopsworksUdf_wrapper() result = renaming_wrapper_function(test_dataframe["col1"]) assert result.name == "test_func_col1_" assert result.values.tolist() == [2, 3, 4, 5] + test_func.udf_type = UDFType.ON_DEMAND + + renaming_wrapper_function = 
test_func.hopsworksUdf_wrapper() + + result = renaming_wrapper_function(test_dataframe["col1"]) + + assert result.name == "test_func" + assert result.values.tolist() == [2, 3, 4, 5] + def test_hopsworks_wrapper_multiple_output(self): @udf([int, float]) def test_func(col1, col2): return pd.DataFrame({"out1": col1 + 1, "out2": col2 + 2}) + test_func.udf_type = UDFType.MODEL_DEPENDENT + renaming_wrapper_function = test_func.hopsworksUdf_wrapper() test_dataframe = pd.DataFrame( @@ -422,7 +446,7 @@ def test_func(col1, col2): test_dataframe["column1"], test_dataframe["column2"] ) - assert all(result.columns == ["test_func_col1_col2_0", "test_func_col1_col2_1"]) + assert all(result.columns == ["test_func_col1-col2_0", "test_func_col1-col2_1"]) assert result.values.tolist() == [[2, 12], [3, 22], [4, 32], [5, 42]] def test_HopsworkUDf_call_one_argument(self):
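For reference, the naming scheme these tests pin down, re-implemented standalone: model-dependent outputs are prefixed with the function name plus its argument names joined by `-` (with an index suffix when there are multiple outputs), while on-demand outputs reuse the function name.

```python
def output_names(fn_name, features, n_outputs, on_demand):
    if on_demand:
        return [fn_name]
    base = f"{fn_name}_{'-'.join(features)}_"
    return [f"{base}{i}" for i in range(n_outputs)] if n_outputs > 1 else [base]

print(output_names("test_func", ["col1"], 1, False))          # ['test_func_col1_']
print(output_names("test_func", ["col1", "col2"], 2, False))  # ['test_func_col1-col2_0', 'test_func_col1-col2_1']
print(output_names("test_func", ["col1"], 1, True))           # ['test_func']
```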