Commit e25bc04

[feat] refactor remain ratio to TDMSampler and remove root node in positive sampled data (#5)

Authored by tiankongdeguiji on Oct 9, 2024
1 parent 2faf03d
Showing 8 changed files with 100 additions and 104 deletions.
docs/source/quick_start/local_tutorial_tdm.md (3 additions & 3 deletions)
@@ -42,7 +42,7 @@ python -m tzrec.tools.tdm.init_tree \
 --item_id_field adgroup_id \
 --cate_id_field cate_id \
 --attr_fields cate_id,campaign_id,customer,brand,price \
---node_edge_output_file data/init_tree
+--node_edge_output_file data/init_tree \
 --tree_output_dir data/init_tree
 ```

@@ -81,7 +81,7 @@ torchrun --master_addr=localhost --master_port=32555 \
 --nnodes=1 --nproc-per-node=8 --node_rank=0 \
 -m tzrec.export \
 --pipeline_config_path experiments/tdm_taobao_local/pipeline.config \
---export_dir experiments/tdm_taobao_local/export
+--export_dir experiments/tdm_taobao_local/export \
 --asset_files data/init_tree/serving_tree
 ```

@@ -156,7 +156,7 @@ torchrun --master_addr=localhost --master_port=32555 \
 --nnodes=1 --nproc-per-node=8 --node_rank=0 \
 -m tzrec.export \
 --pipeline_config_path experiments/tdm_taobao_local_learnt/pipeline.config \
---export_dir experiments/tdm_taobao_local_learnt/export
+--export_dir experiments/tdm_taobao_local_learnt/export \
 --asset_files data/learnt_tree/serving_tree
 ```

tzrec/datasets/dataset.py (11 additions & 66 deletions)
@@ -95,79 +95,30 @@ def _expand_tdm_sample(
     expand label: [1, 1, 1, 1, 1, 1, 0, 0, 0, 0]
     """
     sampler_type = data_config.WhichOneof("sampler")
-    sampler_config = getattr(data_config, sampler_type)
-    tree_level = len(sampler_config.layer_num_sample)
-    num_all_layer_neg = sum(sampler_config.layer_num_sample)
-    layer_num_sample = sampler_config.layer_num_sample
 
     item_fea_names = pos_sampled.keys()
     all_fea_names = input_data.keys()
     label_fields = set(data_config.label_fields)
     user_fea_names = all_fea_names - item_fea_names - label_fields
 
-    remain_ratio = sampler_config.remain_ratio
-    if remain_ratio < 1.0:
-        probability_type = sampler_config.probabilty_type
-        if probability_type == "UNIFORM":
-            p = np.array([1 / (tree_level - 1)] * (tree_level - 1))
-        elif probability_type == "ARITHMETIC":
-            p = np.arange(1, tree_level) / sum(np.arange(1, tree_level))
-        elif probability_type == "RECIPROCAL":
-            p = 1 / np.arange(tree_level - 1, 0, -1)
-            p = p / sum(p)
-        else:
-            raise ValueError(
-                f"probability_type: [{probability_type}]" "is not supported now."
-            )
-        remain_layer = np.random.choice(
-            range(tree_level - 1),
-            int(remain_ratio * (tree_level - 1)),
-            replace=False,
-            p=p,
-        )
-        remain_layer.sort()
-    else:
-        remain_layer = list(range(tree_level - 1))
-
-    num_remain_layer_neg = (
-        sum([layer_num_sample[i] for i in remain_layer]) + layer_num_sample[-1]
-    )
     for item_fea_name in item_fea_names:
-        batch_size = len(input_data[item_fea_name])
-        pos_index = (
-            (remain_layer[None, :] + (tree_level - 1) * np.arange(batch_size)[:, None])
-            .flatten()
-            .astype(np.int64)
-        )
-        neg_index = (
-            (
-                np.concatenate(
-                    [
-                        range(sum(layer_num_sample[:i]), sum(layer_num_sample[: i + 1]))
-                        for i in np.append(remain_layer, tree_level - 1)
-                    ]
-                )[None, :]
-                + num_all_layer_neg * np.arange(batch_size)[:, None]
-            )
-            .flatten()
-            .astype(np.int64)
-        )
         input_data[item_fea_name] = pa.concat_arrays(
             [
                 input_data[item_fea_name],
-                pos_sampled[item_fea_name].take(pos_index),
-                neg_sampled[item_fea_name].take(neg_index),
+                pos_sampled[item_fea_name],
+                neg_sampled[item_fea_name],
             ]
         )
 
+    # In the sampling results, the sampled outcomes for each item are contiguous.
+    batch_size = len(input_data[list(label_fields)[0]])
+    num_pos_sampled = len(pos_sampled[list(item_fea_names)[0]])
+    num_neg_sampled = len(neg_sampled[list(item_fea_names)[0]])
+    user_pos_index = np.repeat(np.arange(batch_size), num_pos_sampled // batch_size)
+    user_neg_index = np.repeat(np.arange(batch_size), num_neg_sampled // batch_size)
     for user_fea_name in user_fea_names:
         user_fea = input_data[user_fea_name]
-        pos_index = np.repeat(np.arange(len(user_fea)), len(remain_layer))
-        neg_index = np.repeat(np.arange(len(user_fea)), num_remain_layer_neg)
-        pos_expand_user_fea = user_fea.take(pos_index)
-        neg_expand_user_fea = user_fea.take(neg_index)
+        pos_expand_user_fea = user_fea.take(user_pos_index)
+        neg_expand_user_fea = user_fea.take(user_neg_index)
         input_data[user_fea_name] = pa.concat_arrays(
             [
                 input_data[user_fea_name],
@@ -180,14 +131,8 @@ def _expand_tdm_sample(
         input_data[label_field] = pa.concat_arrays(
             [
                 input_data[label_field].cast(pa.int64()),
-                pa.array(
-                    [1] * (len(input_data[label_field]) * len(remain_layer)),
-                    type=pa.int64(),
-                ),
-                pa.array(
-                    [0] * (len(input_data[label_field]) * num_remain_layer_neg),
-                    type=pa.int64(),
-                ),
+                pa.array([1] * num_pos_sampled, type=pa.int64()),
+                pa.array([0] * num_neg_sampled, type=pa.int64()),
             ]
         )

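With layer selection now done inside `TDMSampler`, `_expand_tdm_sample` can assume the sampled rows arrive already filtered and contiguous per user, so tiling user features reduces to a single `np.repeat`. A minimal standalone sketch of that expansion with toy sizes (not the tzrec API):

```python
import numpy as np
import pyarrow as pa

# Toy batch: 2 users, 3 positive and 5 negative sampled rows each, laid out
# contiguously per user as the new code comment in _expand_tdm_sample states.
batch_size, num_pos_sampled, num_neg_sampled = 2, 6, 10

user_fea = pa.array(["u0", "u1"])
user_pos_index = np.repeat(np.arange(batch_size), num_pos_sampled // batch_size)
user_neg_index = np.repeat(np.arange(batch_size), num_neg_sampled // batch_size)

print(user_fea.take(user_pos_index))  # "u0" x3, then "u1" x3
print(user_fea.take(user_neg_index))  # "u0" x5, then "u1" x5

# Labels follow the same layout: one 1 per positive row, one 0 per negative row.
labels = pa.array([1] * num_pos_sampled + [0] * num_neg_sampled, type=pa.int64())
```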
tzrec/datasets/dataset_test.py (10 additions & 10 deletions)
@@ -432,21 +432,23 @@ def test_dataset_with_tdm_sampler_and_remain_ratio(self):
         self._temp_files.append(node)
         node.write("id:int64\tweight:float\tattrs:string\n")
         for i in range(63):
-            node.write(f"{i}\t{1}\t{int(math.log(i+1,2))}:{i}:{i+1000}:我们{i}\n")
+            node.write(f"{i}\t{1}\t{int(math.log(i+1,2))}:{i}:{i+1000}:{i*2}\n")
         node.flush()
 
-        def _ancesstor(code):
+        def _ancestor(code):
             ancs = []
-            while code > 0:
+            while True:
                 code = int((code - 1) / 2)
+                if code <= 0:
+                    break
                 ancs.append(code)
             return ancs
 
         edge = tempfile.NamedTemporaryFile("w")
         self._temp_files.append(edge)
         edge.write("src_id:int64\tdst_id:int\tweight:float\n")
         for i in range(31, 63):
-            for anc in _ancesstor(i):
+            for anc in _ancestor(i):
                 edge.write(f"{i}\t{anc}\t{1.0}\n")
         edge.flush()

@@ -488,23 +490,21 @@ def _childern(code):
                 raw_feature=feature_pb2.RawFeature(feature_name="float_d")
             ),
         ]
-        features = create_features(
-            feature_cfgs, neg_fields=["int_a", "float_b", "str_c"]
-        )
+        features = create_features(feature_cfgs)
 
         dataset = _TestDataset(
             data_config=data_pb2.DataConfig(
                 batch_size=4,
                 dataset_type=data_pb2.DatasetType.OdpsDataset,
                 fg_encoded=True,
                 label_fields=["label"],
-                negative_sampler=sampler_pb2.TDMSampler(
-                    input_input_path=node.name,
+                tdm_sampler=sampler_pb2.TDMSampler(
+                    item_input_path=node.name,
                     edge_input_path=edge.name,
                     predict_edge_input_path=predict_edge.name,
                     attr_fields=["tree_level", "int_a", "float_b", "str_c"],
                     item_id_field="int_a",
-                    layer_num_sample=[1, 1, 1, 1, 1, 5],
+                    layer_num_sample=[0, 1, 1, 1, 1, 5],
                     field_delimiter=",",
                     remain_ratio=0.4,
                     probability_type="UNIFORM",
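The rewritten helper changes behavior, not just spelling: the old loop appended the root (id 0) as a final ancestor, while the new one breaks before appending it, matching the commit's "remove root node in positive sampled data". A quick standalone check on the 63-node complete binary tree this test builds (parent of node i is (i - 1) // 2, leaves are 31..62):

```python
def _ancestor(code: int) -> list:
    # Walk from a leaf toward the root, stopping before the root itself.
    ancs = []
    while True:
        code = (code - 1) // 2
        if code <= 0:
            break
        ancs.append(code)
    return ancs

print(_ancestor(31))  # [15, 7, 3, 1] -- the root 0 is no longer included
```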
tzrec/datasets/sampler.py (62 additions & 14 deletions)
@@ -695,21 +695,40 @@ def __init__(
         self._item_id_field = config.item_id_field
         self._max_level = len(config.layer_num_sample)
         self._layer_num_sample = config.layer_num_sample
+        assert self._layer_num_sample[0] == 0, "sample num of tree root must be 0"
         self._last_layer_num_sample = config.layer_num_sample[-1]
         self._pos_sampler = None
         self._neg_sampler_list = []
 
+        self._remain_ratio = config.remain_ratio
+        if self._remain_ratio < 1.0:
+            if config.probability_type == "UNIFORM":
+                p = np.array([1 / (self._max_level - 2)] * (self._max_level - 2))
+            elif config.probability_type == "ARITHMETIC":
+                p = np.arange(1, self._max_level - 1) / sum(
+                    np.arange(1, self._max_level - 1)
+                )
+            elif config.probability_type == "RECIPROCAL":
+                p = 1 / np.arange(self._max_level - 2, 0, -1)
+                p = p / sum(p)
+            else:
+                raise ValueError(
+                    f"probability_type: [{config.probability_type}]"
+                    "is not supported now."
+                )
+            self._remain_p = p
 
     def init(self) -> None:
         """Init sampler client and samplers."""
         super().init()
         self._pos_sampler = self._g.neighbor_sampler(
             meta_path=["ancestor"],
-            expand_factor=self._max_level - 1,
+            expand_factor=self._max_level - 2,
             strategy="random_without_replacement",
         )
 
         # TODO: only use one conditional smapler
-        for i in range(self._max_level):
+        for i in range(1, self._max_level):
             self._neg_sampler_list.append(
                 self._g.negative_sampler(
                     "item",
@@ -745,49 +764,78 @@ def get(self, input_data: Dict[str, pa.Array]) -> Dict[str, pa.Array]:
             .to_numpy()
             .reshape(-1, 1)
         )
+        batch_size = len(ids)
+        num_fea = len(self._attr_names[1:])
 
         # positive node.
         pos_nodes = self._pos_sampler.get(ids).layer_nodes(1)
 
         # the ids of non-leaf nodes is arranged in ascending order.
         pos_non_leaf_ids = np.sort(pos_nodes.ids, axis=1)
         pos_ids = np.concatenate((pos_non_leaf_ids, ids), axis=1)
 
         pos_fea_result = self._parse_nodes(pos_nodes)[1:]
-        pos_result_dict = dict(zip(self._attr_names[1:], pos_fea_result))
 
+        # randomly select layers to keep
+        if self._remain_ratio < 1.0:
+            remain_layer = np.random.choice(
+                range(1, self._max_level - 1),
+                int(round(self._remain_ratio * (self._max_level - 2))),
+                replace=False,
+                p=self._remain_p,
+            )
+        else:
+            remain_layer = np.array(range(1, self._max_level - 1))
+        remain_layer.sort()
+
+        if self._remain_ratio < 1.0:
+            pos_fea_index = np.concatenate(
+                [
+                    remain_layer - 1 + j * (self._max_level - 2)
+                    for j in range(batch_size)
+                ]
+            )
+            pos_fea_result = [
+                pos_fea_result[i].take(pos_fea_index) for i in range(num_fea)
+            ]
 
         # negative sample layer by layer.
         neg_fea_layer = []
-        for i in range(1, self._max_level):
-            neg_nodes = self._neg_sampler_list[i].get(pos_ids[:, i], pos_ids[:, i])
+        for i in np.append(remain_layer, self._max_level - 1):
+            neg_nodes = self._neg_sampler_list[i - 1].get(
+                pos_ids[:, i - 1], pos_ids[:, i - 1]
+            )
             features = self._parse_nodes(neg_nodes)[1:]
             neg_fea_layer.append(features)
 
         # concatenate the features of each layer and
         # ensure that the negative sample features of the same user are adjacent.
         neg_fea_result = []
-        num_fea = len(neg_fea_layer[0])
-        batch_size = len(ids)
-        cum_layer_num = np.cumsum([0] + self._layer_num_sample[:-1])
-        same_user_index = np.concatenate(
+        cum_layer_num = np.cumsum(
+            [0]
+            + [
+                self._layer_num_sample[i] if i in remain_layer else 0
+                for i in range(self._max_level - 1)
+            ]
+        )
+        neg_fea_index = np.concatenate(
             [
                 np.concatenate(
                     [
                         np.arange(self._layer_num_sample[i])
                         + j * self._layer_num_sample[i]
                         + batch_size * cum_layer_num[i]
-                        for i in range(self._max_level)
+                        for i in np.append(remain_layer, self._max_level - 1)
                     ]
                 )
                 for j in range(batch_size)
             ]
        )
         neg_fea_result = [
-            pa.concat_arrays([array[i] for array in neg_fea_layer]).take(
-                same_user_index
-            )
+            pa.concat_arrays([array[i] for array in neg_fea_layer]).take(neg_fea_index)
             for i in range(num_fea)
         ]
 
+        pos_result_dict = dict(zip(self._attr_names[1:], pos_fea_result))
         neg_result_dict = dict(zip(self._attr_names[1:], neg_fea_result))
 
         return pos_result_dict, neg_result_dict
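The `__init__` change above precomputes the layer-retention distribution that `get` draws from. For reference, the three `probability_type` schemes yield the following distributions over the droppable middle layers; a standalone sketch assuming `max_level = 6` (illustrative value, not taken from this diff):

```python
import numpy as np

max_level = 6      # layers 0 (root) .. 5 (leaves)
n = max_level - 2  # middle layers 1..4 may be dropped; root and leaves never are

p_uniform = np.array([1 / n] * n)                               # [0.25, 0.25, 0.25, 0.25]
p_arithmetic = np.arange(1, n + 1) / np.arange(1, n + 1).sum()  # [0.1, 0.2, 0.3, 0.4]
p_reciprocal = 1 / np.arange(n, 0, -1)
p_reciprocal = p_reciprocal / p_reciprocal.sum()                # weights deepest layer most

# With remain_ratio = 0.5, round(0.5 * n) = 2 of the 4 middle layers survive.
remain_layer = np.random.choice(np.arange(1, max_level - 1), 2, replace=False, p=p_uniform)
remain_layer.sort()
print(remain_layer)
```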
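The `neg_fea_index` bookkeeping in `get` is the subtle part: negatives arrive grouped by layer (then by user within a layer), and the index regroups them so each user's rows end up adjacent. A worked toy example with assumed values `layer_num_sample = [0, 1, 2, 3]`, `batch_size = 2`, and `remain_layer = [2]` (layer 1 dropped, leaf layer always kept):

```python
import numpy as np

layer_num_sample = [0, 1, 2, 3]
max_level = len(layer_num_sample)
batch_size = 2
remain_layer = np.array([2])

# Offset of each kept layer's block in the concatenated per-layer results.
cum_layer_num = np.cumsum(
    [0]
    + [layer_num_sample[i] if i in remain_layer else 0 for i in range(max_level - 1)]
)
neg_fea_index = np.concatenate(
    [
        np.concatenate(
            [
                np.arange(layer_num_sample[i])
                + j * layer_num_sample[i]
                + batch_size * cum_layer_num[i]
                for i in np.append(remain_layer, max_level - 1)
            ]
        )
        for j in range(batch_size)
    ]
)
print(neg_fea_index)  # [0 1 4 5 6 2 3 7 8 9]: user 0's layer-2 then layer-3 rows, then user 1's
```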
tzrec/datasets/sampler_test.py (8 additions & 7 deletions)
@@ -88,18 +88,20 @@ def _create_item_gl_data_for_tdm(self):
         return f
 
     def _create_edge_gl_data_for_tdm(self):
-        def _ancesstor(code):
+        def _ancestor(code):
             ancs = []
-            while code > 0:
+            while True:
                 code = int((code - 1) / 2)
+                if code <= 0:
+                    break
                 ancs.append(code)
             return ancs
 
         f = tempfile.NamedTemporaryFile("w")
         self._temp_files.append(f)
         f.write("src_id:int64\tdst_id:int\tweight:float\n")
         for i in range(31, 63):
-            for anc in _ancesstor(i):
+            for anc in _ancestor(i):
                 f.write(f"{i}\t{anc}\t{1.0}\n")
         f.flush()
         return f
@@ -227,7 +229,6 @@ def _sampler_worker(
             procs.append(p)
         for i, p in enumerate(procs):
             p.join()
-            print(f"{local_rank}, {group_rank} done.")
             if p.exitcode != 0:
                 raise RuntimeError(f"client-{i} of worker-{rank} failed.")

@@ -456,9 +457,9 @@ def _sampler_worker(pos_res, neg_res):
         p.join()
         if p.exitcode != 0:
             raise RuntimeError("worker failed.")
-        self.assertEqual(len(pos_res["int_a"]), 4 * 5)
-        self.assertEqual(len(pos_res["float_b"]), 4 * 5)
-        self.assertEqual(len(pos_res["str_c"]), 4 * 5)
+        self.assertEqual(len(pos_res["int_a"]), 4 * 4)
+        self.assertEqual(len(pos_res["float_b"]), 4 * 4)
+        self.assertEqual(len(pos_res["str_c"]), 4 * 4)
         self.assertEqual(len(neg_res["int_a"]), 4 * 15)
         self.assertEqual(len(neg_res["float_b"]), 4 * 15)
         self.assertEqual(len(neg_res["str_c"]), 4 * 15)
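The updated expectations follow directly from the root removal: assuming this test uses a 6-level tree, each of the 4 users now yields `max_level - 2 = 4` positive ancestor nodes instead of 5 (the root is gone), while negative counts are unchanged. A back-of-the-envelope check (the layer sizes below are hypothetical; the test's tree setup is collapsed above):

```python
batch_size = 4
max_level = 6
layer_num_sample = [0, 1, 2, 3, 4, 5]  # hypothetical; root samples 0, rest sum to 15

num_pos = batch_size * (max_level - 2)        # 16 == 4 * 4
num_neg = batch_size * sum(layer_num_sample)  # 60 == 4 * 15
```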
tzrec/protos/sampler.proto (1 addition & 1 deletion)

@@ -135,5 +135,5 @@ message TDMSampler {
   optional float remain_ratio = 10 [default=1.0];
   // The type of probability for selecting and retaining
   // each layer in the middle layers of the tree
-  optional string probabilty_type = 11 [default="UNIFORM"];
+  optional string probability_type = 11 [default="UNIFORM"];
 }
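A sketch of how the renamed field is set from Python, mirroring the test configuration in this commit (paths and attribute names are illustrative, not prescribed by the diff):

```python
from tzrec.protos import sampler_pb2

sampler = sampler_pb2.TDMSampler(
    item_input_path="data/init_tree/node_table.txt",  # illustrative path
    edge_input_path="data/init_tree/edge_table.txt",  # illustrative path
    attr_fields=["tree_level", "cate_id", "brand"],   # illustrative fields
    item_id_field="adgroup_id",
    layer_num_sample=[0, 1, 1, 1, 1, 5],  # root layer must now sample 0 nodes
    remain_ratio=0.4,                     # keep ~40% of the middle layers
    probability_type="UNIFORM",           # was spelled "probabilty_type" before this commit
)
```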
tzrec/tools/tdm/gen_tree/tree_search_util.py (4 additions & 2 deletions)

@@ -136,7 +136,8 @@ def save(self) -> None:
         dst_ids = []
         weight = []
         for travel in self.travel_list:
-            for i in range(self.max_level):
+            # do not include edge from leaf to root
+            for i in range(self.max_level - 1):
                 src_ids.append(travel[0])
                 dst_ids.append(travel[i + 1])
                 weight.append(1.0)
@@ -164,7 +165,8 @@ def save(self) -> None:
         with open(os.path.join(self.output_file, "edge_table.txt"), "w") as f:
             f.write("src_id:int64\tdst_id:int64\tweight:float\n")
             for travel in self.travel_list:
-                for i in range(self.max_level):
+                # do not include edge from leaf to root
+                for i in range(self.max_level - 1):
                     f.write(f"{travel[0]}\t{travel[i+1]}\t{1.0}\n")
 
     def save_predict_edge(self) -> None:
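A toy rendering of the loop-bound change, under the assumption that each `travel` row is `[leaf_id, ancestor_1, ..., root]` ordered from the leaf's parent upward:

```python
travel = [12, 5, 2, 0]  # hypothetical row: leaf 12, ancestors 5 and 2, root 0
max_level = 3           # hypothetical depth matching this row

old_edges = [(travel[0], travel[i + 1]) for i in range(max_level)]      # [(12, 5), (12, 2), (12, 0)]
new_edges = [(travel[0], travel[i + 1]) for i in range(max_level - 1)]  # [(12, 5), (12, 2)]
```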
(1 changed file not shown)
