diff --git a/docs/source/quick_start/local_tutorial_tdm.md b/docs/source/quick_start/local_tutorial_tdm.md
index 037bf5d..3dc8cd7 100644
--- a/docs/source/quick_start/local_tutorial_tdm.md
+++ b/docs/source/quick_start/local_tutorial_tdm.md
@@ -42,7 +42,7 @@ python -m tzrec.tools.tdm.init_tree \
 --item_id_field adgroup_id \
 --cate_id_field cate_id \
 --attr_fields cate_id,campaign_id,customer,brand,price \
---node_edge_output_file data/init_tree
+--node_edge_output_file data/init_tree \
 --tree_output_dir data/init_tree
 ```
 
@@ -81,7 +81,7 @@ torchrun --master_addr=localhost --master_port=32555 \
 --nnodes=1 --nproc-per-node=8 --node_rank=0 \
 -m tzrec.export \
 --pipeline_config_path experiments/tdm_taobao_local/pipeline.config \
- --export_dir experiments/tdm_taobao_local/export
+ --export_dir experiments/tdm_taobao_local/export \
 --asset_files data/init_tree/serving_tree
 ```
 
@@ -156,7 +156,7 @@ torchrun --master_addr=localhost --master_port=32555 \
 --nnodes=1 --nproc-per-node=8 --node_rank=0 \
 -m tzrec.export \
 --pipeline_config_path experiments/tdm_taobao_local_learnt/pipeline.config \
