diff --git a/ChangeLog b/ChangeLog
index a9db11d..b93a2e5 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -3,6 +3,7 @@ Unreleased
     * SemiBin1: Introduce separate SemiBin1 command
     * internal: Code simplification and refactor
     * deprecation: Deprecate --orf-finder=fraggenescan option
+    * SemiBin: do not use more processes than can be taken advantage of (#155)
 
 Version 2.0.2 Oct 31 2023 by BigDataBiology
     * multi_easy_bin: Fix multi_easy_bin with --write-pre-recluster (#128)
diff --git a/SemiBin/cluster.py b/SemiBin/cluster.py
index 791fca2..dc9759a 100644
--- a/SemiBin/cluster.py
+++ b/SemiBin/cluster.py
@@ -17,8 +17,8 @@ def run_infomap(g, edge_weights, vertex_weights, num_process):
     '''Run infomap, using multiple processors (if available)'''
     if num_process == 1:
         return g.community_infomap(edge_weights=edge_weights, vertex_weights=vertex_weights, trials=NR_INFOMAP_TRIALS)
-    import multiprocessing
-    with multiprocessing.Pool(num_process) as p:
+    import multiprocessing as mp
+    with mp.Pool(min(num_process, NR_INFOMAP_TRIALS)) as p:
         rs = [p.apply_async(run_infomap1, (g, edge_weights, vertex_weights, 1))
                 for _ in range(NR_INFOMAP_TRIALS)]
         p.close()
diff --git a/SemiBin/main.py b/SemiBin/main.py
index e35d808..125fca6 100644
--- a/SemiBin/main.py
+++ b/SemiBin/main.py
@@ -773,6 +773,44 @@ def generate_sequence_features_single(logger, contig_fasta,
         logger.info('We will only calculate k-mer features.')
 
     if not only_kmer:
+        n_sample = len(bams)
+        is_combined = n_sample >= 5
+        bam_list = bams
+
+        logger.info('Calculating coverage for every sample.')
+
+        with Pool(min(num_process, len(bams))) as pool:
+            results = [
+                pool.apply_async(
+                    generate_cov,
+                    args=(
+                        bam_file,
+                        bam_index,
+                        output,
+                        must_link_threshold,
+                        is_combined,
+                        binned_length,
+                        logger,
+                        None
+                    ))
+                for bam_index, bam_file in enumerate(bams)]
+            for r in results:
+                s = r.get()
+                logger.info(f'Processed: {s}')
+
+        for bam_index, bam_file in enumerate(bams):
+            if not os.path.exists(os.path.join(output, '{}_data_cov.csv'.format(
+                    os.path.split(bam_file)[-1] + '_{}'.format(bam_index)))):
+                sys.stderr.write(
+                    f"Error: Generating coverage file failed\n")
+                sys.exit(1)
+            if is_combined:
+                if not os.path.exists(os.path.join(output, '{}_data_split_cov.csv'.format(
+                        os.path.split(bam_file)[-1] + '_{}'.format(bam_index)))):
+                    sys.stderr.write(
+                        f"Error: Generating coverage file failed\n")
+                    sys.exit(1)
+
     logger.debug('Start generating kmer features from fasta file.')
     kmer_whole = generate_kmer_features_from_fasta(
         contig_fasta, binned_length, 4)
@@ -922,6 +960,7 @@ def fasta_sample_iter(fn):
             os.path.join(args.output, f'samples/{sample}.fa'),
             args.ratio)
 
+
     if args.bams:
         with Pool(args.num_process if args.num_process != 0 else None) as pool:
             results = [
diff --git a/SemiBin/naive_orffinder.py b/SemiBin/naive_orffinder.py
index 2227ead..0f5791b 100644
--- a/SemiBin/naive_orffinder.py
+++ b/SemiBin/naive_orffinder.py
@@ -9,6 +9,12 @@
 STOP_CODONS = ['TAA', 'TAG', 'TGA']
 MIN_LEN = 90
 
+# See https://github.com/BigDataBiology/SemiBin/issues/155
+# A bit of testing showed that scaling tops out at 10~12 processes (on a 16-core
+# machine). This leaves a little leeway for a few more processes, but still
+# caps it to avoid #155
+MAX_PROCESS_ORFS = 24
+
 def reverse_complement(seq):
     return seq[::-1].translate(str.maketrans('ATGC', 'TACG'))
 
@@ -123,7 +129,7 @@ def run_naiveorf(fasta_path, num_process, output):
                     out.write(f'{translate(extract(seq, orf))}\n')
     else:
         ctx = mp.get_context('spawn')
-        with ctx.Pool(processes=num_process) as p:
+        with ctx.Pool(processes=min(num_process, MAX_PROCESS_ORFS)) as p:
             outs = p.imap(get_orfs,
                         fasta.fasta_iter(fasta_path),
                         chunksize=8)
diff --git a/test/test_bin.py b/test/test_bin.py
index 900adee..f288f04 100644
--- a/test/test_bin.py
+++ b/test/test_bin.py
@@ -94,6 +94,14 @@ def test_cluster():
             res,
             [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
 
+def _renumber_cluster(xs):
+    ys = np.zeros_like(xs)
+    renumber = {}
+    for ix, x in enumerate(xs):
+        if x not in renumber:
+            renumber[x] = len(renumber)
+        ys[ix] = renumber[x]
+    return ys
 
 def test_recluster():
     contig_dict = {h:seq for h,seq in fasta_iter('test/bin_data/input.fasta')}
@@ -116,10 +124,12 @@ def test_recluster():
             random_seed=123)
 
     # Computed with a previous version
-    np.testing.assert_array_equal(
-        reclustered,
-        np.array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 29, 21, 22, 23, 24, 25, 26,
+    expected = np.array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 29, 21, 22, 23, 24, 25, 26,
         26, 28, 27, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 10, 11, 12, 13,
-        14, 15, 16, 17, 18, 19]))
+        14, 15, 16, 17, 18, 19])
+
+    np.testing.assert_array_equal(
+        _renumber_cluster(reclustered),
+        _renumber_cluster(expected))
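
Every hunk above applies the same fix for #155: cap the size of the multiprocessing pool at the number of tasks that can actually run concurrently (NR_INFOMAP_TRIALS trials, len(bams) BAM files, MAX_PROCESS_ORFS workers), so a large process count no longer spawns workers that sit idle. Below is a minimal, self-contained sketch of that pattern, not part of the patch; the names run_capped, _work, and n_tasks are illustrative stand-ins, not SemiBin APIs.

    import multiprocessing as mp

    def _work(i):
        # Stand-in for the real per-task function (one infomap trial, one BAM file, ...).
        return i * i

    def run_capped(n_tasks, num_process):
        # Never start more worker processes than there are tasks: the extra
        # processes would only add startup and IPC overhead without doing any work.
        with mp.Pool(min(num_process, n_tasks)) as p:
            return p.map(_work, range(n_tasks))

    if __name__ == '__main__':
        # Requesting 64 processes for 4 tasks now uses only 4 workers.
        print(run_capped(n_tasks=4, num_process=64))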