Commit 19d7927

Merge pull request #5 from sysbio-vo/refactor-parallel

Refactor parallel

alnf authored Mar 26, 2018
2 parents 1439ab5 + 9022f9a
Showing 6 changed files with 110 additions and 93 deletions.
174 changes: 89 additions & 85 deletions BNfinder/data.py
@@ -629,11 +629,10 @@ def get_prior(self,vertex,parent):
return self.prior.get((vertex.name,parent.name),self.prior.get(parent.name,1))


def learn(self,score,data_factor,alg,prior=None,cores=False,subset=False, \
def learn(self,score,data_factor,alg=None,prior=None,cores=False,subset=None, \
verbose=None,n_min=1,limit=None,min_empty=None,min_optim=None,fpr=None,max_tries=None):

scr=score
distribs = []

# calculate fpr_factor
if fpr:
@@ -664,91 +663,95 @@ def learn(self,score,data_factor,alg,prior=None,cores=False,subset=False, \
if x.name in subset:
vertices.append(x)

if cores:
# Some preparatory work
try:
import multiprocessing
import multiprocessing.pool
except ImportError:
print "Problem invoking multiprocessing module. Running on a single CPU"
cores=False
else:
# Daemonic processes are not allowed to have children
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
# Need to check if we just want to concatenate previously obtained results
if (not subset) or (subset != "concat"):
if cores:
# Some preparatory work
try:
import multiprocessing
import multiprocessing.pool
except ImportError:
print "Problem invoking multiprocessing module. Running on a single CPU"
cores=False
else:
# Daemonic processes are not allowed to have children
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)

class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess

# We need to define how to distribute cores among the tasks
# In case of limit<=3 it is efficient to use the hybrid alg
# There are two situations: number of cores >= number of vertices and the opposite one.
# So, we need to handle them separately to distribute cores equally
lim = 0
if (alg=="set"):
lim = 0
if (alg=="variable"):
lim = 999
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess

# There are two different parallelization algorithms: set-wise and hybrid
# The set-wise algorithm is the default and uses all the cores to compute the parents set of one variable before proceeding to the next
# The hybrid algorithm evenly distributes cores between variables.
# It is recommended only when the underlying network is expected to be homogeneous
# with respect to the number of parents per variable or
# when the computational complexity is low

pool=MyPool(1)
if (limit is not None) and (int(limit)<=lim) and (cores>int(limit)):
if (cores>=len(vertices)):
pool=MyPool(len(vertices))
for counter in range(1, len(vertices)+1):
if (counter<=(cores%len(vertices))):
distribs.append(cores/len(vertices) + 1)
else:
distribs.append(cores/len(vertices))
else:
count = cores/int(limit)
if (cores%int(limit)>0):
count = count + 1

for counter in range(1, count+1):
if (counter<=(cores%count)):
distribs.append(cores/count + 1)
else:
distribs.append(cores/count)
pool=MyPool(1)
distribs = []
# Computational part
if verbose:
print "Computing in parallel"

temp = []
for i in range (1, len(vertices)+1):
temp.append(distribs[i%count])
distribs = temp
pool=MyPool(count)

# Computational part
if verbose:
print "Computing in parallel"
from itertools import izip,repeat
import timeit
start = timeit.default_timer()

if (alg=="setwise"):
if (self.regulators):
for v in vertices:
if v in self.regulators[0]:
distribs.append(1)
else:
distribs.append(cores)

from itertools import izip,repeat
import timeit
start = timeit.default_timer()
result = map(learn_x, [(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, y) for x, y in zip(vertices, distribs)])
else:
result = map(learn_x, [(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, cores) for x in vertices])

# Need to check if we just want to concatenate previously obtained results
if (subset==False) or (subset != "concat"):
# Hybrid alg
if (limit is not None) and (int(limit)<=lim) and (cores>int(limit)):
if (alg=="hybrid"):
# There are two situations: number of cores >= number of vertices and the opposite one.
# So, we need to handle them separately to distribute cores equally
if (self.regulators):
length = len(vertices) - len(self.regulators[0])
else:
length = len(vertices)
if (cores>=length):
pool=MyPool(length)
if (self.regulators):
for counter in range(len(self.regulators[0])):
distribs.append(1)

for counter in range(length):
if (counter<(cores%length)):
distribs.append(cores/length + 1)
else:
distribs.append(cores/length)
else:
pool=MyPool(cores)
distribs = [1 for i in range(len(vertices))]

result=pool.map(learn_x,[(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, y) for x, y in zip(vertices, distribs)])
# Simple alg
else:
result = map(learn_x, [(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, cores) for x in vertices])

pool.close()
pool.join()

stop = timeit.default_timer()
s = str(stop - start)
if verbose:
print "----------------------------------------"
print "Time, secs: ", s
# Save computational time to file
with open("time"+str(cores)+".txt", 'w') as file:
file.write(s)
pool.close()
pool.join()

stop = timeit.default_timer()
s = str(stop - start)

if verbose:
print "----------------------------------------"
print "Time, secs: ", s
# Save computational time to file
with open("time"+str(cores)+".txt", 'w') as file:
file.write(s)

# Save computed subset of genes to file
if (subset) and (subset != "concat"):
@@ -757,12 +760,12 @@ class MyPool(multiprocessing.pool.Pool):
pickle.dump(result, file_pi)
exit()

# Computing without any parallelizing
else:
result=map(learn_x,[(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, cores) for x in self.vertices])
# Computing without any parallelizing
else:
result=map(learn_x,[(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, cores) for x in self.vertices])

# Concatenating previously computed subsets of genes
if subset == "concat":
elif subset == "concat":
import pickle
result = []
names = []
@@ -771,9 +774,10 @@ class MyPool(multiprocessing.pool.Pool):
for name in files:
if name.startswith("genes"):
names.append(name)

break

for name in names:
file_pi = open(name)
file_pi = open(os.getcwd()+"/"+name)
result.extend(pickle.load(file_pi))

#print result
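
The subset/concat round trip above can be sketched in isolation: each distributed run pickles its partial result to a genes_* file, and a final concat run merges every such file found in the working directory. A minimal self-contained illustration (Python 2, matching the codebase; save_subset and concat_subsets are hypothetical names, not BNfinder functions):

import os
import pickle

def save_subset(result, tag):
    # One distributed run: persist the partial result for this subset.
    with open("genes_" + tag, "wb") as fh:
        pickle.dump(result, fh)

def concat_subsets(directory="."):
    # The concat run: merge every pickled genes_* file into one result list.
    merged = []
    for name in sorted(os.listdir(directory)):
        if name.startswith("genes"):
            with open(os.path.join(directory, name), "rb") as fh:
                merged.extend(pickle.load(fh))
    return merged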
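The heart of the refactoring is the non-daemonic pool: workers of a standard multiprocessing.Pool are daemonic and may not spawn children, so running a pool per vertex inside a pool over vertices needs the NoDaemonProcess override shown above. A minimal runnable sketch of the same pattern (Python 2, matching the codebase; square and outer are illustrative names, not BNfinder functions):

import multiprocessing
import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    # Make the 'daemon' attribute always return False.
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def square(x):
    return x * x

def outer(args):
    lo, hi, inner_cores = args
    # Creating a child pool is allowed only because this worker is non-daemonic.
    pool = multiprocessing.Pool(inner_cores)
    try:
        return sum(pool.map(square, range(lo, hi)))
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    outer_pool = MyPool(2)
    print outer_pool.map(outer, [(0, 10, 2), (10, 20, 2)])   # -> [285, 2185]
    outer_pool.close()
    outer_pool.join()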
15 changes: 11 additions & 4 deletions BNfinder/score.py
@@ -129,7 +129,9 @@ def learn_1(self,selected_data,verbose=None,n_min=1,limit=None,score_max=fpconst

# if not self.sloops:
# selected_data.rm_sloops()

import timeit
start = timeit.default_timer()

v = selected_data.vertex
nd = len(selected_data)
parents=selected_data.parents
@@ -159,7 +161,7 @@ def learn_1(self,selected_data,verbose=None,n_min=1,limit=None,score_max=fpconst
while min_set.accepts(mg+mindata) and (size<=lim): #we can possibly add (sub-)optimal scores

# Parallelized version
if cores:
if (cores) and (cores>1):
import multiprocessing
import multiprocessing.pool

@@ -189,7 +191,7 @@ def learn_1(self,selected_data,verbose=None,n_min=1,limit=None,score_max=fpconst
print "Using algorithm 1"

# Parallelized version
if cores:
if (cores) and (cores>1):
import multiprocessing
import multiprocessing.pool
pool=multiprocessing.Pool(cores)
@@ -239,8 +241,13 @@ def learn_1(self,selected_data,verbose=None,n_min=1,limit=None,score_max=fpconst
mg_succ=self.graph_score(n,v,weights_succ,nd)
heappush(subsets,(mg_succ,weights_succ,sub_succ))

stop = timeit.default_timer()
s = str(stop - start)
if verbose:
print 'done', min_set
print "\n----------------------------------------"
print 'done ' + v.name, min_set
print "Time, secs: ", s
print "----------------------------------------"
return min_set.optimal, min_set.tolist()

# def learn_all(self,vertices,data,n_points):
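learn_1 above now guards its internal pools with cores>1 because the hybrid branch in data.py may hand a vertex a share as small as one core. The even split it receives gives every variable cores divided by the number of variables, with the first cores mod n variables getting one extra. A standalone sketch of that arithmetic (Python 2; distribute_cores is a hypothetical name, not a BNfinder function):

def distribute_cores(cores, n):
    # The first (cores % n) workers receive one extra core; assumes cores >= n,
    # mirroring the branch in data.py where cores >= number of vertices.
    base, extra = divmod(cores, n)
    return [base + 1 if i < extra else base for i in range(n)]

print distribute_cores(10, 4)   # -> [3, 3, 2, 2]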
8 changes: 4 additions & 4 deletions bnf
@@ -14,7 +14,7 @@ if __name__=="__main__":
parser = OptionParser()
parser.add_option("-s", "--score", dest="score",default="BDE",help="Scoring function: BDE (default) or MDL or MIT")
parser.add_option("-d", "--data-factor", dest="df",default=1.0,type=float,help="Factor multiplying the data set")
parser.add_option("-e", "--expr", dest="expr",help="Expression data file")
parser.add_option("-e", "--expr", dest="expr",help="Expression data file (can be used with other data types as well)")
parser.add_option("-t", "--txt", dest="txt",help="Output file with the suboptimal parents sets")
parser.add_option("-n", "--net", dest="net",help="Output file with the network structure")
parser.add_option("-c", "--cpd", dest="cpd",help="Output file with the conditional probability distributions")
@@ -30,9 +30,9 @@ if __name__=="__main__":
parser.add_option("-x", "--max-permutations", dest="max_tries", type=int, default=None, help="Maximal number of permutations in type I error rate predetermination")
parser.add_option("-a", "--chi", dest="chi", type=float, default=.9999, help="alpha value for the chi-square distribution used in MIT score (default=.9999)")
parser.add_option("-g", "--sloops", dest="sloops", action='store_true', default=False, help="Allow self-loops in Dynamic Bayesian Networks (no self-loops by default)")
parser.add_option("-k", "--cpu", dest="cores", type=int, default=0, help="Number of cores you want to use for parallelization")
parser.add_option("-j", "--subset", dest="subset", help="Specify the file with subset of genes (divided by space character), parents of which you want to find. Use 'concat' value to combine the results. BNFinder will automaticaly search for files 'genes_*'")
parser.add_option("-q", "--algorithm", dest="alg", default="set", help="Choose the parralelization algorithm:'set' (default, give all the cores to compute parents set of one variable) or 'variable' (evenly distributes cores between variables)")
parser.add_option("-k", "--cpu", dest="cores", type=int, default=0, help="Number of cores to use for parallelization")
parser.add_option("-j", "--subset", dest="subset", help="Input file with subset of variables (divided by space character). The feature is useful when the inference complexity is too high, allowing distributed usage of BNFinder. After all the variables are processed (including regulators), put resulting 'genes_*' files into one folder and use 'concat' instead of file name. If only particular variables are subject for parents set inference it is advisable to simply edit input data file instead of using this argument")
parser.add_option("-q", "--algorithm", dest="alg", default="setwise", help="Parallelization algorithm: 'setwise' (default, uses each provided core to compute parents set of one variable before proceeding further) or 'hybrid' (evenly distributes cores between variables, it is recommended to use when underlying network is supposed to be homogeneous with respect to the number of parents per variable, in cases when computational complexity is low or small parents set limit is used)")

(options, args) = parser.parse_args()

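Taken together, the new options support two workflows (hypothetical invocations with illustrative file names, not from the repository). A single-machine run that splits cores between variables: bnf -e expr.txt -t parents.txt -k 8 -q hybrid. A distributed run in which each machine processes its own variable subset: bnf -e expr.txt -t parents.txt -j genes_part1.txt on each machine, followed by a final bnf -e expr.txt -t parents.txt -j concat in the folder that collects the resulting genes_* files.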
Binary file added doc/doc_all.pdf
Binary file not shown.
1 change: 1 addition & 0 deletions doc/doc_all.tex
@@ -1,4 +1,5 @@
\documentclass{howto}
\let\ifpdf\relax
%\documentclass{article}
\usepackage[T1]{fontenc}
\usepackage[utf8]{inputenc}
5 changes: 5 additions & 0 deletions doc/manual.tex
@@ -114,6 +114,11 @@ \subsubsection{bnf}
allow self-loops in Dynamic Bayesian Networks (by default self-loops are disabled)
\item[\texttt{-k, -\hspace{0pt}-cpu <number>}]~\\
use number of processes for multiprocessing - by default 0 is used (no multiprocessing)
\item[\texttt{-j, -\hspace{0pt}-subset <file> | "concat"}]~\\
input file with subset of variables (divided by space character); the feature is useful when the inference complexity is too high, allowing distributed usage of BNFinder; after all the variables are processed (including regulators), put the resulting \texttt{genes\_*} files into one folder and use \texttt{concat} instead of a file name; if only particular variables are subject to parent-set inference, it is advisable to simply edit the input data file instead of using this argument
\item[\texttt{-q, -\hspace{0pt}-algorithm <name>}]~\\
parallelization algorithm: \texttt{setwise} (default, uses all provided cores to compute the parents set of one variable before proceeding further) or \texttt{hybrid} (evenly distributes cores between variables; recommended when the underlying network is expected to be homogeneous with respect to the number of parents per variable, when computational complexity is low, or when a small parents-set limit is used)

\end{description}

\subsubsection{bnf-cv}
Expand Down