- --export_dir experiments/tdm_taobao_local_learnt/export
+ --export_dir experiments/tdm_taobao_local_learnt/export \
 --asset_files data/learnt_tree/serving_tree
 ```
 
diff --git a/tzrec/datasets/dataset.py b/tzrec/datasets/dataset.py
index 11d1f8e..47b192b 100644
--- a/tzrec/datasets/dataset.py
+++ b/tzrec/datasets/dataset.py
@@ -95,79 +95,30 @@ def _expand_tdm_sample(
     expand label: [1, 1, 1, 1, 1, 1, 0, 0, 0, 0]
     """
-    sampler_type = data_config.WhichOneof("sampler")
-    sampler_config = getattr(data_config, sampler_type)
-    tree_level = len(sampler_config.layer_num_sample)
-    num_all_layer_neg = sum(sampler_config.layer_num_sample)
-    layer_num_sample = sampler_config.layer_num_sample
-
     item_fea_names = pos_sampled.keys()
     all_fea_names = input_data.keys()
     label_fields = set(data_config.label_fields)
     user_fea_names = all_fea_names - item_fea_names - label_fields
-    remain_ratio = sampler_config.remain_ratio
-    if remain_ratio < 1.0:
-        probability_type = sampler_config.probabilty_type
-        if probability_type == "UNIFORM":
-            p = np.array([1 / (tree_level - 1)] * (tree_level - 1))
-        elif probability_type == "ARITHMETIC":
-            p = np.arange(1, tree_level) / sum(np.arange(1, tree_level))
-        elif probability_type == "RECIPROCAL":
-            p = 1 / np.arange(tree_level - 1, 0, -1)
-            p = p / sum(p)
-        else:
-            raise ValueError(
-                f"probability_type: [{probability_type}]" "is not supported now."
-            )
-        remain_layer = np.random.choice(
-            range(tree_level - 1),
-            int(remain_ratio * (tree_level - 1)),
-            replace=False,
-            p=p,
-        )
-        remain_layer.sort()
-    else:
-        remain_layer = list(range(tree_level - 1))
-
-    num_remain_layer_neg = (
-        sum([layer_num_sample[i] for i in remain_layer]) + layer_num_sample[-1]
-    )
 
     for item_fea_name in item_fea_names:
-        batch_size = len(input_data[item_fea_name])
-        pos_index = (
-            (remain_layer[None, :] + (tree_level - 1) * np.arange(batch_size)[:, None])
-            .flatten()
-            .astype(np.int64)
-        )
-        neg_index = (
-            (
-                np.concatenate(
-                    [
-                        range(sum(layer_num_sample[:i]), sum(layer_num_sample[: i + 1]))
-                        for i in np.append(remain_layer, tree_level - 1)
-                    ]
-                )[None, :]
-                + num_all_layer_neg * np.arange(batch_size)[:, None]
-            )
-            .flatten()
-            .astype(np.int64)
-        )
         input_data[item_fea_name] = pa.concat_arrays(
             [
                 input_data[item_fea_name],
-                pos_sampled[item_fea_name].take(pos_index),
-                neg_sampled[item_fea_name].take(neg_index),
+                pos_sampled[item_fea_name],
+                neg_sampled[item_fea_name],
             ]
         )
 
     # In the sampling results, the sampled outcomes for each item are contiguous.
+    batch_size = len(input_data[list(label_fields)[0]])
+    num_pos_sampled = len(pos_sampled[list(item_fea_names)[0]])
+    num_neg_sampled = len(neg_sampled[list(item_fea_names)[0]])
+    user_pos_index = np.repeat(np.arange(batch_size), num_pos_sampled // batch_size)
+    user_neg_index = np.repeat(np.arange(batch_size), num_neg_sampled // batch_size)
     for user_fea_name in user_fea_names:
         user_fea = input_data[user_fea_name]
-        pos_index = np.repeat(np.arange(len(user_fea)), len(remain_layer))
-        neg_index = np.repeat(np.arange(len(user_fea)), num_remain_layer_neg)
-        pos_expand_user_fea = user_fea.take(pos_index)
-        neg_expand_user_fea = user_fea.take(neg_index)
+        pos_expand_user_fea = user_fea.take(user_pos_index)
+        neg_expand_user_fea = user_fea.take(user_neg_index)
         input_data[user_fea_name] = pa.concat_arrays(
             [
                 input_data[user_fea_name],
@@ -180,14 +131,8 @@ def _expand_tdm_sample(
         input_data[label_field] = pa.concat_arrays(
             [
                 input_data[label_field].cast(pa.int64()),
-                pa.array(
-                    [1] * (len(input_data[label_field]) * len(remain_layer)),
-                    type=pa.int64(),
-                ),
-                pa.array(
-                    [0] * (len(input_data[label_field]) * num_remain_layer_neg),
-                    type=pa.int64(),
-                ),
+                pa.array([1] * num_pos_sampled, type=pa.int64()),
+                pa.array([0] * num_neg_sampled, type=pa.int64()),
             ]
         )
 
diff --git a/tzrec/datasets/dataset_test.py b/tzrec/datasets/dataset_test.py
index e7eb611..e9bd06e 100644
--- a/tzrec/datasets/dataset_test.py
+++ b/tzrec/datasets/dataset_test.py
@@ -432,13 +432,15 @@ def test_dataset_with_tdm_sampler_and_remain_ratio(self):
         self._temp_files.append(node)
         node.write("id:int64\tweight:float\tattrs:string\n")
         for i in range(63):
-            node.write(f"{i}\t{1}\t{int(math.log(i+1,2))}:{i}:{i+1000}:我们{i}\n")
+            node.write(f"{i}\t{1}\t{int(math.log(i+1,2))}:{i}:{i+1000}:{i*2}\n")
         node.flush()
 
-        def _ancesstor(code):
+        def _ancestor(code):
             ancs = []
-            while code > 0:
+            while True:
                 code = int((code - 1) / 2)
+                if code <= 0:
+                    break
                 ancs.append(code)
             return ancs
 
@@ -446,7 +448,7 @@ def _ancesstor(code):
         self._temp_files.append(edge)
         edge.write("src_id:int64\tdst_id:int\tweight:float\n")
         for i in range(31, 63):
-            for anc in _ancesstor(i):
+            for anc in _ancestor(i):
                 edge.write(f"{i}\t{anc}\t{1.0}\n")
         edge.flush()
 
@@ -488,9 +490,7 @@ def _childern(code):
                 raw_feature=feature_pb2.RawFeature(feature_name="float_d")
             ),
         ]
-        features = create_features(
-            feature_cfgs, neg_fields=["int_a", "float_b", "str_c"]
-        )
+        features = create_features(feature_cfgs)
 
         dataset = _TestDataset(
             data_config=data_pb2.DataConfig(
@@ -498,13 +498,13 @@ def _childern(code):
                 dataset_type=data_pb2.DatasetType.OdpsDataset,
                 fg_encoded=True,
                 label_fields=["label"],
-                negative_sampler=sampler_pb2.TDMSampler(
-                    input_input_path=node.name,
+                tdm_sampler=sampler_pb2.TDMSampler(
+                    item_input_path=node.name,
                     edge_input_path=edge.name,
                     predict_edge_input_path=predict_edge.name,
                     attr_fields=["tree_level", "int_a", "float_b", "str_c"],
                     item_id_field="int_a",
-                    layer_num_sample=[1, 1, 1, 1, 1, 5],
+                    layer_num_sample=[0, 1, 1, 1, 1, 5],
                     field_delimiter=",",
                     remain_ratio=0.4,
                     probability_type="UNIFORM",
diff --git a/tzrec/datasets/sampler.py b/tzrec/datasets/sampler.py
index c41a199..4ca7092 100644
--- a/tzrec/datasets/sampler.py
+++ b/tzrec/datasets/sampler.py
@@ -695,21 +695,40 @@ def __init__(
         self._item_id_field = config.item_id_field
         self._max_level = len(config.layer_num_sample)
         self._layer_num_sample = config.layer_num_sample
+        assert self._layer_num_sample[0] == 0, "sample num of tree root must be 0"
         self._last_layer_num_sample = config.layer_num_sample[-1]
         self._pos_sampler = None
         self._neg_sampler_list = []
 
+        self._remain_ratio = config.remain_ratio
+        if self._remain_ratio < 1.0:
+            if config.probability_type == "UNIFORM":
+                p = np.array([1 / (self._max_level - 2)] * (self._max_level - 2))
+            elif config.probability_type == "ARITHMETIC":
+                p = np.arange(1, self._max_level - 1) / sum(
+                    np.arange(1, self._max_level - 1)
+                )
+            elif config.probability_type == "RECIPROCAL":
+                p = 1 / np.arange(self._max_level - 2, 0, -1)
+                p = p / sum(p)
+            else:
+                raise ValueError(
+                    f"probability_type: [{config.probability_type}]"
+                    " is not supported now."
+                )
+            self._remain_p = p
+
     def init(self) -> None:
         """Init sampler client and samplers."""
         super().init()
         self._pos_sampler = self._g.neighbor_sampler(
             meta_path=["ancestor"],
-            expand_factor=self._max_level - 1,
+            expand_factor=self._max_level - 2,
             strategy="random_without_replacement",
         )
         # TODO: only use one conditional smapler
-        for i in range(self._max_level):
+        for i in range(1, self._max_level):
             self._neg_sampler_list.append(
                 self._g.negative_sampler(
                     "item",
@@ -745,6 +764,8 @@ def get(self, input_data: Dict[str, pa.Array]) -> Dict[str, pa.Array]:
             .to_numpy()
             .reshape(-1, 1)
         )
+        batch_size = len(ids)
+        num_fea = len(self._attr_names[1:])
 
         # positive node.
         pos_nodes = self._pos_sampler.get(ids).layer_nodes(1)
@@ -752,42 +773,70 @@ def get(self, input_data: Dict[str, pa.Array]) -> Dict[str, pa.Array]:
         # the ids of non-leaf nodes is arranged in ascending order.
         pos_non_leaf_ids = np.sort(pos_nodes.ids, axis=1)
         pos_ids = np.concatenate((pos_non_leaf_ids, ids), axis=1)
-        pos_fea_result = self._parse_nodes(pos_nodes)[1:]
-        pos_result_dict = dict(zip(self._attr_names[1:], pos_fea_result))
+        pos_fea_result = self._parse_nodes(pos_nodes)[1:]
+
+        # randomly select layers to keep
+        if self._remain_ratio < 1.0:
+            remain_layer = np.random.choice(
+                range(1, self._max_level - 1),
+                int(round(self._remain_ratio * (self._max_level - 2))),
+                replace=False,
+                p=self._remain_p,
+            )
+        else:
+            remain_layer = np.array(range(1, self._max_level - 1))
+        remain_layer.sort()
+
+        if self._remain_ratio < 1.0:
+            pos_fea_index = np.concatenate(
+                [
+                    remain_layer - 1 + j * (self._max_level - 2)
+                    for j in range(batch_size)
+                ]
+            )
+            pos_fea_result = [
+                pos_fea_result[i].take(pos_fea_index) for i in range(num_fea)
+            ]
 
         # negative sample layer by layer.
         neg_fea_layer = []
-        for i in range(1, self._max_level):
-            neg_nodes = self._neg_sampler_list[i].get(pos_ids[:, i], pos_ids[:, i])
+        for i in np.append(remain_layer, self._max_level - 1):
+            neg_nodes = self._neg_sampler_list[i - 1].get(
+                pos_ids[:, i - 1], pos_ids[:, i - 1]
+            )
             features = self._parse_nodes(neg_nodes)[1:]
             neg_fea_layer.append(features)
 
         # concatenate the features of each layer and
         # ensure that the negative sample features of the same user are adjacent.
         neg_fea_result = []
-        num_fea = len(neg_fea_layer[0])
-        batch_size = len(ids)
-        cum_layer_num = np.cumsum([0] + self._layer_num_sample[:-1])
-        same_user_index = np.concatenate(
+        cum_layer_num = np.cumsum(
+            [0]
+            + [
+                self._layer_num_sample[i] if i in remain_layer else 0
+                for i in range(self._max_level - 1)
+            ]
+        )
+        neg_fea_index = np.concatenate(
             [
                 np.concatenate(
                     [
                         np.arange(self._layer_num_sample[i])
                         + j * self._layer_num_sample[i]
                         + batch_size * cum_layer_num[i]
-                        for i in range(self._max_level)
+                        for i in np.append(remain_layer, self._max_level - 1)
                     ]
                 )
                 for j in range(batch_size)
             ]
         )
         neg_fea_result = [
-            pa.concat_arrays([array[i] for array in neg_fea_layer]).take(
-                same_user_index
-            )
+            pa.concat_arrays([array[i] for array in neg_fea_layer]).take(neg_fea_index)
             for i in range(num_fea)
         ]
+
+        pos_result_dict = dict(zip(self._attr_names[1:], pos_fea_result))
         neg_result_dict = dict(zip(self._attr_names[1:], neg_fea_result))
 
         return pos_result_dict, neg_result_dict
diff --git a/tzrec/datasets/sampler_test.py b/tzrec/datasets/sampler_test.py
index f2ec81d..c7b861f 100644
--- a/tzrec/datasets/sampler_test.py
+++ b/tzrec/datasets/sampler_test.py
@@ -88,10 +88,12 @@ def _create_item_gl_data_for_tdm(self):
         return f
 
     def _create_edge_gl_data_for_tdm(self):
-        def _ancesstor(code):
+        def _ancestor(code):
             ancs = []
-            while code > 0:
+            while True:
                 code = int((code - 1) / 2)
+                if code <= 0:
+                    break
                 ancs.append(code)
             return ancs
 
@@ -99,7 +101,7 @@ def _ancesstor(code):
         self._temp_files.append(f)
         f.write("src_id:int64\tdst_id:int\tweight:float\n")
         for i in range(31, 63):
-            for anc in _ancesstor(i):
+            for anc in _ancestor(i):
                 f.write(f"{i}\t{anc}\t{1.0}\n")
         f.flush()
         return f
 
@@ -227,7 +229,6 @@ def _sampler_worker(
             procs.append(p)
         for i, p in enumerate(procs):
             p.join()
-            print(f"{local_rank}, {group_rank} done.")
             if p.exitcode != 0:
                 raise RuntimeError(f"client-{i} of worker-{rank} failed.")
 
@@ -456,9 +457,9 @@ def _sampler_worker(pos_res, neg_res):
             p.join()
             if p.exitcode != 0:
                 raise RuntimeError("worker failed.")
-        self.assertEqual(len(pos_res["int_a"]), 4 * 5)
-        self.assertEqual(len(pos_res["float_b"]), 4 * 5)
-        self.assertEqual(len(pos_res["str_c"]), 4 * 5)
+        self.assertEqual(len(pos_res["int_a"]), 4 * 4)
+        self.assertEqual(len(pos_res["float_b"]), 4 * 4)
+        self.assertEqual(len(pos_res["str_c"]), 4 * 4)
         self.assertEqual(len(neg_res["int_a"]), 4 * 15)
         self.assertEqual(len(neg_res["float_b"]), 4 * 15)
         self.assertEqual(len(neg_res["str_c"]), 4 * 15)
diff --git a/tzrec/protos/sampler.proto b/tzrec/protos/sampler.proto
index 2350a7c..33d69dc 100644
--- a/tzrec/protos/sampler.proto
+++ b/tzrec/protos/sampler.proto
@@ -135,5 +135,5 @@ message TDMSampler {
     optional float remain_ratio = 10 [default=1.0];
     // The type of probability for selecting and retaining
     // each layer in the middle layers of the tree
-    optional string probabilty_type = 11 [default="UNIFORM"];
+    optional string probability_type = 11 [default="UNIFORM"];
 }
diff --git a/tzrec/tools/tdm/gen_tree/tree_search_util.py b/tzrec/tools/tdm/gen_tree/tree_search_util.py
index ce9984a..e2cac31 100644
--- a/tzrec/tools/tdm/gen_tree/tree_search_util.py
+++ b/tzrec/tools/tdm/gen_tree/tree_search_util.py
@@ -145,7 +145,8 @@ def save(self) -> None:
         dst_ids = []
         weight = []
         for travel in self.travel_list:
-            for i in range(self.max_level):
+            # do not include edge from leaf to root
+            for i in range(self.max_level - 1):
                 src_ids.append(travel[0])
                 dst_ids.append(travel[i + 1])
                 weight.append(1.0)
@@ -173,7 +174,8 @@ def save(self) -> None:
         with open(os.path.join(self.output_file, "edge_table.txt"), "w") as f:
             f.write("src_id:int64\tdst_id:int64\tweight:float\n")
             for travel in self.travel_list:
-                for i in range(self.max_level):
+                # do not include edge from leaf to root
+                for i in range(self.max_level - 1):
                     f.write(f"{travel[0]}\t{travel[i+1]}\t{1.0}\n")
 
     def save_predict_edge(self) -> None:
diff --git a/tzrec/tools/tdm/gen_tree/tree_search_util_test.py b/tzrec/tools/tdm/gen_tree/tree_search_util_test.py
index ebe276b..163325c 100644
--- a/tzrec/tools/tdm/gen_tree/tree_search_util_test.py
+++ b/tzrec/tools/tdm/gen_tree/tree_search_util_test.py
@@ -73,7 +73,7 @@ def test_tree_search(self) -> None:
                 serving_tree.append(line)
 
         self.assertEqual(len(node_table), 14)
-        self.assertEqual(len(edge_table), 19)
+        self.assertEqual(len(edge_table), 13)
         self.assertEqual(len(predict_edge_table), 13)
         self.assertEqual(len(serving_tree), 14)