Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Add dump walk #528

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions apps/PGLBox/src/cluster_train_and_infer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,14 @@
import sys
import time
import glob
import yaml
import shutil
import argparse
import traceback
import pickle as pkl
import numpy as np
import helper
from datetime import datetime

import paddle
import paddle.fluid as fluid
import paddle.static as static
from place import get_cuda_places
from pgl.utils.logger import log
Expand Down Expand Up @@ -61,7 +58,7 @@ def train(args, exe, model_dict, dataset):

epoch_loss = 0
train_pass_num = 0
for pass_dataset in dataset.pass_generator():
for pass_dataset in dataset.pass_generator(epoch):
exe.train_from_dataset(
model_dict.train_program, pass_dataset, debug=False)

Expand Down Expand Up @@ -161,7 +158,7 @@ def train_with_multi_metapath(args, exe, model_dict, dataset):
fleet.barrier_worker()
savemodel_end = time.time()
log.info("STAGE [SAVE MODEL] for epoch [%d] finished, time cost: %f sec" \
% (epoch + 1, savemodel_end - savemodel_begin))
% (epoch, savemodel_end - savemodel_begin))

train_end_time = time.time()
log.info("STAGE [TRAIN MODEL] finished, time cost: % sec" %
Expand Down Expand Up @@ -216,7 +213,7 @@ def run_worker(args, exe, model_dict, infer_model_dict):

dist_graph.load_edge()
ret = dist_graph.load_node()
if ret is not 0:
if ret != 0:
return -1

if args.warm_start_from:
Expand Down Expand Up @@ -260,6 +257,13 @@ def run_worker(args, exe, model_dict, infer_model_dict):
log.info("STAGE: need_inference is %s, skip inference process" %
args.need_inference)

if args.need_dump_walk is True:
upload_dump_begin = time.time()
util.upload_dump_walk(args, args.local_dump_path)
upload_dump_end = time.time()
log.info("STAGE [UPLOAD DUMP WALK] finished, time cost: %f sec" %
(upload_dump_end - upload_dump_begin))

return 0


Expand Down Expand Up @@ -314,7 +318,7 @@ def main(args):

elif fleet.is_worker():
ret = run_worker(args, exe, model_dict, infer_model_dict)
if ret is not 0:
if ret != 0:
fleet.stop_worker()
return -1

Expand All @@ -333,6 +337,7 @@ def main(args):
config.local_result_path = "./embedding"
config.model_save_path = os.path.join(config.working_root, "model")
config.infer_result_path = os.path.join(config.working_root, 'embedding')
config.dump_save_path = os.path.join(config.working_root, 'dump_walk')
config.max_steps = config.max_steps if config.max_steps else 0
config.metapath_split_opt = config.metapath_split_opt if config.metapath_split_opt else False
print("#===================PRETTY CONFIG============================#")
Expand Down
12 changes: 7 additions & 5 deletions apps/PGLBox/src/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@

import os
import time
import paddle
import threading

import paddle
from paddle.distributed import fleet
import util
import paddle.fluid as fluid
from place import get_cuda_places
from pgl.utils.logger import log

import util
from place import get_cuda_places
import models.model_util as model_util


class BaseDataset(object):
def __init__(self,
Expand Down Expand Up @@ -221,14 +224,13 @@ def __init__(self,
dist_graph=dist_graph,
is_predict=False)

def pass_generator(self):
def pass_generator(self, epoch=None):
# open a thread for processing the data
dataset_list = []
t = threading.Thread(target=self.preload_thread, args=(dataset_list, ))
t.setDaemon(True)
t.start()

epoch_loss = 0
pass_id = 0
while 1:
self.ins_ready_sem.acquire()
Expand Down
5 changes: 3 additions & 2 deletions apps/PGLBox/src/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ def load_edge(self):
if not self.metapath_split_opt:
load_begin_time = time.time()
for i in range(len(self.etype_list)):
log.info("begin to upload edge_type: [%s] to GPU" % self.etype_list[i])
log.info("begin to upload edge_type: [%s] to GPU" %
self.etype_list[i])
self.graph.upload_batch(0, i,
len(get_cuda_places()),
self.etype_list[i])
Expand All @@ -138,7 +139,7 @@ def load_node(self):
ret = self.graph.load_node_file(self.node_types, self.root_dir,
self.num_parts)

if ret is not 0:
if ret != 0:
log.info("Fail to load node, ntype2files[%s] path[%s] num_part[%d]" \
% (self.node_types, self.root_dir, self.num_parts))
self.graph.release_graph_node()
Expand Down
19 changes: 19 additions & 0 deletions apps/PGLBox/src/hadoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,22 @@ def put(src, dest, hadoop_bin=None, fs_name=None, fs_ugi=None):
cmd += " 2>%s" % ERR_LOG
ret = os.system(cmd)
return ret


def replace(src, dest, hadoop_bin=None, fs_name=None, fs_ugi=None):
    """Atomically-ish replace *dest* on HDFS with *src*.

    Moves the existing *dest* aside to a timestamped backup, puts *src*
    in its place, then removes the backup. The three hadoop commands are
    chained with ``&&`` so a later step only runs if the previous one
    succeeded.

    Returns the shell exit status from ``os.system``.
    """
    hadoop_bin, fs_name, fs_ugi = parse_account(hadoop_bin, fs_name, fs_ugi)
    src = check_hadoop_path(src, fs_name, fs_ugi)
    dest = check_hadoop_path(dest, fs_name, fs_ugi)

    # Timestamped sibling path used to park the old copy during the swap.
    backup = dest + "_" + str(int(time.time()))
    steps = []
    for op in (" -mv " + dest + " " + backup,
               " -put " + src + " " + dest,
               " -rmr " + backup):
        steps.append(make_base_cmd(hadoop_bin, fs_name, fs_ugi) + op)
    return os.system(" && ".join(steps))
2 changes: 1 addition & 1 deletion apps/PGLBox/src/models/model_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def get_sparse_embedding(config,
use_cvm=use_cvm)
for bow in slot_bows:
slot_embedding = bow[:, 1:]
slot_embedding = paddle.nn.softsign(slot_embedding)
slot_embedding = paddle.nn.functional.softsign(slot_embedding)
slot_embedding_list.append(slot_embedding)

return id_embedding, slot_embedding_list
Expand Down
33 changes: 33 additions & 0 deletions apps/PGLBox/src/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ def get_global_value(value_sum, value_cnt):
return value_sum / np.maximum(value_cnt, 1)


def get_batch_num(value_cnt):
    """Read the current batch-counter tensor from Paddle's global scope.

    Args:
        value_cnt: a Paddle Variable; its ``name`` is used to look up the
            counter tensor in ``fluid.global_scope()``.

    Returns:
        numpy.ndarray holding the counter value.
    """
    # NOTE: docstring previously said "get global value" — copy-paste from
    # get_global_value(); this function returns the raw count, not a mean.
    return np.array(
        fluid.global_scope().find_var(value_cnt.name).get_tensor())


def parse_path(path):
"""
Args:
Expand Down Expand Up @@ -264,6 +271,32 @@ def upload_embedding(args, local_embed_path):
log.info("embedding has been saved in local path: %s" % working_root)


def upload_dump_walk(args, local_dump_path):
    """Upload locally dumped walk paths to ``args.dump_save_path``.

    The storage backend is selected by the scheme parsed from
    ``args.dump_save_path`` / ``args.working_root``:
      * ``hdfs``  - recreate the remote directory, then ``hadoop put``
                    every file found under *local_dump_path*.
      * ``afs``   - upload the whole directory with gzshell.
      * otherwise - treat the paths as local and ``mv`` the dump
                    directory under the working root.

    Args:
        args: config namespace; must provide ``dump_save_path`` and
            ``working_root``, and for afs also ``fs_name`` / ``fs_ugi``.
        local_dump_path: local directory containing the dumped walk files.
    """
    mode, dump_save_path = parse_path(args.dump_save_path)
    _, working_root = parse_path(args.working_root)
    if mode == "hdfs":
        # Start from a clean remote directory so stale dumps never mix in.
        HFS.rm(dump_save_path)
        HFS.mkdir(dump_save_path)

        log.info("begin to upload walk_path to: %s " % dump_save_path)
        # was: `for file in ...` with an unused `basename` local; renamed
        # to avoid shadowing the builtin and dropped the dead variable.
        for local_file in glob.glob(os.path.join(local_dump_path, "*")):
            HFS.put(local_file, dump_save_path)
        log.info("[hadoop put] walk_path has been uploaded to: %s " %
                 dump_save_path)
    elif mode == "afs":
        log.info("begin to upload walk_path to: %s " % dump_save_path)
        user, passwd = args.fs_ugi.split(',')
        gzshell_upload(args.fs_name, user, passwd, local_dump_path,
                       "afs:%s" % working_root)
        log.info("[gzshell] walk_path has been uploaded to: %s" %
                 dump_save_path)
    else:
        make_dir(working_root)
        run_cmd("mv %s %s" % (local_dump_path, working_root))
        log.info("walk_path has been saved in local path: %s" % working_root)


def hadoop_touch_done(path):
""" touch hadoop done """
if fleet.worker_index() == 0:
Expand Down
2 changes: 2 additions & 0 deletions apps/PGLBox/user_configs/lightgcn.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ need_inference: True
# 预估embedding的时候,需要的参数,保持默认即可.
dump_node_name: "src_node___id"
dump_node_emb_name: "src_node___emb"
# 是否需要dump游走路径
need_dump_walk: False

# ---------------------------train param config---------------------------------------------#
epochs: 1
Expand Down
2 changes: 2 additions & 0 deletions apps/PGLBox/user_configs/metapath.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ need_inference: True
# 预估embedding的时候,需要的参数,保持默认即可.
dump_node_name: "src_node___id"
dump_node_emb_name: "src_node___emb"
# 是否需要dump游走路径
need_dump_walk: False

# ---------------------------train param config---------------------------------------------#
epochs: 1
Expand Down