diff --git a/docs/source/feature/data.md b/docs/source/feature/data.md index 88d205d..cf6ad66 100644 --- a/docs/source/feature/data.md +++ b/docs/source/feature/data.md @@ -91,9 +91,11 @@ sample_weight_fields: 'col_name' - 暂不支持没有header的csv文件 - csv格式数据读性能有瓶颈 -### fg_encoded + -- 输入数据是否为FG(Feature Generator)进行特征编码后的数据 +### fg_mode + +- FG(Feature Generator) 的运行模式,支持FG_DAG, FG_NONE, FG_BUCKETIZE, FG_NORMAL - FG是进入模型推理前的一层特征变换,可以保证离在线特征变换的一致性,特征变换包含Combo/Lookup/Match/Expr等类型,详见[特征](./feature.md)章节。以LookupFeature的一种配置为例,特征变换为从`cate_map`中用`cate_key`查出值后,用`boundaries`进行分箱再进入模型推理 @@ -134,57 +136,80 @@ sample_weight_fields: 'col_name' } ``` -- **fg_encoded=false:** +#### fg_mode=FG_DAG - - 训练时会在Dataset中执行FG,数据列名与各**特征的FG所依赖字段来源**同名,详见[特征](./feature.md),Dataset会自动分析所有特征依赖的字段来源来读取数据。 - - 以上文LookupFeature为例,**特征FG所依赖字段来源**为`[cate_map, cate_key]`,Dataset会从输入表中读取名为`cate_map`和`cate_key`的列来做FG得到`lookup_feat` - - 该模式可以帮忙我们快速验证FG的训练效果,调优FG的配置,但由于训练时多了FG的过程,训练速度会受到一定程度的影响 +- 训练时会在Dataset中执行FG,数据列名与各**特征的FG所依赖字段来源**同名,详见[特征](./feature.md),Dataset会自动分析所有特征依赖的字段来源来读取数据。 + - 以上文LookupFeature为例,**特征FG所依赖字段来源**为`[cate_map, cate_key]`,Dataset会从输入表中读取名为`cate_map`和`cate_key`的列来做FG得到`lookup_feat` +- 该模式可以帮忙我们快速验证FG的训练效果,调优FG的配置,但由于训练时多了FG的过程,训练速度会受到一定程度的影响 -- **fg_encoded=true:** +#### fg_mode=FG_NORMAL - - 则认为输入数据为编码后的数据,数据列名与**特征名**(`feature_name`)同名,Dataset会自动分析所有特征的特征名来读取数据 - - 以上文LookupFeature为例,**特征名**为`lookup_feat`,Dataset会从输入表中直接读取编码后的`lookup_feat`列直接进行模型推理 - - 该模式训练速度最佳,但需提前对数据提前进行FG编码,目前仅提供MaxCompute方式,步骤如下: - - 在DLC/DSW/Local环境中生成fg json配置,上传至Dataworks的资源中,如果fg_output_dir中有vocab_file等其他文件,也需要上传至资源中 - ```shell - python -m tzrec.tools.create_fg_json \ - --pipeline_config_path ${PIPELINE_CONFIG_PATH} \ - --fg_output_dir fg_output - --reserves ${COLS_YOU_WANT_RESERVE} - ``` - - --pipeline_config_path: 模型配置文件。 - - --fg_output_dir: fg json的输出文件夹。 - - --reserves: 需要透传到输出表的列,列名用逗号分隔。一般需要保留Label列,也可以保留request_id,user_id,item_id列,注意:如果模型的feature_config中有user_id,item_id作为特征,feature_name需避免与样本中的user_id,item_id列名冲突。 - - 在[Dataworks](https://workbench.data.aliyun.com/)的独享资源组中安装pyfg,「资源组列表」- 在一个调度资源组的「操作」栏 点「运维助手」-「创建命令」(选手动输入)-「运行命令」 - ```shell - /home/tops/bin/pip3 install http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg037-0.3.7-cp37-cp37m-linux_x86_64.whl - ``` - - 在Dataworks中建立`PyODPS 3`节点运行FG,节点调度参数中配置好bizdate参数 - ``` - from pyfg037 import offline_pyfg - offline_pyfg.run( - o, - input_table="YOU_PROJECT.TABLE_NAME", - output_table="YOU_PROJECT.TABLE_NAME", - fg_json_file="YOU_FG_FILE_NAME", - partition_value=args['bizdate'] - ) - ``` +- 训练时会在Dataset中执行FG,但不是以DAG方式运行。因此特征的输入中如果有`feature` side的输入,也需要在输入表中。目前更建议使用`FG_DAG`模式 - | 参数 | 默认值 | 说明 | - | ------------------------- | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | - | input_table | 无 | 输入表 | - | output_table | 无 | 输出表,会自动创建 | - | fg_json_file | 无 | FG 配置文件,json 格式 | - | partition_value | 无 | 指定输入表的分区作为 FG 的输入,可支持多分区表,以逗号分隔 | - | force_delete_output_table | False | 是否删除输出表,设置为 True 时会先自动删除输出表, 再运行任务 | - | force_update_resource | False | 是否更新资源,设置为 True 时会先自动更新资源, 再运行任务 | - | set_sql | 无 | 任务执行的[flag](https://help.aliyun.com/zh/maxcompute/user-guide/flag-parameters?spm=a2c4g.11186623.0.0.383a20d9CQnpaR#concept-2278178),如set odps.stage.mapper.split.size=32;,注意需要以分号结尾 | +#### fg_mode=FG_NONE + +- 训练时不会在Dataset中执行FG,输入数据为Fg编码后的数据,数据列名与**特征名**(`feature_name`)同名,Dataset会自动分析所有特征的特征名来读取数据 + - 以上文LookupFeature为例,**特征名**为`lookup_feat`,Dataset会从输入表中直接读取编码后的`lookup_feat`列直接进行模型训练和推理 +- 该模式训练速度最佳,但需提前对数据提前进行FG编码,目前仅提供MaxCompute方式,步骤如下: + - 在DLC/DSW/Local环境中生成fg json配置,上传至Dataworks的资源中,如果fg_output_dir中有vocab_file等其他文件,也需要上传至资源中 + ```shell + python -m tzrec.tools.create_fg_json \ + --pipeline_config_path ${PIPELINE_CONFIG_PATH} \ + --fg_output_dir fg_output \ + --reserves ${COLS_YOU_WANT_RESERVE} + ``` + - --pipeline_config_path: 模型配置文件。 + - --fg_output_dir: fg json的输出文件夹。 + - --reserves: 需要透传到输出表的列,列名用逗号分隔。一般需要保留Label列,也可以保留request_id,user_id,item_id列,注意:如果模型的feature_config中有user_id,item_id作为特征,feature_name需避免与样本中的user_id,item_id列名冲突。 + - 在[Dataworks](https://workbench.data.aliyun.com/)的独享资源组中安装pyfg,「资源组列表」- 在一个调度资源组的「操作」栏 点「运维助手」-「创建命令」(选手动输入)-「运行命令」 + ```shell + /home/tops/bin/pip3 install http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg039-0.3.9-cp37-cp37m-linux_x86_64.whl + ``` + - 在Dataworks中建立`PyODPS 3`节点运行FG,节点调度参数中配置好bizdate参数 + ``` + from pyfg039 import offline_pyfg + offline_pyfg.run( + o, + input_table="YOU_PROJECT.TABLE_NAME", + output_table="YOU_PROJECT.TABLE_NAME", + fg_json_file="YOU_FG_FILE_NAME", + partition_value=args['bizdate'] + ) + ``` + +| 参数 | 默认值 | 说明 | +| ------------------------- | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| input_table | 无 | 输入表 | +| output_table | 无 | 输出表,会自动创建 | +| fg_json_file | 无 | FG 配置文件,json 格式 | +| partition_value | 无 | 指定输入表的分区作为 FG 的输入,可支持多分区表,以逗号分隔 | +| force_delete_output_table | False | 是否删除输出表,设置为 True 时会先自动删除输出表, 再运行任务 | +| force_update_resource | False | 是否更新资源,设置为 True 时会先自动更新资源, 再运行任务 | +| set_sql | 无 | 任务执行的[flag](https://help.aliyun.com/zh/maxcompute/user-guide/flag-parameters?spm=a2c4g.11186623.0.0.383a20d9CQnpaR#concept-2278178),如set odps.stage.mapper.split.size=32;,注意需要以分号结尾 | + +#### fg_mode=FG_BUCKETIZE + +- 训练时在Dataset中执行FG的Bucketize部分,输入数据为Fg编码但未进行Bucketize的数据,Bucketize配置包含`hash_bucket_size`,`boundaries`,`vocab_dict`,`vocab_list` + + - 数据列名与**特征名**(`feature_name`)同名,Dataset会自动分析所有特征的特征名来读取数据 + - 以上文LookupFeature为例,**特征名**为`lookup_feat`,Dataset会从输入表中直接读取编码后的`lookup_feat`列直接进行模型训练和推理 + +- 该模式训练速度介于`FG_DAG`和`FG_NONE`之间,适用于在需要统计Bucketize前的数据分布来设置合适的Bucketize参数的情况,可以避免离线提前跑两次Fg编码 + +- 注意:在这种模式下,对数据提前进行FG编码时,使用的fg json配置不应该包含Bucketize配置,可以在`create_fg_json`时增加`--remove_bucketizer`参数来去除fg json配置中的Bucketize配置 + + ```shell + python -m tzrec.tools.create_fg_json \ + --pipeline_config_path ${PIPELINE_CONFIG_PATH} \ + --fg_output_dir fg_output \ + --reserves ${COLS_YOU_WANT_RESERVE} \ + --remove_bucketizer + + ``` ### fg_threads - 每个dataloader worker上fg的运行线程数,默认为1,`nproc-per-node * num_workers * fg_threads`建议小于单机CPU核数 -- 如果`fg_threads==0`,将不会以DAG模式运行fg,特征的输入中如果有`feature` side的输入,也需要在输入表中 ### label_fields diff --git a/tzrec/datasets/data_parser_test.py b/tzrec/datasets/data_parser_test.py index a91863c..c05838a 100644 --- a/tzrec/datasets/data_parser_test.py +++ b/tzrec/datasets/data_parser_test.py @@ -33,7 +33,7 @@ class DataParserTest(unittest.TestCase): def tearDown(self): os.environ.pop("INPUT_TILE", None) - def test_fg_none(self): + def test_nofg(self): feature_cfgs = [ feature_pb2.FeatureConfig( id_feature=feature_pb2.IdFeature( diff --git a/tzrec/datasets/dataset_test.py b/tzrec/datasets/dataset_test.py index f9ebc06..6def303 100644 --- a/tzrec/datasets/dataset_test.py +++ b/tzrec/datasets/dataset_test.py @@ -122,7 +122,7 @@ def test_dataset(self): data_config=data_pb2.DataConfig( batch_size=4, dataset_type=data_pb2.DatasetType.OdpsDataset, - fg_mode=data_pb2.FgModel.FG_NONE, + fg_mode=data_pb2.FgMode.FG_NONE, label_fields=["label"], ), features=features, @@ -189,7 +189,7 @@ def test_dataset_with_sampler(self, force_base_data_group): data_config=data_pb2.DataConfig( batch_size=4, dataset_type=data_pb2.DatasetType.OdpsDataset, - fg_mode=data_pb2.FgModel.FG_NONE, + fg_mode=data_pb2.FgMode.FG_NONE, label_fields=["label"], negative_sampler=sampler_pb2.NegativeSampler( input_path=f.name, @@ -284,7 +284,7 @@ def test_dataset_with_sample_mask(self): data_config=data_pb2.DataConfig( batch_size=32, dataset_type=data_pb2.DatasetType.OdpsDataset, - fg_mode=data_pb2.FgModel.FG_NONE, + fg_mode=data_pb2.FgMode.FG_NONE, label_fields=["label"], sample_mask_prob=0.4, ), @@ -348,7 +348,7 @@ def test_dataset_with_neg_sample_mask(self): data_config=data_pb2.DataConfig( batch_size=32, dataset_type=data_pb2.DatasetType.OdpsDataset, - fg_mode=data_pb2.FgModel.FG_NONE, + fg_mode=data_pb2.FgMode.FG_NONE, label_fields=["label"], negative_sample_mask_prob=0.4, negative_sampler=sampler_pb2.NegativeSampler( @@ -408,7 +408,7 @@ def test_dataset_predict_mode(self, debug_level): data_config=data_pb2.DataConfig( batch_size=4, dataset_type=data_pb2.DatasetType.OdpsDataset, - fg_mode=data_pb2.FgModel.FG_NONE, + fg_mode=data_pb2.FgMode.FG_NONE, label_fields=[], ), features=features, @@ -501,7 +501,7 @@ def _childern(code): data_config=data_pb2.DataConfig( batch_size=4, dataset_type=data_pb2.DatasetType.OdpsDataset, - fg_mode=data_pb2.FgModel.FG_NONE, + fg_mode=data_pb2.FgMode.FG_NONE, label_fields=["label"], tdm_sampler=sampler_pb2.TDMSampler( item_input_path=node.name, diff --git a/tzrec/datasets/odps_dataset_test.py b/tzrec/datasets/odps_dataset_test.py index f6cea1f..69caf3b 100644 --- a/tzrec/datasets/odps_dataset_test.py +++ b/tzrec/datasets/odps_dataset_test.py @@ -160,7 +160,7 @@ def test_odps_dataset(self, is_orderby_partition): data_config=data_pb2.DataConfig( batch_size=8196, dataset_type=data_pb2.DatasetType.OdpsDataset, - fg_mode=data_pb2.FgMode.FG_DAG, + fg_mode=FgMode.FG_DAG, label_fields=["label"], is_orderby_partition=is_orderby_partition, odps_data_quota_name="", @@ -226,7 +226,7 @@ def test_odps_dataset_with_sampler(self): data_config=data_pb2.DataConfig( batch_size=8196, dataset_type=data_pb2.DatasetType.OdpsDataset, - fg_mode=data_pb2.FgMode.FG_DAG, + fg_mode=FgMode.FG_DAG, label_fields=["label"], odps_data_quota_name="", negative_sampler=sampler_pb2.NegativeSampler( diff --git a/tzrec/features/sequence_feature.py b/tzrec/features/sequence_feature.py index d59a892..18ac53c 100644 --- a/tzrec/features/sequence_feature.py +++ b/tzrec/features/sequence_feature.py @@ -431,7 +431,7 @@ def _parse(self, input_data: Dict[str, pa.Array]) -> ParsedData: value_dim=self.config.value_dim, **self._fg_encoded_kwargs, ) - elif self.fg_mode == FgMode.FG_NONE: + elif self.fg_mode == FgMode.FG_NORMAL: input_feat = input_data[self.inputs[0]] if pa.types.is_list(input_feat.type): input_feat = input_feat.fill_null([]) diff --git a/tzrec/tests/utils.py b/tzrec/tests/utils.py index 791f35c..8cde1b7 100644 --- a/tzrec/tests/utils.py +++ b/tzrec/tests/utils.py @@ -1035,9 +1035,14 @@ def create_predict_data( pipeline_config = config_util.load_pipeline_config( os.path.join(pipeline_config_path) ) + data_config = pipeline_config.data_config + assert data_config.fg_mode in [ + FgMode.FG_NORMAL, + FgMode.FG_DAG, + ], "You should not use fg encoded data for input_path." features = create_features( pipeline_config.feature_configs, - fg_mode=pipeline_config.data_config.fg_mode, + fg_mode=data_config.fg_mode, ) user_inputs = [] for feature in features: @@ -1048,7 +1053,7 @@ def create_predict_data( reader = create_reader( input_path=pipeline_config.train_input_path, batch_size=batch_size, - quota_name=pipeline_config.data_config.odps_data_quota_name, + quota_name=data_config.odps_data_quota_name, ) infer_arrow = OrderedDict() diff --git a/tzrec/tools/create_fg_json.py b/tzrec/tools/create_fg_json.py index c7da4f7..b543925 100644 --- a/tzrec/tools/create_fg_json.py +++ b/tzrec/tools/create_fg_json.py @@ -11,6 +11,7 @@ import argparse +import copy import json import os @@ -60,6 +61,12 @@ default=True, help="if true will update fg.json.", ) + parser.add_argument( + "--remove_bucketizer", + type=bool, + default=False, + help="remove bucktizer params in fg json.", + ) parser.add_argument( "--debug", action="store_true", @@ -85,6 +92,14 @@ os.makedirs(args.fg_output_dir) fg_json = create_fg_json(features, asset_dir=args.fg_output_dir) + if args.remove_bucketizer: + fg_json = copy.copy(fg_json) + for feature in fg_json["features"]: + feature.pop("hash_bucket_size") + feature.pop("vocab_dict") + feature.pop("vocab_list") + feature.pop("boundaries") + if args.reserves is not None: reserves = [] for column in args.reserves.strip().split(","): diff --git a/tzrec/utils/config_util.py b/tzrec/utils/config_util.py index 96c2dec..608f12c 100644 --- a/tzrec/utils/config_util.py +++ b/tzrec/utils/config_util.py @@ -17,7 +17,7 @@ from google.protobuf import json_format, text_format from google.protobuf.message import Message -from tzrec.protos import pipeline_pb2 +from tzrec.protos import data_pb2, pipeline_pb2 from tzrec.protos.data_pb2 import FgMode from tzrec.utils.logging_util import logger @@ -75,7 +75,7 @@ def which_msg(config: Message, oneof_group: str) -> str: return getattr(config, config.WhichOneof(oneof_group)).__class__.__name__ -def _get_compatible_fg_mode(data_config: Message) -> FgMode: +def _get_compatible_fg_mode(data_config: data_pb2.DataConfig) -> FgMode: """Compat for fg_encoded.""" if data_config.HasField("fg_encoded"): logger.warning(