diff --git a/BNfinder/data.py b/BNfinder/data.py
index 353da57..3811041 100644
--- a/BNfinder/data.py
+++ b/BNfinder/data.py
@@ -629,11 +629,10 @@ def get_prior(self,vertex,parent):
         return self.prior.get((vertex.name,parent.name),self.prior.get(parent.name,1))
 
-    def learn(self,score,data_factor,alg,prior=None,cores=False,subset=False, \
+    def learn(self,score,data_factor,alg=None,prior=None,cores=False,subset=None, \
               verbose=None,n_min=1,limit=None,min_empty=None,min_optim=None,fpr=None,max_tries=None):
         scr=score
-        distribs = []
 
         # calculate fpr_factor
         if fpr:
@@ -664,91 +663,95 @@ def learn(self,score,data_factor,alg,prior=None,cores=False,subset=False, \
             if x.name in subset:
                 vertices.append(x)
 
-        if cores:
-            # Some preparatory work
-            try:
-                import multiprocessing
-                import multiprocessing.pool
-            except ImportError:
-                print "Problem invoking multiprocessing module. Running on a single CPU"
-                cores=False
-            else:
-                # Daemonic processes are not allowed to have children
-                class NoDaemonProcess(multiprocessing.Process):
-                    # make 'daemon' attribute always return False
-                    def _get_daemon(self):
-                        return False
-                    def _set_daemon(self, value):
-                        pass
-                    daemon = property(_get_daemon, _set_daemon)
+        # Need to check if we just want to concatenate previously obtained results
+        if (not subset) or (subset != "concat"):
+            if cores:
+                # Some preparatory work
+                try:
+                    import multiprocessing
+                    import multiprocessing.pool
+                except ImportError:
+                    print "Problem invoking multiprocessing module. Running on a single CPU"
+                    cores=False
+                else:
+                    # Daemonic processes are not allowed to have children
+                    class NoDaemonProcess(multiprocessing.Process):
+                        # make 'daemon' attribute always return False
+                        def _get_daemon(self):
+                            return False
+                        def _set_daemon(self, value):
+                            pass
+                        daemon = property(_get_daemon, _set_daemon)
 
-                class MyPool(multiprocessing.pool.Pool):
-                    Process = NoDaemonProcess
-
-                # We need to define how to distribute cores among the tasks
-                # In case of limit<=3 it is efficient to use hybrid alg
-                # There are two situations: number of cores >= number of vertices and opposite one.
-                # So, we need to handle it separately to distritube cores equaly
-                lim = 0
-                if (alg=="set"):
-                    lim = 0
-                if (alg=="variable"):
-                    lim = 999
+                    class MyPool(multiprocessing.pool.Pool):
+                        Process = NoDaemonProcess
+
+                # There are two different algorithms for parallelization: set-wise and hybrid.
+                # The set-wise algorithm is the default; it uses all the cores to compute the parents set of one variable before proceeding further.
+                # The hybrid algorithm evenly distributes cores between variables.
+                # It is recommended only when the underlying network is expected to be homogeneous
+                # with respect to the number of parents per variable, or
+                # when the computational complexity is low.
 
-                pool=MyPool(1)
-                if (limit is not None) and (int(limit)<=lim) and (cores>int(limit)):
-                    if (cores>=len(vertices)):
-                        pool=MyPool(len(vertices))
-                        for counter in range(1, len(vertices)+1):
-                            if (counter<=(cores%len(vertices))):
-                                distribs.append(cores/len(vertices) + 1)
-                            else:
-                                distribs.append(cores/len(vertices))
-                    else:
-                        count = cores/int(limit)
-                        if (cores%int(limit)>0):
-                            count = count + 1
-
-                        for counter in range(1, count+1):
-                            if (counter<=(cores%count)):
-                                distribs.append(cores/count + 1)
-                            else:
-                                distribs.append(cores/count)
+                pool=MyPool(1)
+                distribs = []
+                # Computational part
+                if verbose:
+                    print "Computing in parallel"
 
-                        temp = []
-                        for i in range (1, len(vertices)+1):
-                            temp.append(distribs[i%count])
-                        distribs = temp
-                        pool=MyPool(count)
-
-            # Computational part
-            if verbose:
-                print "Computing in parallel"
+                from itertools import izip,repeat
+                import timeit
+                start = timeit.default_timer()
+
+                if (alg=="setwise"):
+                    if (self.regulators):
+                        for v in vertices:
+                            if v in self.regulators[0]:
+                                distribs.append(1)
+                            else:
+                                distribs.append(cores)
 
-            from itertools import izip,repeat
-            import timeit
-            start = timeit.default_timer()
+                        result = map(learn_x, [(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, y) for x, y in zip(vertices, distribs)])
+                    else:
+                        result = map(learn_x, [(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, cores) for x in vertices])
 
-            # Need to check if we just want to concatenate previously obtained results
-            if (subset==False) or (subset != "concat"):
-                # Hybrid alg
-                if (limit is not None) and (int(limit)<=lim) and (cores>int(limit)):
+                if (alg=="hybrid"):
+                    # There are two situations: the number of cores >= the number of vertices, and the opposite one,
+                    # so we need to handle them separately to distribute cores equally
+                    if (self.regulators):
+                        length = len(vertices) - len(self.regulators[0])
+                    else:
+                        length = len(vertices)
+                    if (cores>=length):
+                        pool=MyPool(length)
+                        if (self.regulators):
+                            for counter in range(len(self.regulators[0])):
+                                distribs.append(1)
+
+                        for counter in range(length):
+                            if (counter<(cores%length)):
+                                distribs.append(cores/length + 1)
+                            else:
+                                distribs.append(cores/length)
+                    else:
+                        pool=MyPool(cores)
+                        distribs = [1 for i in range(len(vertices))]
+                    result=pool.map(learn_x,[(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, y) for x, y in zip(vertices, distribs)])
 
-                    result=pool.map(learn_x,[(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, y) for x, y in zip(vertices, distribs)])
-                # Simple alg
-                else:
-                    result = map(learn_x, [(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, cores) for x in vertices])
-                pool.close()
-                pool.join()
-                stop = timeit.default_timer()
-                s = str(stop - start)
-                if verbose:
-                    print "----------------------------------------"
-                    print "Time, secs: ", s
-                # Save computational time to file
-                with open("time"+str(cores)+".txt", 'w') as file:
-                    file.write(s)
+                pool.close()
+                pool.join()
+
+                stop = timeit.default_timer()
+                s = str(stop - start)
+
+                if verbose:
+                    print "----------------------------------------"
+                    print "Time, secs: ", s
+                # Save computational time to file
+                with open("time"+str(cores)+".txt", 'w') as file:
+                    file.write(s)
 
                 # Save computed subset of genes to file
                 if (subset) and (subset != "concat"):
@@ -757,12 +760,12 @@ class MyPool(multiprocessing.pool.Pool):
                     pickle.dump(result, file_pi)
                     exit()
 
-        # Computing without any parallelizing
-        else:
-            result=map(learn_x,[(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, cores) for x in self.vertices])
+            # Computing without any parallelizing
+            else:
+                result=map(learn_x,[(x,self,min_empty,min_optim,scr,verbose,n_min,limit,fpr_factor,max_tries, cores) for x in self.vertices])
 
         # Concatenating previously computed subsets of genes
-        if subset == "concat":
+        elif subset == "concat":
             import pickle
             result = []
             names = []
@@ -771,9 +774,10 @@ class MyPool(multiprocessing.pool.Pool):
                 for name in files:
                     if name.startswith("genes"):
                         names.append(name)
-
+                break
+
             for name in names:
-                file_pi = open(name)
+                file_pi = open(os.getcwd()+"/"+name)
                 result.extend(pickle.load(file_pi))
 
             #print result
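Note: the core-splitting rule implemented by the hybrid branch above can be summarized as a standalone function. The following is a minimal sketch, not code from this patch; 'split_cores' and its arguments are hypothetical names. With at least as many cores as vertices, every vertex receives cores/length cores (integer division) and the remainder is handed out one extra core at a time; the strict comparison hands out exactly 'cores' cores in total. With fewer cores than vertices, every vertex gets a single core.

    # Hypothetical illustration of the hybrid core distribution; not part of the patch.
    def split_cores(cores, length):
        if cores >= length:
            base, extra = divmod(cores, length)
            # the first 'extra' vertices get one additional core each
            return [base + 1 if i < extra else base for i in range(length)]
        # fewer cores than vertices: one core per vertex, pool size capped at 'cores'
        return [1] * length

    assert split_cores(10, 4) == [3, 3, 2, 2]
    assert split_cores(3, 5) == [1, 1, 1, 1, 1]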
diff --git a/BNfinder/score.py b/BNfinder/score.py
index b146f43..2ca264a 100644
--- a/BNfinder/score.py
+++ b/BNfinder/score.py
@@ -129,7 +129,9 @@ def learn_1(self,selected_data,verbose=None,n_min=1,limit=None,score_max=fpconst
 
 #        if not self.sloops:
 #            selected_data.rm_sloops()
-
+        import timeit
+        start = timeit.default_timer()
+
         v = selected_data.vertex
         nd = len(selected_data)
         parents=selected_data.parents
@@ -159,7 +161,7 @@ def learn_1(self,selected_data,verbose=None,n_min=1,limit=None,score_max=fpconst
         while min_set.accepts(mg+mindata) and (size<=lim): #we can possibly add (sub-)optimal scores
 
             # Parallelized version
-            if cores:
+            if (cores) and (cores>1):
                 import multiprocessing
                 import multiprocessing.pool
 
@@ -189,7 +191,7 @@ def learn_1(self,selected_data,verbose=None,n_min=1,limit=None,score_max=fpconst
                 print "Using algorithm 1"
 
             # Parallelized version
-            if cores:
+            if (cores) and (cores>1):
                 import multiprocessing
                 import multiprocessing.pool
                 pool=multiprocessing.Pool(cores)
@@ -239,8 +241,13 @@ def learn_1(self,selected_data,verbose=None,n_min=1,limit=None,score_max=fpconst
                     mg_succ=self.graph_score(n,v,weights_succ,nd)
                     heappush(subsets,(mg_succ,weights_succ,sub_succ))
 
+        stop = timeit.default_timer()
+        s = str(stop - start)
         if verbose:
-            print 'done', min_set
+            print "\n----------------------------------------"
+            print 'done ' + v.name, min_set
+            print "Time, secs: ", s
+            print "----------------------------------------"
         return min_set.optimal, min_set.tolist()
 
 # def learn_all(self,vertices,data,n_points):
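Note: both 'cores>1' guards added to score.py follow the same pattern: a worker pool is only created when the current vertex was actually granted more than one core, so single-core calls skip the pool start-up overhead entirely. Below is a minimal sketch of that pattern, assuming a picklable top-level evaluation function; 'score_candidates' and 'evaluate' are illustrative names, not BNfinder API.

    import multiprocessing

    # Illustrative sketch of the guard added above; not BNfinder code.
    def score_candidates(evaluate, candidates, cores):
        if cores and cores > 1:
            pool = multiprocessing.Pool(cores)
            try:
                return pool.map(evaluate, candidates)
            finally:
                pool.close()
                pool.join()
        # one core (or cores==0/None): plain sequential evaluation
        return [evaluate(c) for c in candidates]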
diff --git a/bnf b/bnf
index 17a5969..223d86f 100755
--- a/bnf
+++ b/bnf
@@ -14,7 +14,7 @@ if __name__=="__main__":
     parser = OptionParser()
     parser.add_option("-s", "--score", dest="score",default="BDE",help="Scoring function: BDE (default) or MDL or MIT")
     parser.add_option("-d", "--data-factor", dest="df",default=1.0,type=float,help="Factor multiplying the data set")
-    parser.add_option("-e", "--expr", dest="expr",help="Expression data file")
+    parser.add_option("-e", "--expr", dest="expr",help="Expression data file (can be used with other data types as well)")
     parser.add_option("-t", "--txt", dest="txt",help="Output file with the suboptimal parents sets")
     parser.add_option("-n", "--net", dest="net",help="Output file with the network structure")
     parser.add_option("-c", "--cpd", dest="cpd",help="Output file with the conditional probability distributions")
@@ -30,9 +30,9 @@ if __name__=="__main__":
     parser.add_option("-x", "--max-permutations", dest="max_tries", type=int, default=None, help="Maximal number of permutations in type I error rate predetermination")
     parser.add_option("-a", "--chi", dest="chi", type=float, default=.9999, help="alpha value for the chi-square distribution used in MIT score (default=.9999)")
     parser.add_option("-g", "--sloops", dest="sloops", action='store_true', default=False, help="Allow self-loops in Dynamic Bayesian Networks (no self-loops by default)")
-    parser.add_option("-k", "--cpu", dest="cores", type=int, default=0, help="Number of cores you want to use for parallelization")
-    parser.add_option("-j", "--subset", dest="subset", help="Specify the file with subset of genes (divided by space character), parents of which you want to find. Use 'concat' value to combine the results. BNFinder will automaticaly search for files 'genes_*'")
-    parser.add_option("-q", "--algorithm", dest="alg", default="set", help="Choose the parralelization algorithm:'set' (default, give all the cores to compute parents set of one variable) or 'variable' (evenly distributes cores between variables)")
+    parser.add_option("-k", "--cpu", dest="cores", type=int, default=0, help="Number of cores to use for parallelization")
+    parser.add_option("-j", "--subset", dest="subset", help="Input file with a subset of variables (separated by spaces) whose parent sets should be inferred. The feature is useful when the inference complexity is too high, as it allows BNFinder to be run in a distributed way. After all the variables (including regulators) are processed, put the resulting 'genes_*' files into one folder and use 'concat' instead of a file name. If only particular variables need parent set inference, it is usually simpler to edit the input data file instead of using this option")
+    parser.add_option("-q", "--algorithm", dest="alg", default="setwise", help="Parallelization algorithm: 'setwise' (default, uses all the provided cores to compute the parent set of one variable before proceeding further) or 'hybrid' (evenly distributes cores between variables; recommended when the underlying network is expected to be homogeneous with respect to the number of parents per variable, when the computational complexity is low, or when a small parents set limit is used)")
 
     (options, args) = parser.parse_args()
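Note: the '-j' workflow exposed above pairs distributed runs, each pickling its partial result to a 'genes_*' file, with a final run invoked with 'concat' that merges every such file found in the working directory (see the data.py hunks). The following self-contained sketch of that round trip uses hypothetical helper names and is not the exact BNfinder implementation.

    import os
    import pickle

    def dump_subset(result, tag):
        # each distributed run writes its partial result to its own file
        with open("genes_%s" % tag, "wb") as fh:
            pickle.dump(result, fh)

    def concat_subsets(directory="."):
        # the 'concat' run merges every genes_* file found in one folder
        merged = []
        for name in sorted(os.listdir(directory)):
            if name.startswith("genes"):
                with open(os.path.join(directory, name), "rb") as fh:
                    merged.extend(pickle.load(fh))
        return merged

Calling dump_subset(result, tag) once per distributed job and concat_subsets() after collecting the files mirrors the merge performed under 'concat'.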
diff --git a/doc/doc_all.pdf b/doc/doc_all.pdf
new file mode 100644
index 0000000..b32800b
Binary files /dev/null and b/doc/doc_all.pdf differ
diff --git a/doc/doc_all.tex b/doc/doc_all.tex
index 17d7ad1..9a199d3 100644
--- a/doc/doc_all.tex
+++ b/doc/doc_all.tex
@@ -1,4 +1,5 @@
 \documentclass{howto}
+\let\ifpdf\relax
 %\documentclass{article}
 \usepackage[T1]{fontenc}
 \usepackage[utf8]{inputenc}
diff --git a/doc/manual.tex b/doc/manual.tex
index 288b730..65057aa 100644
--- a/doc/manual.tex
+++ b/doc/manual.tex
@@ -114,6 +114,11 @@ \subsubsection{bnf}
   allow self-loops in Dynamic Bayesian Networks (by default self-loops are disabled)
 \item[\texttt{-k, -\hspace{0pt}-cpu }]~\\
   use number of processes for multiprocessing - by default 0 is used (no multiprocessing)
+\item[\texttt{-j, -\hspace{0pt}-subset | "concat"}]~\\
+  input file with a subset of variables (separated by spaces) whose parent sets should be inferred; the feature is useful when the inference complexity is too high, as it allows BNFinder to be run in a distributed way; after all the variables (including regulators) are processed, put the resulting \texttt{genes\_*} files into one folder and use \texttt{concat} instead of a file name; if only particular variables need parent set inference, it is usually simpler to edit the input data file instead of using this option
+\item[\texttt{-q, -\hspace{0pt}-algorithm }]~\\
+  parallelization algorithm: \texttt{setwise} (default, uses all the provided cores to compute the parent set of one variable before proceeding further) or \texttt{hybrid} (evenly distributes cores between variables; recommended when the underlying network is expected to be homogeneous with respect to the number of parents per variable, when the computational complexity is low, or when a small parents set limit is used)
+
 \end{description}
 
 \subsubsection{bnf-cv}