Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tiankongdeguiji committed Dec 13, 2024
1 parent 4d25a8f commit 02010b9
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 59 deletions.
115 changes: 70 additions & 45 deletions docs/source/feature/data.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ sample_weight_fields: 'col_name'
- 暂不支持没有header的csv文件
- csv格式数据读性能有瓶颈
### fg_encoded
<!-- 输入数据是否为FG(Feature Generator)进行特征编码后的数据 -->
- 输入数据是否为FG(Feature Generator)进行特征编码后的数据
### fg_mode
- FG(Feature Generator) 的运行模式,支持FG_DAG, FG_NONE, FG_BUCKETIZE, FG_NORMAL
- FG是进入模型推理前的一层特征变换,可以保证离在线特征变换的一致性,特征变换包含Combo/Lookup/Match/Expr等类型,详见[特征](./feature.md)章节。以LookupFeature的一种配置为例,特征变换为从`cate_map`中用`cate_key`查出值后,用`boundaries`进行分箱再进入模型推理
Expand Down Expand Up @@ -134,57 +136,80 @@ sample_weight_fields: 'col_name'
}
```
- **fg_encoded=false:**
#### fg_mode=FG_DAG
- 训练时会在Dataset中执行FG,数据列名与各**特征的FG所依赖字段来源**同名,详见[特征](./feature.md),Dataset会自动分析所有特征依赖的字段来源来读取数据。
- 以上文LookupFeature为例,**特征FG所依赖字段来源**为`[cate_map, cate_key]`,Dataset会从输入表中读取名为`cate_map`和`cate_key`的列来做FG得到`lookup_feat`
- 该模式可以帮忙我们快速验证FG的训练效果,调优FG的配置,但由于训练时多了FG的过程,训练速度会受到一定程度的影响
- 训练时会在Dataset中执行FG,数据列名与各**特征的FG所依赖字段来源**同名,详见[特征](./feature.md),Dataset会自动分析所有特征依赖的字段来源来读取数据。
- 以上文LookupFeature为例,**特征FG所依赖字段来源**为`[cate_map, cate_key]`,Dataset会从输入表中读取名为`cate_map`和`cate_key`的列来做FG得到`lookup_feat`
- 该模式可以帮忙我们快速验证FG的训练效果,调优FG的配置,但由于训练时多了FG的过程,训练速度会受到一定程度的影响
- **fg_encoded=true:**
#### fg_mode=FG_NORMAL
- 则认为输入数据为编码后的数据,数据列名与**特征名**(`feature_name`)同名,Dataset会自动分析所有特征的特征名来读取数据
- 以上文LookupFeature为例,**特征名**为`lookup_feat`,Dataset会从输入表中直接读取编码后的`lookup_feat`列直接进行模型推理
- 该模式训练速度最佳,但需提前对数据提前进行FG编码,目前仅提供MaxCompute方式,步骤如下:
- 在DLC/DSW/Local环境中生成fg json配置,上传至Dataworks的资源中,如果fg_output_dir中有vocab_file等其他文件,也需要上传至资源中
```shell
python -m tzrec.tools.create_fg_json \
--pipeline_config_path ${PIPELINE_CONFIG_PATH} \
--fg_output_dir fg_output
--reserves ${COLS_YOU_WANT_RESERVE}
```
- --pipeline_config_path: 模型配置文件。
- --fg_output_dir: fg json的输出文件夹。
- --reserves: 需要透传到输出表的列,列名用逗号分隔。一般需要保留Label列,也可以保留request_id,user_id,item_id列,注意:如果模型的feature_config中有user_id,item_id作为特征,feature_name需避免与样本中的user_id,item_id列名冲突。
- 在[Dataworks](https://workbench.data.aliyun.com/)的独享资源组中安装pyfg,「资源组列表」- 在一个调度资源组的「操作」栏 点「运维助手」-「创建命令」(选手动输入)-「运行命令」
```shell
/home/tops/bin/pip3 install http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg037-0.3.7-cp37-cp37m-linux_x86_64.whl
```
- 在Dataworks中建立`PyODPS 3`节点运行FG,节点调度参数中配置好bizdate参数
```
from pyfg037 import offline_pyfg
offline_pyfg.run(
o,
input_table="YOU_PROJECT.TABLE_NAME",
output_table="YOU_PROJECT.TABLE_NAME",
fg_json_file="YOU_FG_FILE_NAME",
partition_value=args['bizdate']
)
```
- 训练时会在Dataset中执行FG,但不是以DAG方式运行。因此特征的输入中如果有`feature` side的输入,也需要在输入表中。目前更建议使用`FG_DAG`模式
| 参数 | 默认值 | 说明 |
| ------------------------- | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| input_table | 无 | 输入表 |
| output_table | 无 | 输出表,会自动创建 |
| fg_json_file | 无 | FG 配置文件,json 格式 |
| partition_value | 无 | 指定输入表的分区作为 FG 的输入,可支持多分区表,以逗号分隔 |
| force_delete_output_table | False | 是否删除输出表,设置为 True 时会先自动删除输出表, 再运行任务 |
| force_update_resource | False | 是否更新资源,设置为 True 时会先自动更新资源, 再运行任务 |
| set_sql | 无 | 任务执行的[flag](https://help.aliyun.com/zh/maxcompute/user-guide/flag-parameters?spm=a2c4g.11186623.0.0.383a20d9CQnpaR#concept-2278178),如set odps.stage.mapper.split.size=32;,注意需要以分号结尾 |
#### fg_mode=FG_NONE
- 训练时不会在Dataset中执行FG,输入数据为Fg编码后的数据,数据列名与**特征名**(`feature_name`)同名,Dataset会自动分析所有特征的特征名来读取数据
- 以上文LookupFeature为例,**特征名**为`lookup_feat`,Dataset会从输入表中直接读取编码后的`lookup_feat`列直接进行模型训练和推理
- 该模式训练速度最佳,但需提前对数据提前进行FG编码,目前仅提供MaxCompute方式,步骤如下:
- 在DLC/DSW/Local环境中生成fg json配置,上传至Dataworks的资源中,如果fg_output_dir中有vocab_file等其他文件,也需要上传至资源中
```shell
python -m tzrec.tools.create_fg_json \
--pipeline_config_path ${PIPELINE_CONFIG_PATH} \
--fg_output_dir fg_output \
--reserves ${COLS_YOU_WANT_RESERVE}
```
- --pipeline_config_path: 模型配置文件。
- --fg_output_dir: fg json的输出文件夹。
- --reserves: 需要透传到输出表的列,列名用逗号分隔。一般需要保留Label列,也可以保留request_id,user_id,item_id列,注意:如果模型的feature_config中有user_id,item_id作为特征,feature_name需避免与样本中的user_id,item_id列名冲突。
- 在[Dataworks](https://workbench.data.aliyun.com/)的独享资源组中安装pyfg,「资源组列表」- 在一个调度资源组的「操作」栏 点「运维助手」-「创建命令」(选手动输入)-「运行命令」
```shell
/home/tops/bin/pip3 install http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg039-0.3.9-cp37-cp37m-linux_x86_64.whl
```
- 在Dataworks中建立`PyODPS 3`节点运行FG,节点调度参数中配置好bizdate参数
```
from pyfg039 import offline_pyfg
offline_pyfg.run(
o,
input_table="YOU_PROJECT.TABLE_NAME",
output_table="YOU_PROJECT.TABLE_NAME",
fg_json_file="YOU_FG_FILE_NAME",
partition_value=args['bizdate']
)
```
| 参数 | 默认值 | 说明 |
| ------------------------- | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| input_table | 无 | 输入表 |
| output_table | 无 | 输出表,会自动创建 |
| fg_json_file | 无 | FG 配置文件,json 格式 |
| partition_value | 无 | 指定输入表的分区作为 FG 的输入,可支持多分区表,以逗号分隔 |
| force_delete_output_table | False | 是否删除输出表,设置为 True 时会先自动删除输出表, 再运行任务 |
| force_update_resource | False | 是否更新资源,设置为 True 时会先自动更新资源, 再运行任务 |
| set_sql | 无 | 任务执行的[flag](https://help.aliyun.com/zh/maxcompute/user-guide/flag-parameters?spm=a2c4g.11186623.0.0.383a20d9CQnpaR#concept-2278178),如set odps.stage.mapper.split.size=32;,注意需要以分号结尾 |
#### fg_mode=FG_BUCKETIZE
- 训练时在Dataset中执行FG的Bucketize部分,输入数据为Fg编码但未进行Bucketize的数据,Bucketize配置包含`hash_bucket_size`,`boundaries`,`vocab_dict`,`vocab_list`
- 数据列名与**特征名**(`feature_name`)同名,Dataset会自动分析所有特征的特征名来读取数据
- 以上文LookupFeature为例,**特征名**为`lookup_feat`,Dataset会从输入表中直接读取编码后的`lookup_feat`列直接进行模型训练和推理
- 该模式训练速度介于`FG_DAG`和`FG_NONE`之间,适用于在需要统计Bucketize前的数据分布来设置合适的Bucketize参数的情况,可以避免离线提前跑两次Fg编码
- 注意:在这种模式下,对数据提前进行FG编码时,使用的fg json配置不应该包含Bucketize配置,可以在`create_fg_json`时增加`--remove_bucketizer`参数来去除fg json配置中的Bucketize配置
```shell
python -m tzrec.tools.create_fg_json \
--pipeline_config_path ${PIPELINE_CONFIG_PATH} \
--fg_output_dir fg_output \
--reserves ${COLS_YOU_WANT_RESERVE} \
--remove_bucketizer
```

### fg_threads

- 每个dataloader worker上fg的运行线程数,默认为1,`nproc-per-node * num_workers * fg_threads`建议小于单机CPU核数
- 如果`fg_threads==0`,将不会以DAG模式运行fg,特征的输入中如果有`feature` side的输入,也需要在输入表中

### label_fields

Expand Down
2 changes: 1 addition & 1 deletion tzrec/datasets/data_parser_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class DataParserTest(unittest.TestCase):
def tearDown(self):
os.environ.pop("INPUT_TILE", None)

def test_fg_none(self):
def test_nofg(self):
feature_cfgs = [
feature_pb2.FeatureConfig(
id_feature=feature_pb2.IdFeature(
Expand Down
12 changes: 6 additions & 6 deletions tzrec/datasets/dataset_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def test_dataset(self):
data_config=data_pb2.DataConfig(
batch_size=4,
dataset_type=data_pb2.DatasetType.OdpsDataset,
fg_mode=data_pb2.FgModel.FG_NONE,
fg_mode=data_pb2.FgMode.FG_NONE,
label_fields=["label"],
),
features=features,
Expand Down Expand Up @@ -189,7 +189,7 @@ def test_dataset_with_sampler(self, force_base_data_group):
data_config=data_pb2.DataConfig(
batch_size=4,
dataset_type=data_pb2.DatasetType.OdpsDataset,
fg_mode=data_pb2.FgModel.FG_NONE,
fg_mode=data_pb2.FgMode.FG_NONE,
label_fields=["label"],
negative_sampler=sampler_pb2.NegativeSampler(
input_path=f.name,
Expand Down Expand Up @@ -284,7 +284,7 @@ def test_dataset_with_sample_mask(self):
data_config=data_pb2.DataConfig(
batch_size=32,
dataset_type=data_pb2.DatasetType.OdpsDataset,
fg_mode=data_pb2.FgModel.FG_NONE,
fg_mode=data_pb2.FgMode.FG_NONE,
label_fields=["label"],
sample_mask_prob=0.4,
),
Expand Down Expand Up @@ -348,7 +348,7 @@ def test_dataset_with_neg_sample_mask(self):
data_config=data_pb2.DataConfig(
batch_size=32,
dataset_type=data_pb2.DatasetType.OdpsDataset,
fg_mode=data_pb2.FgModel.FG_NONE,
fg_mode=data_pb2.FgMode.FG_NONE,
label_fields=["label"],
negative_sample_mask_prob=0.4,
negative_sampler=sampler_pb2.NegativeSampler(
Expand Down Expand Up @@ -408,7 +408,7 @@ def test_dataset_predict_mode(self, debug_level):
data_config=data_pb2.DataConfig(
batch_size=4,
dataset_type=data_pb2.DatasetType.OdpsDataset,
fg_mode=data_pb2.FgModel.FG_NONE,
fg_mode=data_pb2.FgMode.FG_NONE,
label_fields=[],
),
features=features,
Expand Down Expand Up @@ -501,7 +501,7 @@ def _childern(code):
data_config=data_pb2.DataConfig(
batch_size=4,
dataset_type=data_pb2.DatasetType.OdpsDataset,
fg_mode=data_pb2.FgModel.FG_NONE,
fg_mode=data_pb2.FgMode.FG_NONE,
label_fields=["label"],
tdm_sampler=sampler_pb2.TDMSampler(
item_input_path=node.name,
Expand Down
4 changes: 2 additions & 2 deletions tzrec/datasets/odps_dataset_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def test_odps_dataset(self, is_orderby_partition):
data_config=data_pb2.DataConfig(
batch_size=8196,
dataset_type=data_pb2.DatasetType.OdpsDataset,
fg_mode=data_pb2.FgMode.FG_DAG,
fg_mode=FgMode.FG_DAG,
label_fields=["label"],
is_orderby_partition=is_orderby_partition,
odps_data_quota_name="",
Expand Down Expand Up @@ -226,7 +226,7 @@ def test_odps_dataset_with_sampler(self):
data_config=data_pb2.DataConfig(
batch_size=8196,
dataset_type=data_pb2.DatasetType.OdpsDataset,
fg_mode=data_pb2.FgMode.FG_DAG,
fg_mode=FgMode.FG_DAG,
label_fields=["label"],
odps_data_quota_name="",
negative_sampler=sampler_pb2.NegativeSampler(
Expand Down
2 changes: 1 addition & 1 deletion tzrec/features/sequence_feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ def _parse(self, input_data: Dict[str, pa.Array]) -> ParsedData:
value_dim=self.config.value_dim,
**self._fg_encoded_kwargs,
)
elif self.fg_mode == FgMode.FG_NONE:
elif self.fg_mode == FgMode.FG_NORMAL:
input_feat = input_data[self.inputs[0]]
if pa.types.is_list(input_feat.type):
input_feat = input_feat.fill_null([])
Expand Down
9 changes: 7 additions & 2 deletions tzrec/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,9 +1035,14 @@ def create_predict_data(
pipeline_config = config_util.load_pipeline_config(
os.path.join(pipeline_config_path)
)
data_config = pipeline_config.data_config
assert data_config.fg_mode in [
FgMode.FG_NORMAL,
FgMode.FG_DAG,
], "You should not use fg encoded data for input_path."
features = create_features(
pipeline_config.feature_configs,
fg_mode=pipeline_config.data_config.fg_mode,
fg_mode=data_config.fg_mode,
)
user_inputs = []
for feature in features:
Expand All @@ -1048,7 +1053,7 @@ def create_predict_data(
reader = create_reader(
input_path=pipeline_config.train_input_path,
batch_size=batch_size,
quota_name=pipeline_config.data_config.odps_data_quota_name,
quota_name=data_config.odps_data_quota_name,
)

infer_arrow = OrderedDict()
Expand Down
15 changes: 15 additions & 0 deletions tzrec/tools/create_fg_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@


import argparse
import copy
import json
import os

Expand Down Expand Up @@ -60,6 +61,12 @@
default=True,
help="if true will update fg.json.",
)
parser.add_argument(
"--remove_bucketizer",
type=bool,
default=False,
help="remove bucktizer params in fg json.",
)
parser.add_argument(
"--debug",
action="store_true",
Expand All @@ -85,6 +92,14 @@
os.makedirs(args.fg_output_dir)

fg_json = create_fg_json(features, asset_dir=args.fg_output_dir)
if args.remove_bucketizer:
fg_json = copy.copy(fg_json)
for feature in fg_json["features"]:
feature.pop("hash_bucket_size")
feature.pop("vocab_dict")
feature.pop("vocab_list")
feature.pop("boundaries")

if args.reserves is not None:
reserves = []
for column in args.reserves.strip().split(","):
Expand Down
4 changes: 2 additions & 2 deletions tzrec/utils/config_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from google.protobuf import json_format, text_format
from google.protobuf.message import Message

from tzrec.protos import pipeline_pb2
from tzrec.protos import data_pb2, pipeline_pb2
from tzrec.protos.data_pb2 import FgMode
from tzrec.utils.logging_util import logger

Expand Down Expand Up @@ -75,7 +75,7 @@ def which_msg(config: Message, oneof_group: str) -> str:
return getattr(config, config.WhichOneof(oneof_group)).__class__.__name__


def _get_compatible_fg_mode(data_config: Message) -> FgMode:
def _get_compatible_fg_mode(data_config: data_pb2.DataConfig) -> FgMode:
"""Compat for fg_encoded."""
if data_config.HasField("fg_encoded"):
logger.warning(
Expand Down

0 comments on commit 02010b9

Please sign in to comment.