From b110e9472ab50c2261f5a0619985be67bbf1eb37 Mon Sep 17 00:00:00 2001
From: Sahraeian
Date: Fri, 12 Mar 2021 17:09:51 -0800
Subject: [PATCH 01/10] small fix

---
 neusomatic/cpp/scan_alignments.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/neusomatic/cpp/scan_alignments.cpp b/neusomatic/cpp/scan_alignments.cpp
index d93adad..ad75aa1 100644
--- a/neusomatic/cpp/scan_alignments.cpp
+++ b/neusomatic/cpp/scan_alignments.cpp
@@ -335,7 +335,7 @@ int main(int argc, char **argv) {
           std::cout<<"col "<
Date: Fri, 2 Apr 2021 17:58:39 -0700
Subject: [PATCH 02/10] accelerate preprocess

---
 neusomatic/cpp/scan_alignments.cpp | 516 +++++++++++++++++++----
 neusomatic/include/Options.h | 14 +
 neusomatic/include/msa_utils.hpp | 267 +++++++++++-
 neusomatic/python/generate_dataset.py | 577 +++++++++++++++++++++-----
 neusomatic/python/preprocess.py | 7 +-
 neusomatic/python/scan_alignments.py | 22 +-
 6 files changed, 1190 insertions(+), 213 deletions(-)

diff --git a/neusomatic/cpp/scan_alignments.cpp b/neusomatic/cpp/scan_alignments.cpp
index ad75aa1..e459431 100644
--- a/neusomatic/cpp/scan_alignments.cpp
+++ b/neusomatic/cpp/scan_alignments.cpp
@@ -59,9 +59,15 @@ int main(int argc, char **argv) {
   float snp_min_af=min_af;
   const bool calculate_qual_stat = opts.calculate_qual_stat();
   const bool report_all_alleles = opts.report_all_alleles();
+  const bool report_count_for_all_positions = opts.report_count_for_all_positions();
+
+
+
+  auto start_main = std::chrono::system_clock::now();
 
   //const std::map empty_pileup_counts = {{'-', 0}, {'A', 0}, {'C', 0}, {'G', 0}, {'T', 0}};
   static const std::vector nuc_code_char = {'A', 'C', 'G', 'T', '-', 'N'};
+  const int matrix_base_pad = 7;
 
   using GInvStdString = neusomatic::bio::GenomicInterval;
   using GInvInt = neusomatic::bio::GenomicInterval;
@@ -80,6 +86,7 @@ int main(int argc, char **argv) {
   }
   std::ofstream count_out_writer;
   count_out_writer.open(count_out);
+  std::string count_out_str = "";
 
   neusomatic::CaptureLayout capture_layout(bam_path, bed_regions, opts);
   SeqLib::BamHeader bam_header = capture_layout.Reader().Header();
@@ -99,16 +106,35 @@ int main(int argc, char **argv) {
     exit(1);
   }
 
+  auto end_main_0 = std::chrono::system_clock::now();
+  std::chrono::duration elapsed_seconds_main_0 = end_main_0-start_main;
+  std::time_t end_time_main_0 = std::chrono::system_clock::to_time_t(end_main_0);
+  std::cout << "elapsed_main_0 time: " << elapsed_seconds_main_0.count() << "s\n";
+
+
+  int last_var_pos=-1;
+  int cnt_region=0;
+  auto timestamp = opts.verbosity() > 0;
   while (capture_layout.NextRegion(opts.fully_contained())) {
+
+    auto start_0_while = std::chrono::system_clock::now();
+
     // a map from genomic interval -> a vector of alignReadIds
     for (auto targeted_region : capture_layout.ginv_records()) {
+
+
+      auto start_0_for = std::chrono::system_clock::now();
+      auto start_0 = std::chrono::system_clock::now();
+
       auto ginv = targeted_region.first;
       auto& records = targeted_region.second;
       GInvStdString ginvstr(bam_header.IDtoName(ginv.contig()), ginv.left(), ginv.right());
-      if (opts.verbosity() > 2 || cnt_region % 100 == 0){
+
+      if (opts.verbosity() > 2 || (cnt_region % 100 == 0 || timestamp)) {
        std::cerr<<"#On region "< elapsed_seconds_0 = end_0-start_0;
      std::time_t end_time_0 = std::chrono::system_clock::to_time_t(end_0);
      if (timestamp){
        std::cout << "elapsed_0 time: " << elapsed_seconds_0.count() << "s\n";
      }
      auto start_1 = std::chrono::system_clock::now();
      neusomatic::CondensedArray condensed_array;
+
+      auto end_1 =
std::chrono::system_clock::now(); + std::chrono::duration elapsed_seconds_1 = end_1-start_1; + std::time_t end_time_1 = std::chrono::system_clock::to_time_t(end_1); + if (timestamp){ + std::cout << "elapsed_1 time: " << elapsed_seconds_1.count() << "s\n"; + } + auto start_2 = std::chrono::system_clock::now(); + MSA msa(ginv, records, non_gapped_ref); + + auto end_2 = std::chrono::system_clock::now(); + std::chrono::duration elapsed_seconds_2 = end_2-start_2; + std::time_t end_time_2 = std::chrono::system_clock::to_time_t(end_2); + if (timestamp){ + std::cout << "elapsed_2 time: " << elapsed_seconds_2.count() << "s\n"; + } + auto start_3 = std::chrono::system_clock::now(); + condensed_array = neusomatic::CondensedArray(msa, calculate_qual_stat); - auto cols = condensed_array.GetColSpace(); - auto cols_mqual = condensed_array.GetColSpaceMQual(); - auto cols_strand = condensed_array.GetColSpaceStrand(); - auto cols_lsc = condensed_array.GetColSpaceLSC(); - auto cols_rsc = condensed_array.GetColSpaceRSC(); - auto cols_tag = condensed_array.GetColSpaceTag(); - auto ncol = cols.size(); + + auto end_3 = std::chrono::system_clock::now(); + std::chrono::duration elapsed_seconds_3 = end_3-start_3; + std::time_t end_time_3 = std::chrono::system_clock::to_time_t(end_3); + if (timestamp){ + std::cout << "elapsed_3 time: " << elapsed_seconds_3.count() << "s\n"; + } + // auto start_4 = std::chrono::system_clock::now(); + + // auto cols = condensed_array.GetColSpace(); + // auto cols = condensed_array.GetColSpaceMQual(); + // auto cols = condensed_array.GetColSpaceStrand(); + // auto cols = condensed_array.GetColSpaceLSC(); + // auto cols = condensed_array.GetColSpaceRSC(); + // auto cols = condensed_array.GetColSpaceTag(); + // auto ncol = cols.size(); + // const auto ref = condensed_array.GetGappedRef(); + // const auto cc = neusomatic::ChangeCoordinates(ref); + + + // auto end_4 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_4 = end_4-start_4; + // std::time_t end_time_4 = std::chrono::system_clock::to_time_t(end_4); + // if (timestamp){ + // std::cout << "elapsed_4 time: " << elapsed_seconds_4.count() << "s\n"; + // } + + auto start_4 = std::chrono::system_clock::now(); + + auto ncol = condensed_array.ncol(); const auto ref = condensed_array.GetGappedRef(); const auto cc = neusomatic::ChangeCoordinates(ref); + auto end_4 = std::chrono::system_clock::now(); + std::chrono::duration elapsed_seconds_4 = end_4-start_4; + std::time_t end_time_4 = std::chrono::system_clock::to_time_t(end_4); + if (timestamp){ + std::cout << "elapsed_4 time: " << elapsed_seconds_4.count() << "s\n"; + } + + auto start_5 = std::chrono::system_clock::now(); + + std::vector var_cols; + + if (( cnt_region == 1 ) or (last_var_pos >= 0)) { + var_cols.push_back(0); + } + + last_var_pos = -1; + for (size_t i = 0; i < ncol; i++) { + + // auto start_4p0 = std::chrono::system_clock::now(); + if (ginv.left() + cc.UngapPos(i) >=ginv.right()){ break;} auto ref_base = ref[i]; ref_base = std::toupper(ref_base); @@ -144,80 +239,54 @@ int main(int argc, char **argv) { ref_base = '-'; } - if (opts.verbosity()>0){ + auto base_freq_ = condensed_array.GetBaseFreq(i); + + if (opts.verbosity()>1){ std::cout<<"col "< pileup_counts(cols[i].base_freq_.size()); + auto nrow = condensed_array.nrow()-base_freq_[5]; + base_freq_.erase(base_freq_.begin() + 5); + + std::vector pileup_counts(base_freq_.size()); int total_count=0; - for (int base = 0; base < (int) cols[i].base_freq_.size(); ++base) { - pileup_counts[base] = 
cols[i].base_freq_[base]; - total_count+=cols[i].base_freq_[base]; + for (int base = 0; base < (int) base_freq_.size(); ++base) { + pileup_counts[base] = base_freq_[base]; + total_count+=base_freq_[base]; } - if (total_count==0) {continue;} - int rsc_counts=0; - int lsc_counts=0; auto start_pos=ginv.left() + cc.UngapPos(i); if (ref_base!='-'){ start_pos++; } - if (calculate_qual_stat){ - for(auto it = cols_lsc[i].lsc_mean.cbegin(); it != cols_lsc[i].lsc_mean.cend(); ++it) { - rsc_counts+=*it; - } - for(auto it = cols_rsc[i].rsc_mean.cbegin(); it != cols_rsc[i].rsc_mean.cend(); ++it) { - lsc_counts+=*it; - } - int sc_counts=lsc_counts+rsc_counts; - count_out_writer< elapsed_seconds_4p0 = end_4p0-start_4p0; + // std::time_t end_time_4p0 = std::chrono::system_clock::to_time_t(end_4p0); + // std::cout << "elapsed_4p0 time: " << elapsed_seconds_4p0.count() << "s\n"; + // auto start_4p1 = std::chrono::system_clock::now(); + std::map alt_counts; - auto ref_count = cols[i].base_freq_[ref_code]; + auto ref_count = base_freq_[ref_code]; auto var_code = ref_code; int var_count = 0; int dp = ref_count; if (report_all_alleles and ref_base != '-'){ - for (int row = 0; row < cols[i].base_freq_.size(); ++row) { - auto alt_cnt = cols[i].base_freq_[row]; + for (int row = 0; row < base_freq_.size(); ++row) { + auto alt_cnt = base_freq_[row]; if (( row != ref_code) and (alt_cnt > 0)){ auto af = alt_cnt/float(alt_cnt+ref_count); - if ((alt_cnt >= ref_code) or ((row == 4 and af > del_min_af ) or + if ((alt_cnt >= ref_count) or ((row == 4 and af > del_min_af ) or (row != 4 and ref_base != '-' and af > snp_min_af ) or (ref_base =='-' and af > ins_min_af))){ alt_counts.insert(std::pair(row, alt_cnt)); @@ -233,21 +302,21 @@ int main(int argc, char **argv) { int minor2 = -1; int minor2_count = 0; - for (int row = 0; row < cols[i].base_freq_.size(); ++row) { - if (cols[i].base_freq_[row] > major_count) { + for (int row = 0; row < base_freq_.size(); ++row) { + if (base_freq_[row] > major_count) { minor2 = minor; minor2_count = minor_count; minor_count = major_count; minor = major; - major_count = cols[i].base_freq_[row]; + major_count = base_freq_[row]; major = row; - } else if (cols[i].base_freq_[row] > minor_count) { + } else if (base_freq_[row] > minor_count) { minor2 = minor; minor2_count = minor_count; - minor_count = cols[i].base_freq_[row]; + minor_count = base_freq_[row]; minor = row; - } else if (cols[i].base_freq_[row] > minor2_count) { - minor2_count = cols[i].base_freq_[row]; + } else if (base_freq_[row] > minor2_count) { + minor2_count = base_freq_[row]; minor2 = row; } } @@ -279,6 +348,20 @@ int main(int argc, char **argv) { // { // std::cout << it->first << " " << it->second << std::endl; // } + + + + // auto end_4p1 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_4p1 = end_4p1-start_4p1; + // std::time_t end_time_4p1 = std::chrono::system_clock::to_time_t(end_4p1); + // std::cout << "elapsed_4p1 time: " << elapsed_seconds_4p1.count() << "s\n"; + // auto start_4p2 = std::chrono::system_clock::now(); + + if ( alt_counts.size() > 0 ){ + var_cols.push_back(i); + last_var_pos = ginv.left() + cc.UngapPos(i); + } + for(auto it = alt_counts.cbegin(); it != alt_counts.cend(); ++it) { auto var_code_ = it->first; @@ -286,28 +369,46 @@ int main(int argc, char **argv) { auto record_info = "AF="+std::to_string((var_count_)/float(dp))+";DP="+std::to_string(nrow)+";RO="+std::to_string(ref_count)+";AO="+std::to_string(var_count_); auto gtinfo = 
"0/1:"+std::to_string(nrow)+":"+std::to_string(ref_count)+":"+std::to_string(var_count_); if (calculate_qual_stat){ - record_info += ";ST="+std::to_string(int(round(ref_count*(cols_strand[i].strand_mean[ref_code]/100))))+ \ - ","+std::to_string(int(round(var_count_*(cols_strand[i].strand_mean[var_code_]/100))))+ \ + + auto bqual_mean_ = condensed_array.GetBQMean(i); + auto mqual_mean_ = condensed_array.GetMQMean(i); + auto strand_mean_ = condensed_array.GetStrandMean(i); + auto lsc_mean_ = condensed_array.GetLSCMean(i); + auto rsc_mean_ = condensed_array.GetRSCMean(i); + auto tag_mean_ = condensed_array.GetTagMean(i); + + int rsc_counts=0; + int lsc_counts=0; + for(auto it = lsc_mean_.cbegin(); it != lsc_mean_.cend(); ++it) { + rsc_counts+=*it; + } + for(auto it = rsc_mean_.cbegin(); it != rsc_mean_.cend(); ++it) { + lsc_counts+=*it; + } + + + record_info += ";ST="+std::to_string(int(round(ref_count*(strand_mean_[ref_code]/100))))+ \ + ","+std::to_string(int(round(var_count_*(strand_mean_[var_code_]/100))))+ \ ";LS="+std::to_string(lsc_counts)+\ ";RS="+std::to_string(rsc_counts)+\ - ";NM="+std::to_string(int(round(cols_tag[i].tag_mean[var_code_][0])))+\ - ";AS="+std::to_string(int(round(cols_tag[i].tag_mean[var_code_][1])))+ \ - ";XS="+std::to_string(int(round(cols_tag[i].tag_mean[var_code_][2])))+ \ - ";PR="+std::to_string(int(round(cols_tag[i].tag_mean[var_code_][3])))+ \ - ";CL="+std::to_string(int(round(cols_tag[i].tag_mean[var_code_][4])))+ \ - ";MQ="+std::to_string(int(round(cols_mqual[i].mqual_mean[var_code_])))+ \ - ";BQ="+std::to_string(int(round(cols[i].bqual_mean[var_code_]))); - gtinfo += ":"+std::to_string(int(round(ref_count*(cols_strand[i].strand_mean[ref_code]/100))))+","+ \ - std::to_string(int(round(var_count_*(cols_strand[i].strand_mean[var_code_]/100))))+":"+\ + ";NM="+std::to_string(int(round(tag_mean_[var_code_][0])))+\ + ";AS="+std::to_string(int(round(tag_mean_[var_code_][1])))+ \ + ";XS="+std::to_string(int(round(tag_mean_[var_code_][2])))+ \ + ";PR="+std::to_string(int(round(tag_mean_[var_code_][3])))+ \ + ";CL="+std::to_string(int(round(tag_mean_[var_code_][4])))+ \ + ";MQ="+std::to_string(int(round(mqual_mean_[var_code_])))+ \ + ";BQ="+std::to_string(int(round(bqual_mean_[var_code_]))); + gtinfo += ":"+std::to_string(int(round(ref_count*(strand_mean_[ref_code]/100))))+","+ \ + std::to_string(int(round(var_count_*(strand_mean_[var_code_]/100))))+":"+\ std::to_string(lsc_counts)+":"+\ std::to_string(rsc_counts)+":"+\ - std::to_string(int(round(cols_tag[i].tag_mean[var_code_][0])))+":"+\ - std::to_string(int(round(cols_tag[i].tag_mean[var_code_][1])))+":"+\ - std::to_string(int(round(cols_tag[i].tag_mean[var_code_][2])))+":"+\ - std::to_string(int(round(cols_tag[i].tag_mean[var_code_][3])))+":"+\ - std::to_string(int(round(cols_tag[i].tag_mean[var_code_][4])))+":"+\ - std::to_string(int(round(cols_mqual[i].mqual_mean[var_code_])))+":"+\ - std::to_string(int(round(cols[i].bqual_mean[var_code_]))); + std::to_string(int(round(tag_mean_[var_code_][0])))+":"+\ + std::to_string(int(round(tag_mean_[var_code_][1])))+":"+\ + std::to_string(int(round(tag_mean_[var_code_][2])))+":"+\ + std::to_string(int(round(tag_mean_[var_code_][3])))+":"+\ + std::to_string(int(round(tag_mean_[var_code_][4])))+":"+\ + std::to_string(int(round(mqual_mean_[var_code_])))+":"+\ + std::to_string(int(round(bqual_mean_[var_code_]))); } auto var_base = nuc_code_char[var_code_]; if (ref_base == '-') {ref_base = 'N';} @@ -330,20 +431,265 @@ int main(int argc, char **argv) { } 
appendValue(record.genotypeInfos, gtinfo); vcf_writer.Write(record); - if (opts.verbosity()>0){ + if (opts.verbosity()>1){ std::cout<<"var: " << i << "," << var_ref_pos << ","<< ref_base << "," << var_base<<","< elapsed_seconds_4p2 = end_4p2-start_4p2; + // std::time_t end_time_4p2 = std::chrono::system_clock::to_time_t(end_4p2); + // std::cout << "elapsed_4p2 time: " << elapsed_seconds_4p2.count() << "s\n"; + + } + + if (last_var_pos>=0){ + if (ginv.right()>(last_var_pos+matrix_base_pad+2)){ + last_var_pos=-1; + } + } + + var_cols.push_back(std::max(0,int(ncol)-1)); + // for (auto i=std::max(processed_col+1,j-matrix_base_pad-1); i < std::min(int(ncol),j+matrix_base_pad+1); ++i){ + + std::vector count_cols; + if (report_count_for_all_positions){ + for (size_t i = 0; i < ncol; i++) { + count_cols.push_back(i); + } + }else{ + int processed_col = -1; + for (auto& j : var_cols){ + // std::cout << j << "," << processed_col << std::endl; + std::vector count_cols_0; + int cnt_0 = 0; + for (auto i=j; i >= (processed_col+1); --i){ + if (ginv.left() + cc.UngapPos(i) >=ginv.right()){ break;} + auto ref_base = ref[i]; + ref_base = std::toupper(ref_base); + auto ref_code = neusomatic::CondensedArray::DnaCharToDnaCode(ref_base); + if (ref_base == 'N') { + ref_base = '-'; + } + count_cols_0.push_back(i); + if (ref_base == '-') { + continue; + } + cnt_0++; + if (cnt_0 >= matrix_base_pad+2){ + break; + } + } + std::reverse(count_cols_0.begin(),count_cols_0.end()); + count_cols.insert(count_cols.end(), count_cols_0.begin(), count_cols_0.end()); + + std::vector count_cols_1; + int cnt_1 = 0; + for (auto i=j+1; i < ncol; ++i){ + if (ginv.left() + cc.UngapPos(i) >=ginv.right()){ break;} + auto ref_base = ref[i]; + ref_base = std::toupper(ref_base); + auto ref_code = neusomatic::CondensedArray::DnaCharToDnaCode(ref_base); + if (ref_base == 'N') { + ref_base = '-'; + } + if (i >= (processed_col+1)){ + count_cols_1.push_back(i); + } + if (ref_base == '-') { + continue; + } + cnt_1++; + if (cnt_1 >= matrix_base_pad+1){ + break; + } + } + count_cols.insert(count_cols.end(), count_cols_1.begin(), count_cols_1.end()); + if (! 
count_cols.empty()) { + processed_col = count_cols.back(); + // std::cout << ginv.left() + cc.UngapPos(processed_col) << std::endl; + } + // for (auto& iii : count_cols_0){ + // std::cout << "count_cols_0, " << iii << std::endl; + // } + // for (auto& iii : count_cols_1){ + // std::cout << "count_cols_1, " << iii << std::endl; + // } + + } + } + + auto end_5 = std::chrono::system_clock::now(); + std::chrono::duration elapsed_seconds_5 = end_5-start_5; + std::time_t end_time_5 = std::chrono::system_clock::to_time_t(end_5); + if (timestamp){ + std::cout << "elapsed_5 time: " << elapsed_seconds_5.count() << "s\n"; + } + auto start_6 = std::chrono::system_clock::now(); + + for (auto& i : count_cols){ + auto ref_base = ref[i]; + ref_base = std::toupper(ref_base); + auto ref_code = neusomatic::CondensedArray::DnaCharToDnaCode(ref_base); + + if (ref_base == 'N') { + ref_base = '-'; + } + auto start_pos=ginv.left() + cc.UngapPos(i); + if (ref_base!='-'){ + start_pos++; + } + + auto base_freq_ = condensed_array.GetBaseFreq(i); + + std::vector pileup_counts(base_freq_.size()); + int total_count=0; + for (int base = 0; base < (int) base_freq_.size(); ++base) { + pileup_counts[base] = base_freq_[base]; + total_count+=base_freq_[base]; + } + + + if (total_count==0) {continue;} + + if (calculate_qual_stat){ + + auto bqual_mean_ = condensed_array.GetBQMean(i); + auto mqual_mean_ = condensed_array.GetMQMean(i); + auto strand_mean_ = condensed_array.GetStrandMean(i); + auto lsc_mean_ = condensed_array.GetLSCMean(i); + auto rsc_mean_ = condensed_array.GetRSCMean(i); + auto tag_mean_ = condensed_array.GetTagMean(i); + + + // count_out_writer< elapsed_seconds_6 = end_6-start_6; + std::time_t end_time_6 = std::chrono::system_clock::to_time_t(end_6); + if (timestamp){ + std::cout << "elapsed_6 time: " << elapsed_seconds_6.count() << "s\n"; + } + + auto end_0_for = std::chrono::system_clock::now(); + std::chrono::duration elapsed_seconds_0_for = end_0_for-start_0_for; + std::time_t end_time_0_for = std::chrono::system_clock::to_time_t(end_0_for); + if (timestamp){ + std::cout << "elapsed_0_for time: " << elapsed_seconds_0_for.count() << "s\n"; } } //end for + auto end_0_while = std::chrono::system_clock::now(); + std::chrono::duration elapsed_seconds_0_while = end_0_while-start_0_while; + std::time_t end_time_0_while = std::chrono::system_clock::to_time_t(end_0_while); + if (timestamp){ + std::cout << "elapsed_0_while time: " << elapsed_seconds_0_while.count() << "s\n"; + } } //end while + + + auto start_w = std::chrono::system_clock::now(); + + // count_out_writer << count_out_str; + + auto end_w = std::chrono::system_clock::now(); + std::chrono::duration elapsed_seconds_w = end_w-start_w; + std::time_t end_time_w = std::chrono::system_clock::to_time_t(end_w); + std::cout << "elapsed_w time: " << elapsed_seconds_w.count() << "s\n"; + + + + auto start_w2 = std::chrono::system_clock::now(); + count_out_writer.close(); + + auto end_w2 = std::chrono::system_clock::now(); + std::chrono::duration elapsed_seconds_w2 = end_w2-start_w2; + std::time_t end_time_w2 = std::chrono::system_clock::to_time_t(end_w2); + std::cout << "elapsed_w2 time: " << elapsed_seconds_w2.count() << "s\n"; + + + + + auto end_main = std::chrono::system_clock::now(); + std::chrono::duration elapsed_seconds_main = end_main-start_main; + std::time_t end_time_main = std::chrono::system_clock::to_time_t(end_main); + std::cout << "elapsed_main time: " << elapsed_seconds_main.count() << "s\n"; + return 0; } diff --git a/neusomatic/include/Options.h 
b/neusomatic/include/Options.h index c7b9a6f..9227ed1 100644 --- a/neusomatic/include/Options.h +++ b/neusomatic/include/Options.h @@ -30,6 +30,7 @@ namespace neusomatic { {"filter_mate_unmapped", no_argument, 0, 'F'}, {"filter_improper_orientation", no_argument, 0, 'G'}, {"report_all_alleles", no_argument, 0, 'A'}, + {"report_count_for_all_positions", no_argument, 0, 'C'}, {0, 0, 0, 0} // terminator }; @@ -60,6 +61,7 @@ namespace neusomatic { std::cerr<< "-F/--filter_mate_unmapped, filter reads with unmapeed mates if the flag is set, default is False.\n"; std::cerr<< "-G/--filter_improper_orientation, filter reads with improper orientation (not FR) or different chrom, default is False.\n"; std::cerr<< "-A/--report_all_alleles, report all alleles per column, default is False.\n"; + std::cerr<< "-C/--report_count_for_all_positions, report counts for all positions, default is False.\n"; } int parseInt(const char* optarg, int lower, const char *errmsg, void (*print_help)()) { @@ -168,6 +170,9 @@ namespace neusomatic { case 'A': opt.report_all_alleles() = true; break; + case 'C': + opt.report_count_for_all_positions() = true; + break; case 'a': opt.min_allele_freq() = parseFloat(optarg, 0.0, 1.0, "-a/--min_af must be between 0 and 1", print_help); break; @@ -342,6 +347,14 @@ struct Options { return (report_all_alleles_); } + decltype(auto) report_count_for_all_positions() const { + return (report_count_for_all_positions_); + } + + decltype(auto) report_count_for_all_positions() { + return (report_count_for_all_positions_); + } + private: unsigned verbosity_ = 0; std::string bam_in_; @@ -364,6 +377,7 @@ struct Options { bool filter_mate_unmapped_ = false; bool filter_improper_orientation_ = false; bool report_all_alleles_ = false; + bool report_count_for_all_positions_ = false; }; }//namespace neusomatic diff --git a/neusomatic/include/msa_utils.hpp b/neusomatic/include/msa_utils.hpp index 8ac2278..da68f4a 100644 --- a/neusomatic/include/msa_utils.hpp +++ b/neusomatic/include/msa_utils.hpp @@ -99,22 +99,33 @@ class CondensedArray{ return (cspace_); } - decltype(auto) GetColSpaceMQual() const { - return (cspace_); + decltype(auto) GetBaseFreq(int i) const { + return (cspace_[i].base_freq_); } - decltype(auto) GetColSpaceStrand() const { - return (cspace_); + + decltype(auto) GetBQMean(int i) const { + return (cspace_[i].bqual_mean); } - decltype(auto) GetColSpaceLSC() const { - return (cspace_); + + decltype(auto) GetMQMean(int i) const { + return (cspace_[i].mqual_mean); } - decltype(auto) GetColSpaceRSC() const { - return (cspace_); + + decltype(auto) GetStrandMean(int i) const { + return (cspace_[i].strand_mean); } - decltype(auto) GetColSpaceTag() const { - return (cspace_); + + decltype(auto) GetLSCMean(int i) const { + return (cspace_[i].lsc_mean); } + decltype(auto) GetRSCMean(int i) const { + return (cspace_[i].rsc_mean); + } + + decltype(auto) GetTagMean(int i) const { + return (cspace_[i].tag_mean); + } size_t ncol() const { return cspace_.size(); @@ -150,32 +161,90 @@ class CondensedArray{ cspace_(msa.ncol()), gapped_ref_str_(msa.gapped_ref_str()) { + + // auto start_3p0 = std::chrono::system_clock::now(); + + + for (int pp = 0; pp < msa.ncol(); ++pp) { + cspace_[pp].base_freq_[MISSING_CODE] = nrow_; + } + for (size_t i = 0; i < nrow_; ++i) { + // auto start_3p0p0 = std::chrono::system_clock::now(); + auto const& r = msa.bam_records()[i]; auto const& cigar = r.GetCigar(); const auto seq_qual = msa.GetGappedSeqAndQual(r); const auto& seq = seq_qual.first; + // -1,+1 for INS at the 
begin or end of a read auto s = msa.GapPosition(std::max(r.Position() - 1 , msa.ref_gaps().left()) - msa.ref_gaps().left()); auto e = msa.GapPosition(std::min(r.PositionEnd() + 1, msa.ref_gaps().right()) - msa.ref_gaps().left() - 1); - for (int pp = 0; pp < s; ++pp) { - ++cspace_[pp].base_freq_[MISSING_CODE]; - } - for (int pp = e + 1; pp < msa.ncol(); ++pp){ - ++cspace_[pp].base_freq_[MISSING_CODE]; - } + // for (int pp = 0; pp < s; ++pp) { + // ++cspace_[pp].base_freq_[MISSING_CODE]; + // } + + + // for (int pp = e + 1; pp < msa.ncol(); ++pp){ + // ++cspace_[pp].base_freq_[MISSING_CODE]; + // } + + // auto end_3p0p0 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_3p0p0 = end_3p0p0-start_3p0p0; + // std::time_t end_time_3p0p0 = std::chrono::system_clock::to_time_t(end_3p0p0); + // std::cout << "elapsed_3p0p0 time: " << elapsed_seconds_3p0p0.count() << "s\n"; + // auto start_3p0p1 = std::chrono::system_clock::now(); + for (int pp = s; pp <= e; ++pp) { + --cspace_[pp].base_freq_[MISSING_CODE]; ++cspace_[pp].base_freq_[DnaCharToDnaCode(seq[pp])]; } + + // for (int pp = s; pp <= e; ++pp) { + // std::map alt_counts; + // int dp = 0; + // auto ref_base = ref[pp]; + // Compute_ALTs(alt_counts, cspace_[pp].base_freq_, dp, ref_base, report_all_alleles, del_min_af, snp_min_af, ins_min_af); + + + // auto end_3p0p1 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_3p0p1 = end_3p0p1-start_3p0p1; + // std::time_t end_time_3p0p1 = std::chrono::system_clock::to_time_t(end_3p0p1); + // std::cout << "elapsed_3p0p1 time: " << elapsed_seconds_3p0p1.count() << "s\n"; + // auto start_3p0p2 = std::chrono::system_clock::now(); + if (calculate_qual) { + // auto start_3p0p2p0 = std::chrono::system_clock::now(); + const auto& qual = seq_qual.second; int strand = (int) !r.ReverseFlag(); + + // auto end_3p0p2p0 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_3p0p2p0 = end_3p0p2p0-start_3p0p2p0; + // std::time_t end_time_3p0p2p0 = std::chrono::system_clock::to_time_t(end_3p0p2p0); + // std::cout << "elapsed_3p0p2p0 time: " << elapsed_seconds_3p0p2p0.count() << "s\n"; + // auto start_3p0p2p1 = std::chrono::system_clock::now(); + const auto clips = msa.GetClipping(r); const auto tags = msa.GetTags(r, 5); + + // auto end_3p0p2p1 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_3p0p2p1 = end_3p0p2p1-start_3p0p2p1; + // std::time_t end_time_3p0p2p1 = std::chrono::system_clock::to_time_t(end_3p0p2p1); + // std::cout << "elapsed_3p0p2p1 time: " << elapsed_seconds_3p0p2p1.count() << "s\n"; + // auto start_3p0p2p2 = std::chrono::system_clock::now(); + if (clips.first != -1) cspace_[clips.first].lsc_mean[DnaCharToDnaCode(seq[clips.first])] ++; if (clips.second != -1) cspace_[clips.second].rsc_mean[DnaCharToDnaCode(seq[clips.second])] ++; + + // auto end_3p0p2p2 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_3p0p2p2 = end_3p0p2p2-start_3p0p2p2; + // std::time_t end_time_3p0p2p2 = std::chrono::system_clock::to_time_t(end_3p0p2p2); + // std::cout << "elapsed_3p0p2p2 time: " << elapsed_seconds_3p0p2p2.count() << "s\n"; + // auto start_3p0p2p3 = std::chrono::system_clock::now(); + for (int pp = s; pp <= e; ++pp) { cspace_[pp].bqual_mean[DnaCharToDnaCode(seq[pp])] += float(qual[pp] - 33) ; cspace_[pp].strand_mean[DnaCharToDnaCode(seq[pp])] += strand; @@ -184,9 +253,28 @@ class CondensedArray{ cspace_[pp].tag_mean[ DnaCharToDnaCode(seq[pp]) ][ii] += tags[ii]; } } + + // auto 
end_3p0p2p3 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_3p0p2p3 = end_3p0p2p3-start_3p0p2p3; + // std::time_t end_time_3p0p2p3 = std::chrono::system_clock::to_time_t(end_3p0p2p3); + // std::cout << "elapsed_3p0p2p3 time: " << elapsed_seconds_3p0p2p3.count() << "s\n"; + } + + // auto end_3p0p2 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_3p0p2 = end_3p0p2-start_3p0p2; + // std::time_t end_time_3p0p2 = std::chrono::system_clock::to_time_t(end_3p0p2); + // std::cout << "elapsed_3p0p2 time: " << elapsed_seconds_3p0p2.count() << "s\n"; + + }//end for + // auto end_3p0 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_3p0 = end_3p0-start_3p0; + // std::time_t end_time_3p0 = std::chrono::system_clock::to_time_t(end_3p0); + // std::cout << "elapsed_3p0 time: " << elapsed_seconds_3p0.count() << "s\n"; + // auto start_3p1 = std::chrono::system_clock::now(); + if (calculate_qual) { for (size_t ii = 0; ii < msa.ncol(); ++ii) { auto & col = cspace_[ii]; @@ -202,6 +290,12 @@ class CondensedArray{ } } } + + // auto end_3p1 = std::chrono::system_clock::now(); + // std::chrono::duration elapsed_seconds_3p1 = end_3p1-start_3p1; + // std::time_t end_time_3p1 = std::chrono::system_clock::to_time_t(end_3p1); + // std::cout << "elapsed_3p1 time: " << elapsed_seconds_3p1.count() << "s\n"; + } @@ -475,6 +569,18 @@ std::string add_qual_col(auto & data_array, bool is_int=false){ return ret; } + +void add_qual_col(auto & count_writer, auto & data_array, bool is_int, const std::string& end_str ){ + auto sep = ":"; + int order [5] = { 4, 0, 1, 2, 3 }; + if (is_int){ + count_writer << (data_array[order[0]])<<":"<<(data_array[order[1]])<<":"<<(data_array[order[2]])<<":"<<(data_array[order[3]])<<":"<<(data_array[order[4]])<(result, ":")); + // cout << result< pileup_counts(base_freq_.size()); +// int total_count=0; +// for (int base = 0; base < (int) base_freq_.size(); ++base) { +// pileup_counts[base] = base_freq_[base]; +// total_count+=base_freq_[base]; +// } + +// if (total_count==0) { +// return; +// } +// auto ref_count = base_freq_[ref_code]; +// auto var_code = ref_code; +// int var_count = 0; +// dp = ref_count; +// if (report_all_alleles and ref_base != '-'){ +// for (int row = 0; row < base_freq_.size(); ++row) { +// auto alt_cnt = base_freq_[row]; +// if (( row != ref_code) and (alt_cnt > 0)){ +// auto af = alt_cnt/float(alt_cnt+ref_count); +// if ((alt_cnt >= ref_count) or ((row == 4 and af > del_min_af ) or +// (row != 4 and ref_base != '-' and af > snp_min_af ) or +// (ref_base =='-' and af > ins_min_af))){ +// alt_counts.insert(std::pair(row, alt_cnt)); +// dp += alt_cnt; +// } +// } +// } +// }else{ +// int major = -1; +// int major_count = 0; +// int minor = -1; +// int minor_count = 0; +// int minor2 = -1; +// int minor2_count = 0; + +// for (int row = 0; row < base_freq_.size(); ++row) { +// if (base_freq_[row] > major_count) { +// minor2 = minor; +// minor2_count = minor_count; +// minor_count = major_count; +// minor = major; +// major_count = base_freq_[row]; +// major = row; +// } else if (base_freq_[row] > minor_count) { +// minor2 = minor; +// minor2_count = minor_count; +// minor_count = base_freq_[row]; +// minor = row; +// } else if (base_freq_[row] > minor2_count) { +// minor2_count = base_freq_[row]; +// minor2 = row; +// } +// } + +// if (minor != -1 and major != -1){ +// if (minor2 != -1 and ref_code == major and minor == 4 and ref_code != 4 ){ +// if (minor2_count>0.5*minor_count){ +// minor 
= minor2; +// minor_count = minor2_count; +// } +// } +// } +// auto af = minor_count/float(major_count+minor_count); +// if (major != ref_code){ +// var_code = major; +// var_count = major_count; +// } else if (minor != ref_code and ( (minor == 4 and af > del_min_af ) or +// (minor != 4 and ref_base != '-' and af > snp_min_af ) or +// (ref_base =='-' and af > ins_min_af))){ +// var_code = minor; +// var_count = minor_count; +// } +// if (var_count > 0) { +// alt_counts.insert(std::pair(var_code,var_count)); +// dp += var_count; +// } +// } +// } + + }// end neusomatic #endif diff --git a/neusomatic/python/generate_dataset.py b/neusomatic/python/generate_dataset.py index 0753a8f..9aca780 100755 --- a/neusomatic/python/generate_dataset.py +++ b/neusomatic/python/generate_dataset.py @@ -18,6 +18,7 @@ import numpy as np import pysam from PIL import Image +from random import shuffle from split_bed import split_region from utils import concatenate_vcfs, get_chromosomes_order, run_bedtools_cmd, vcf_2_bed, bedtools_sort, bedtools_window, bedtools_intersect, bedtools_slop, get_tmp_file, skip_empty @@ -26,6 +27,8 @@ NUC_to_NUM_tabix = {"A": 1, "C": 2, "G": 3, "T": 4, "-": 0} +import time + def get_type(ref, alt): logger = logging.getLogger(get_type.__name__) len_diff = len(ref) - len(alt.split(",")[0]) @@ -36,20 +39,28 @@ def get_type(ref, alt): else: return "SNP" - -def get_variant_matrix_tabix(ref_file, count_bed, record, matrix_base_pad, chrom_lengths): +def get_variant_matrix_tabix(ref_seq, count_info, record, matrix_base_pad, chrom_lengths): logger = logging.getLogger(get_variant_matrix_tabix.__name__) chrom, pos, ref, alt = record[0:4] - fasta_file = pysam.Fastafile(ref_file) - try: - tb = pysam.TabixFile(count_bed, parser=pysam.asTuple()) - tabix_records = tb.fetch( - chrom, max(pos - matrix_base_pad, 0), min(pos + matrix_base_pad, chrom_lengths[chrom] - 2)) - except: - logger.warning("No count information at {}:{}-{} for {}".format(chrom, - pos - matrix_base_pad, pos + matrix_base_pad, count_bed)) - tabix_records = [] - + # logger.info("vvv-1") + # fasta_file = pysam.Fastafile(ref_file) + # logger.info("vvv-2") + # try: + # tb = pysam.TabixFile(count_info, parser=pysam.asTuple()) + # tabix_records = tb.fetch( + # chrom, max(pos - matrix_base_pad, 0), min(pos + matrix_base_pad, chrom_lengths[chrom] - 2)) + # except: + # logger.warning("No count information at {}:{}-{} for {}".format(chrom, + # pos - matrix_base_pad, pos + matrix_base_pad, count_info)) + # tabix_records = [] + + t1=time.time() + + tabix_records=[] + for pos_ in sorted(count_info.keys()): + # print([record[0:4],pos_]) + tabix_records.extend(count_info[pos_]) + # logger.info("vvv-3") matrix_ = [] bq_matrix_ = [] mq_matrix_ = [] @@ -60,19 +71,40 @@ def get_variant_matrix_tabix(ref_file, count_bed, record, matrix_base_pad, chrom ref_array = [] col_pos_map = {} cnt = 0 - curr_pos = max(1, pos - matrix_base_pad) + s_pos = max(1, pos - matrix_base_pad) + curr_pos = s_pos + + # logger.info(["rrr-11",time.time()-t1]) + t1=time.time() + tabix_records=list(tabix_records) + # logger.info(["rrr-12",time.time()-t1]) + # logger.info("vvv-30") + t1=time.time() + t2=time.time() + t0=time.time() for rec in tabix_records: + # print(rec) + # logger.info(["rrr-13",time.time()-t1]) + t1=time.time() pos_ = int(rec[1]) if pos_ > pos + matrix_base_pad: + # logger.info(["rrr-19",time.time()-t0]) + t0=time.time() + t1=time.time() continue ref_base = rec[3] if ref_base.upper() not in "ACGT-": ref_base = "-" if pos_ in col_pos_map and ref_base != "-": + 
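            # Index mapping behind the prefetched-slice refactor in this
            # function (a sketch of the reasoning, not added code): the caller
            # now passes ref_seq fetched as
            # fasta_file.fetch(chrom, s_pos - 1, e_pos).upper().replace("N", "-"),
            # so 1-based genomic position p maps to ref_seq[p - s_pos]; the old
            # per-record fasta_file.fetch(chrom, curr_pos - 1, pos_ - 1) is thus
            # equivalent to ref_seq[curr_pos - s_pos : pos_ - s_pos], with no
            # FASTA access inside the loop.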
# logger.info(["rrr-19",time.time()-t0]) + t0=time.time() + t1=time.time() continue if pos_ > (curr_pos): - refs = fasta_file.fetch( - chrom, curr_pos - 1, pos_ - 1).upper().replace("N", "-") + # refs = fasta_file.fetch( + # chrom, curr_pos - 1, pos_ - 1).upper().replace("N", "-") + refs = ref_seq[curr_pos-s_pos:pos_-s_pos] + for i in range(curr_pos, pos_): ref_base_ = refs[i - curr_pos] if ref_base_.upper() not in "ACGT-": @@ -91,9 +123,12 @@ def get_variant_matrix_tabix(ref_file, count_bed, record, matrix_base_pad, chrom col_pos_map[i] = cnt cnt += 1 curr_pos = pos_ + # logger.info(["rrr-14",time.time()-t1]) + t1=time.time() if pos_ == (curr_pos) and ref_base == "-" and pos_ not in col_pos_map: - ref_base_ = fasta_file.fetch( - chrom, pos_ - 1, pos_).upper().replace("N", "-") + # ref_base_ = fasta_file.fetch( + # chrom, pos_ - 1, pos_).upper().replace("N", "-") + ref_base_ = ref_seq[pos_-s_pos:pos_-s_pos+1] if ref_base_.upper() not in "ACGT-": ref_base_ = "-" matrix_.append([0, 0, 0, 0, 0]) @@ -110,6 +145,10 @@ def get_variant_matrix_tabix(ref_file, count_bed, record, matrix_base_pad, chrom col_pos_map[pos_] = cnt cnt += 1 curr_pos = pos_ + 1 + # logger.info(["rrr-15",time.time()-t1]) + # logger.info(["rrr-19",time.time()-t0]) + t0=time.time() + t1=time.time() matrix_.append(list(map(int, rec[4].split(":")))) bq_matrix_.append(list(map(int, rec[5].split(":")))) mq_matrix_.append(list(map(int, rec[6].split(":")))) @@ -123,12 +162,16 @@ def get_variant_matrix_tabix(ref_file, count_bed, record, matrix_base_pad, chrom col_pos_map[pos_] = cnt cnt += 1 curr_pos = pos_ + 1 + # logger.info(["rrr-16",time.time()-t1]) + t1=time.time() + # logger.info("vvv-32") end_pos = min(pos + matrix_base_pad, chrom_lengths[chrom] - 2) if curr_pos < pos + matrix_base_pad + 1: - refs = fasta_file.fetch( - chrom, curr_pos - 1, end_pos).upper().replace("N", "-") + # refs = fasta_file.fetch( + # chrom, curr_pos - 1, end_pos).upper().replace("N", "-") + refs = ref_seq[curr_pos-s_pos:end_pos-s_pos+1] for i in range(curr_pos, end_pos + 1): ref_base_ = refs[i - curr_pos] if ref_base_.upper() not in "ACGT-": @@ -148,6 +191,11 @@ def get_variant_matrix_tabix(ref_file, count_bed, record, matrix_base_pad, chrom cnt += 1 curr_pos = end_pos + 1 + # logger.info(["rrr-18",time.time()-t2]) + + t1=time.time() + + matrix_ = np.array(matrix_).transpose() bq_matrix_ = np.array(bq_matrix_).transpose() mq_matrix_ = np.array(mq_matrix_).transpose() @@ -158,9 +206,211 @@ def get_variant_matrix_tabix(ref_file, count_bed, record, matrix_base_pad, chrom tag_matrices_[iii] = np.array(tag_matrices_[iii]).transpose() ref_array = np.array(ref_array) + # logger.info(["vvv-4",matrix_.shape]) + + # logger.info(["rrr-17",time.time()-t1]) + return matrix_, bq_matrix_, mq_matrix_, st_matrix_, lsc_matrix_, rsc_matrix_, tag_matrices_, ref_array, col_pos_map + +# def get_variant_matrix_tabix(ref_seq, count_info, record, matrix_base_pad, chrom_lengths): +# logger = logging.getLogger(get_variant_matrix_tabix.__name__) +# chrom, pos, ref, alt = record[0:4] +# # logger.info("vvv-1") +# # fasta_file = pysam.Fastafile(ref_file) +# # logger.info("vvv-2") +# # try: +# # tb = pysam.TabixFile(count_info, parser=pysam.asTuple()) +# # tabix_records = tb.fetch( +# # chrom, max(pos - matrix_base_pad, 0), min(pos + matrix_base_pad, chrom_lengths[chrom] - 2)) +# # except: +# # logger.warning("No count information at {}:{}-{} for {}".format(chrom, +# # pos - matrix_base_pad, pos + matrix_base_pad, count_info)) +# # tabix_records = [] + +# t1=time.time() + +# 
tabix_records=[] +# for pos_ in sorted(count_info.keys()): +# # print([record[0:4],pos_]) +# tabix_records.extend(count_info[pos_]) +# # logger.info("vvv-3") +# ref_array = [] +# col_pos_map = {} +# cnt = 0 +# s_pos = max(1, pos - matrix_base_pad) +# curr_pos = s_pos + +# # logger.info(["rrr-11",time.time()-t1]) +# t1=time.time() +# tabix_records=list(tabix_records) +# # logger.info(["rrr-12",time.time()-t1]) +# # logger.info("vvv-30") +# t1=time.time() +# t2=time.time() +# t0=time.time() +# z_s={} +# z_e=0 +# n_=0 +# for i_rec, rec in enumerate(tabix_records): +# z_s[i_rec]=[0,-1] +# # print(rec) +# # logger.info(["rrr-13",time.time()-t1]) +# t1=time.time() +# pos_ = int(rec[1]) +# if pos_ > pos + matrix_base_pad: +# # logger.info(["rrr-19",time.time()-t0]) +# t0=time.time() +# t1=time.time() +# continue +# ref_base = rec[3] +# if ref_base.upper() not in "ACGT-": +# ref_base = "-" +# if pos_ in col_pos_map and ref_base != "-": +# # logger.info(["rrr-19",time.time()-t0]) +# t0=time.time() +# t1=time.time() +# continue +# if pos_ > (curr_pos): +# # refs = fasta_file.fetch( +# # chrom, curr_pos - 1, pos_ - 1).upper().replace("N", "-") +# refs = ref_seq[curr_pos-s_pos:pos_-s_pos] + +# for i in range(curr_pos, pos_): +# ref_base_ = refs[i - curr_pos] +# if ref_base_.upper() not in "ACGT-": +# ref_base_ = "-" +# # G.append([0, 0, 0, 0, 0]) +# # bq_G.append([0, 0, 0, 0, 0]) +# # mq_matrix_.append([0, 0, 0, 0, 0]) +# # st_matrix_.append([0, 0, 0, 0, 0]) +# # lsc_matrix_.append([0, 0, 0, 0, 0]) +# # rsc_matrix_.append([0, 0, 0, 0, 0]) +# # for iii in range(len(tag_matrices_)): +# # tag_matrices_[iii].append([0, 0, 0, 0, 0]) +# z_s[i_rec][0]+=1 +# n_ += 1 +# ref_array.append(NUC_to_NUM_tabix[ref_base_]) +# # if ref_base_ != "-" and i not in col_pos_map: +# if i not in col_pos_map: +# col_pos_map[i] = cnt +# cnt += 1 +# curr_pos = pos_ +# # logger.info(["rrr-14",time.time()-t1]) +# t1=time.time() +# if pos_ == (curr_pos) and ref_base == "-" and pos_ not in col_pos_map: +# # ref_base_ = fasta_file.fetch( +# # chrom, pos_ - 1, pos_).upper().replace("N", "-") +# ref_base_ = ref_seq[pos_-s_pos:pos_-s_pos+1] +# if ref_base_.upper() not in "ACGT-": +# ref_base_ = "-" +# # matrix_.append([0, 0, 0, 0, 0]) +# # bq_matrix_.append([0, 0, 0, 0, 0]) +# # mq_matrix_.append([0, 0, 0, 0, 0]) +# # st_matrix_.append([0, 0, 0, 0, 0]) +# # lsc_matrix_.append([0, 0, 0, 0, 0]) +# # rsc_matrix_.append([0, 0, 0, 0, 0]) +# # for iii in range(len(tag_matrices_)): +# # tag_matrices_[iii].append([0, 0, 0, 0, 0]) +# z_s[i_rec][0]+=1 +# n_ += 1 +# ref_array.append(NUC_to_NUM_tabix[ref_base_]) +# # if ref_base_ != "-" and pos_ not in col_pos_map: +# if pos_ not in col_pos_map: +# col_pos_map[pos_] = cnt +# cnt += 1 +# curr_pos = pos_ + 1 +# # logger.info(["rrr-15",time.time()-t1]) +# # logger.info(["rrr-19",time.time()-t0]) +# t0=time.time() +# t1=time.time() +# z_s[i_rec][1]=n_ +# n_ += 1 +# # matrix_.append(list(map(int, rec[4].split(":")))) +# # bq_matrix_.append(list(map(int, rec[5].split(":")))) +# # mq_matrix_.append(list(map(int, rec[6].split(":")))) +# # st_matrix_.append(list(map(int, rec[7].split(":")))) +# # lsc_matrix_.append(list(map(int, rec[8].split(":")))) +# # rsc_matrix_.append(list(map(int, rec[9].split(":")))) +# # for iii in range(len(tag_matrices_)): +# # tag_matrices_[iii].append(list(map(int, rec[10 + iii].split(":")))) +# ref_array.append(NUC_to_NUM_tabix[ref_base]) +# if ref_base != "-" and pos_ not in col_pos_map: +# col_pos_map[pos_] = cnt +# cnt += 1 +# curr_pos = pos_ + 1 +# # 
logger.info(["rrr-16",time.time()-t1]) +# t1=time.time() + + +# # logger.info("vvv-32") +# end_pos = min(pos + matrix_base_pad, chrom_lengths[chrom] - 2) + +# if curr_pos < pos + matrix_base_pad + 1: +# # refs = fasta_file.fetch( +# # chrom, curr_pos - 1, end_pos).upper().replace("N", "-") +# refs = ref_seq[curr_pos-s_pos:end_pos-s_pos+1] +# for i in range(curr_pos, end_pos + 1): +# ref_base_ = refs[i - curr_pos] +# if ref_base_.upper() not in "ACGT-": +# ref_base_ = "-" +# # matrix_.append([0, 0, 0, 0, 0]) +# # bq_matrix_.append([0, 0, 0, 0, 0]) +# # mq_matrix_.append([0, 0, 0, 0, 0]) +# # st_matrix_.append([0, 0, 0, 0, 0]) +# # lsc_matrix_.append([0, 0, 0, 0, 0]) +# # rsc_matrix_.append([0, 0, 0, 0, 0]) +# # for iii in range(len(tag_matrices_)): +# # tag_matrices_[iii].append([0, 0, 0, 0, 0]) +# n_ += 1 +# z_e += 1 +# ref_array.append(NUC_to_NUM_tabix[ref_base_]) +# # if ref_base_ != "-" and i not in col_pos_map: +# if i not in col_pos_map: +# col_pos_map[i] = cnt +# cnt += 1 +# curr_pos = end_pos + 1 + +# # logger.info(["rrr-18",time.time()-t2]) +# t1=time.time() + +# matrix_ = np.zeros((5,n_)) +# bq_matrix_ = np.zeros((5,n_)) +# mq_matrix_ = np.zeros((5,n_)) +# st_matrix_ = np.zeros((5,n_)) +# lsc_matrix_ = np.zeros((5,n_)) +# rsc_matrix_ = np.zeros((5,n_)) +# tag_matrices_ = [np.zeros((5,n_)), np.zeros((5,n_)), np.zeros((5,n_)), np.zeros((5,n_)), np.zeros((5,n_))] + +# # logger.info(["rrr-21",time.time()-t1]) +# t1=time.time() + + +# for i in sorted(z_s.keys()): +# if z_s[i][1]>=0: +# rec=tabix_records[i] +# matrix_[:,z_s[i][1]]=[int(x) for x in rec[4].split(":")] +# bq_matrix_[:,z_s[i][1]]=[int(x) for x in rec[5].split(":")] +# mq_matrix_[:,z_s[i][1]]=[int(x) for x in rec[6].split(":")] +# st_matrix_[:,z_s[i][1]]=[int(x) for x in rec[7].split(":")] +# lsc_matrix_[:,z_s[i][1]]=[int(x) for x in rec[8].split(":")] +# rsc_matrix_[:,z_s[i][1]]=[int(x) for x in rec[9].split(":")] +# for iii in range(len(tag_matrices_)): +# tag_matrices_[iii][:,z_s[i][1]]=[int(x) for x in rec[10 + iii].split(":")] + +# # logger.info(["rrr-20",time.time()-t1]) +# t1=time.time() + +# st_matrix_ = (st_matrix_ / 100.0) * matrix_ + +# ref_array = np.array(ref_array) +# # logger.info(["vvv-4",matrix_.shape]) + +# # logger.info(["rrr-17",time.time()-t1]) + +# return matrix_, bq_matrix_, mq_matrix_, st_matrix_, lsc_matrix_, rsc_matrix_, tag_matrices_, ref_array, col_pos_map + def align_tumor_normal_matrices(record, tumor_matrix_, bq_tumor_matrix_, mq_tumor_matrix_, st_tumor_matrix_, lsc_tumor_matrix_, rsc_tumor_matrix_, tag_tumor_matrices_, tumor_ref_array, tumor_col_pos_map, normal_matrix_, @@ -293,21 +543,27 @@ def align_tumor_normal_matrices(record, tumor_matrix_, bq_tumor_matrix_, mq_tumo new_tumor_col_pos_map] -def prepare_info_matrices_tabix(ref_file, tumor_count_bed, normal_count_bed, record, rlen, rcenter, +def prepare_info_matrices_tabix(ref_seq, tumor_count_info, normal_count_info, record, rlen, rcenter, matrix_base_pad, matrix_width, min_ev_frac_per_col, min_cov, chrom_lengths): logger = logging.getLogger(prepare_info_matrices_tabix.__name__) chrom, pos, ref, alt = record[0:4] + t1=time.time() + # logger.info("ttt-1") tumor_matrix_, bq_tumor_matrix_, mq_tumor_matrix_, st_tumor_matrix_, lsc_tumor_matrix_, rsc_tumor_matrix_, tag_tumor_matrices_, tumor_ref_array, tumor_col_pos_map = get_variant_matrix_tabix( - ref_file, tumor_count_bed, record, matrix_base_pad, chrom_lengths) + ref_seq, tumor_count_info, record, matrix_base_pad, chrom_lengths) normal_matrix_, bq_normal_matrix_, mq_normal_matrix_, 
st_normal_matrix_, lsc_normal_matrix_, rsc_normal_matrix_, tag_normal_matrices_, normal_ref_array, normal_col_pos_map = get_variant_matrix_tabix( - ref_file, normal_count_bed, record, matrix_base_pad, chrom_lengths) + ref_seq, normal_count_info, record, matrix_base_pad, chrom_lengths) + + # logger.info(["rrr-8",time.time()-t1]) + t1=time.time() if not tumor_col_pos_map: logger.warning("Skip {} for all N reference".format(record)) return None + # logger.info("ttt-2") bq_tumor_matrix_[0, np.where(tumor_matrix_.sum(0) == 0)[ 0]] = np.max(bq_tumor_matrix_) bq_normal_matrix_[0, np.where(normal_matrix_.sum(0) == 0)[ @@ -335,6 +591,7 @@ def prepare_info_matrices_tabix(ref_file, tumor_count_bed, normal_count_bed, rec tag_normal_matrices_[iii][0, np.where(normal_matrix_.sum(0) == 0)[ 0]] = np.max(tag_normal_matrices_[iii]) + # logger.info("ttt-3") tumor_matrix_[0, np.where(tumor_matrix_.sum(0) == 0)[ 0]] = max(np.sum(tumor_matrix_, 0)) normal_matrix_[0, np.where(normal_matrix_.sum(0) == 0)[ @@ -349,6 +606,8 @@ def prepare_info_matrices_tabix(ref_file, tumor_count_bed, normal_count_bed, rec for iii in range(len(tag_normal_matrices_)): tag_normal_matrices_[iii][0, :] = np.max(tag_normal_matrices_[iii]) + # logger.info("ttt-4") + tumor_matrix_, bq_tumor_matrix_, mq_tumor_matrix_, st_tumor_matrix_, lsc_tumor_matrix_, rsc_tumor_matrix_, \ tag_tumor_matrices_, normal_matrix_, bq_normal_matrix_, mq_normal_matrix_, st_normal_matrix_, \ lsc_normal_matrix_, rsc_normal_matrix_, tag_normal_matrices_, \ @@ -358,6 +617,9 @@ def prepare_info_matrices_tabix(ref_file, tumor_count_bed, normal_count_bed, rec bq_normal_matrix_, mq_normal_matrix_, st_normal_matrix_, lsc_normal_matrix_, rsc_normal_matrix_, tag_normal_matrices_, normal_ref_array, normal_col_pos_map) + # logger.info(["rrr-9",time.time()-t1]) + t1=time.time() + tw = int(matrix_width) count_column = sum(tumor_matrix_[1:], 0) n_col = tumor_matrix_.shape[1] @@ -561,6 +823,10 @@ def prepare_info_matrices_tabix(ref_file, tumor_count_bed, normal_count_bed, rec center = min(max(0, min(col_pos_map.values()) + rcenter[0] - 1 + rcenter[1]), ref_count_matrix.shape[1] - 1) + # logger.info("ttt-9") + + # logger.info(["rrr-10",time.time()-t1]) + return [tumor_matrix_, tumor_matrix, normal_matrix_, normal_matrix, ref_count_matrix, tumor_count_matrix, bq_tumor_count_matrix, mq_tumor_count_matrix, st_tumor_count_matrix, lsc_tumor_count_matrix, rsc_tumor_count_matrix, tag_tumor_count_matrices, normal_count_matrix, bq_normal_count_matrix, mq_normal_count_matrix, @@ -570,17 +836,21 @@ def prepare_info_matrices_tabix(ref_file, tumor_count_bed, normal_count_bed, rec def prep_data_single_tabix(input_record): - ref_file, tumor_count_bed, normal_count_bed, record, vartype, rlen, rcenter, ch_order, \ - matrix_base_pad, matrix_width, min_ev_frac_per_col, min_cov, ann, chrom_lengths, matrix_dtype = input_record + ref_seq, tumor_count_info, normal_count_info, record, vartype, rlen, rcenter, ch_order, \ + matrix_base_pad, matrix_width, min_ev_frac_per_col, min_cov, ann, chrom_lengths, matrix_dtype, is_none = input_record thread_logger = logging.getLogger( "{} ({})".format(prep_data_single_tabix.__name__, multiprocessing.current_process().name)) try: chrom, pos, ref, alt = record[:4] pos = int(pos) - matrices_info = prepare_info_matrices_tabix(ref_file=ref_file, - tumor_count_bed=tumor_count_bed, - normal_count_bed=normal_count_bed, record=record, rlen=rlen, rcenter=rcenter, + # thread_logger.info("prep-1") + + t1=time.time() + + matrices_info = 
prepare_info_matrices_tabix(ref_seq=ref_seq, + tumor_count_info=tumor_count_info, + normal_count_info=normal_count_info, record=record, rlen=rlen, rcenter=rcenter, matrix_base_pad=matrix_base_pad, matrix_width=matrix_width, min_ev_frac_per_col=min_ev_frac_per_col, min_cov=min_cov, @@ -593,6 +863,11 @@ def prep_data_single_tabix(input_record): else: return [] + + # thread_logger.info(["rrr-6",time.time()-t1]) + t1=time.time() + + # thread_logger.info("prep-2") candidate_mat = np.zeros((tumor_count_matrix.shape[0], tumor_count_matrix.shape[ 1], 13 + (len(tag_tumor_count_matrices) * 2))) candidate_mat[:, :, 0] = ref_count_matrix @@ -655,6 +930,7 @@ def prep_data_single_tabix(input_record): candidate_mat[:, :, 13 + (iii * 2) + 1] = candidate_mat[:, :, 13 + ( iii * 2) + 1] / (max(np.max(tag_normal_count_matrices[iii]), 100.0)) * max_norm + # thread_logger.info("prep-3") if matrix_dtype == "uint8": candidate_mat = np.maximum(0, np.minimum( candidate_mat, max_norm)).astype(np.uint8) @@ -667,9 +943,15 @@ def prep_data_single_tabix(input_record): raise Exception tag = "{}.{}.{}.{}.{}.{}.{}.{}.{}".format(ch_order, pos, ref[0:55], alt[ 0:55], vartype, center, rlen, tumor_cov, normal_cov) + # thread_logger.info("prep-4") candidate_mat = base64.b64encode( zlib.compress(candidate_mat)).decode('ascii') - return tag, candidate_mat, record[0:4], ann + # thread_logger.info("prep-5") + + # thread_logger.info(["rrr-7",time.time()-t1]) + + + return tag, candidate_mat, record[0:4], ann, is_none except Exception as ex: thread_logger.error(traceback.format_exc()) thread_logger.error(ex) @@ -1642,6 +1924,94 @@ def extract_ensemble(ensemble_tsvs, ensemble_bed, no_seq_complexity, enforce_hea f_.write("\t".join(map(str, ensemble_pos[i] + s)) + "\n") return ensemble_bed +def chunks(lst, n): + for i in range(0, len(lst), n): + yield lst[i:i + n] + + + + +def parallel_generation(inputs): + map_args, matrix_base_pad, chrom_lengths, tumor_count_bed, normal_count_bed = inputs + + thread_logger = logging.getLogger( + "{} ({})".format(parallel_generation.__name__, multiprocessing.current_process().name)) + try: + chrom_pos={} + for w in map_args: + record = w[3] + chrom = record[0] + pos = int(record[1]) + s_pos = max(1, pos - matrix_base_pad) + e_pos = min(pos + matrix_base_pad, chrom_lengths[chrom] - 2) + if chrom not in chrom_pos: + chrom_pos[chrom] = [s_pos, e_pos] + else: + chrom_pos[chrom] = [min(s_pos,chrom_pos[chrom][0]), max(e_pos,chrom_pos[chrom][1])] + thread_logger.info(chrom_pos) + + thread_logger.info("Gener-7") + tb_tumor = pysam.TabixFile(tumor_count_bed, parser=pysam.asTuple()) + tb_normal = pysam.TabixFile(normal_count_bed, parser=pysam.asTuple()) + + tumor_tabix_records_dict={} + normal_tabix_records_dict={} + for chrom in chrom_pos: + t2=time.time() + tumor_tabix_records_dict[chrom]={} + normal_tabix_records_dict[chrom]={} + try: + tumor_tabix_records = list(tb_tumor.fetch(chrom,chrom_pos[chrom][0]-1,chrom_pos[chrom][1])) + except: + thread_logger.warning("No count information at {} for {}:{}-{}".format(tumor_count_bed,chrom,chrom_pos[chrom][0]-1,chrom_pos[chrom][1])) + tumor_tabix_records = [] + try: + normal_tabix_records = list(tb_normal.fetch(chrom,chrom_pos[chrom][0]-1,chrom_pos[chrom][1])) + except: + thread_logger.warning("No count information at {} for {}:{}-{}".format(normal_count_bed,chrom,chrom_pos[chrom][0]-1,chrom_pos[chrom][1])) + normal_tabix_records = [] + # thread_logger.info(["ffff-1",time.time()-t2]) + t2=time.time() + + for x in tumor_tabix_records: + pos = int(x[1]) + if pos not in 
tumor_tabix_records_dict[chrom]: + tumor_tabix_records_dict[chrom][pos]=[] + tumor_tabix_records_dict[chrom][pos].append(list(x)) + for x in normal_tabix_records: + pos = int(x[1]) + if pos not in normal_tabix_records_dict[chrom]: + normal_tabix_records_dict[chrom][pos]=[] + normal_tabix_records_dict[chrom][pos].append(list(x)) + # thread_logger.info(["ffff-2",time.time()-t2]) + t2=time.time() + del tumor_tabix_records, normal_tabix_records + # thread_logger.info(["ffff-3",time.time()-t2]) + + thread_logger.info("Gener-8") + for i_ in range(len(map_args)): + w = map_args[i_] + record = w[3] + chrom = record[0] + pos = int(record[1]) + s_pos = max(1, pos - matrix_base_pad) + e_pos = min(pos + matrix_base_pad, chrom_lengths[chrom] - 2) + tumor_counts ={x_pos:tumor_tabix_records_dict[chrom][x_pos] for x_pos in range(s_pos,e_pos+1) if x_pos in tumor_tabix_records_dict[chrom]} + normal_counts ={x_pos:normal_tabix_records_dict[chrom][x_pos] for x_pos in range(s_pos,e_pos+1) if x_pos in normal_tabix_records_dict[chrom]} + map_args[i_][1] = tumor_counts + map_args[i_][2] = normal_counts + + records_done=[] + for w in map_args: + o=prep_data_single_tabix(w) + if o is None: + aaaa + records_done.append(o) + return records_done + except Exception as ex: + thread_logger.error(traceback.format_exc()) + thread_logger.error(ex) + return None def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_bed_file, tumor_count_bed, normal_count_bed, ref_file, matrix_width, matrix_base_pad, min_ev_frac_per_col, min_cov, num_threads, ensemble_tsv, @@ -1655,6 +2025,10 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be logger = logging.getLogger(generate_dataset.__name__) logger.info("---------------------Generate Dataset----------------------") + logger.info(tumor_count_bed) + + t1=time.time() + logger.info("Gener-0") if not os.path.exists(work): os.mkdir(work) @@ -1700,7 +2074,9 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be logger.info("len_candids: {}".format(len_candids)) num_splits = max(len_candids // split_batch_size, num_threads) split_region_files = split_region( - work, region_bed_file, num_splits, shuffle_intervals=True) + work, region_bed_file, num_splits, + shuffle_intervals=False + ) fasta_file = pysam.Fastafile(ref_file) chrom_lengths = dict(zip(fasta_file.references, fasta_file.lengths)) @@ -1715,7 +2091,11 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be x = i_f.readline().strip().split() if x: num_ens_features = len(x) - 5 + + # logger.info(["rrr-1",time.time()-t1]) + t1 = time.time() + logger.info("Gener-1") pool = multiprocessing.Pool(num_threads) map_args = [] for i, split_region_file in enumerate(split_region_files): @@ -1734,6 +2114,7 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be if o is None: raise Exception("find_records failed!") + none_vcf = "{}/none.vcf".format(work) var_vcf = "{}/var.vcf".format(work) if not os.path.exists(work): @@ -1743,12 +2124,14 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be for records_r, none_records, vtype, record_len, record_center, chroms_order, anns in records_data: total_ims += len(records_r) + len(none_records) + logger.info("Gener-2") candidates_split = int(total_ims // tsv_batch_size) + 1 is_split = total_ims // candidates_split with open(var_vcf, "w") as vv, open(none_vcf, "w") as nv: is_current = 0 is_ = -1 while is_ < candidates_split: + logger.info("Gener-3") 
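                # Note on the worker batching later in this loop (a hedged
                # sketch, not part of this patch): map_args_records is sliced
                # with stride len_records // num_threads, which is 0 whenever
                # 0 < len_records < num_threads, so range() would raise
                # ValueError. A guarded version of the same batching:
                #
                #   step = max(1, len_records // num_threads)
                #   batches = [[map_args_records[i:i + step], matrix_base_pad,
                #               chrom_lengths, tumor_count_bed, normal_count_bed]
                #              for i in range(0, len_records, step)]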
is_ += 1 cnt = -1 if is_ < candidates_split - 1: @@ -1761,94 +2144,73 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be is_end, total_ims)) pool = multiprocessing.Pool(num_threads) map_args_records = [] - map_args_nones = [] for records_r, none_records, vtype, record_len, record_center, chroms_order, anns in records_data: - if len(records_r) + cnt < is_current: - cnt += len(records_r) - else: - for record in records_r: - cnt += 1 - if is_current <= cnt < is_end: - vartype = vtype[int(record[-1])] - rlen = record_len[int(record[-1])] - rcenter = record_center[int(record[-1])] - ch_order = chroms_order[record[0]] - ann = list(anns[int(record[-1])] - ) if ensemble_bed else [] - map_args_records.append((ref_file, tumor_count_bed, normal_count_bed, record, vartype, rlen, rcenter, ch_order, - matrix_base_pad, matrix_width, min_ev_frac_per_col, min_cov, ann, chrom_lengths, matrix_dtype)) - if cnt >= is_end: - break - if cnt >= is_end: - break - if cnt >= is_end: - break - - if len(none_records) + cnt < is_current: - cnt += len(none_records) + if len(records_r) + len(none_records) + cnt < is_current: + cnt += len(records_r)+len(none_records) else: - for record in none_records: - cnt += 1 - if is_current <= cnt < is_end: - rcenter = record_center[int(record[-1])] - ch_order = chroms_order[record[0]] - ann = list(anns[int(record[-1])] - ) if ensemble_bed else [] - map_args_nones.append((ref_file, tumor_count_bed, normal_count_bed, record, "NONE", - 0, rcenter, ch_order, - matrix_base_pad, matrix_width, min_ev_frac_per_col, min_cov, ann, chrom_lengths, matrix_dtype)) - if cnt >= is_end: - break + for is_none, records in [["False", records_r],["True",none_records]]: + for record in records: + cnt += 1 + if is_current <= cnt < is_end: + vartype = vtype[int(record[-1])] if not is_none else "NONE" + rlen = record_len[int(record[-1])] if not is_none else 0 + rcenter = record_center[int(record[-1])] + ch_order = chroms_order[record[0]] + ann = list(anns[int(record[-1])] + ) if ensemble_bed else [] + + chrom, pos = record[0:2] + pos = int(pos) + s_pos = max(1, pos - matrix_base_pad) + e_pos = min(pos + matrix_base_pad, chrom_lengths[chrom] - 2) + ref_seq = fasta_file.fetch(chrom, s_pos-1, e_pos).upper().replace("N", "-") + + map_args_records.append([ref_seq, None, None, record, vartype, rlen, rcenter, ch_order, + matrix_base_pad, matrix_width, min_ev_frac_per_col, min_cov, ann, chrom_lengths, matrix_dtype, is_none]) + if cnt >= is_end: + break if cnt >= is_end: break if cnt >= is_end: break - try: - records_done = pool.map_async( - prep_data_single_tabix, map_args_records).get() - pool.close() - except Exception as inst: - logger.error(inst) - pool.close() - traceback.print_exc() - raise Exception - - for o in records_done: - if o is None: - raise Exception("prep_data_single_tabix failed!") - - pool = multiprocessing.Pool(num_threads) - try: - none_records_done = pool.map_async( - prep_data_single_tabix, map_args_nones).get() - pool.close() - except Exception as inst: - logger.error(inst) - pool.close() - traceback.print_exc() - raise Exception - - for o in none_records_done: - if o is None: - raise Exception("prep_data_single_tabix failed!") + logger.info("Gener-4") + + len_records = len(map_args_records) + records_done = [] + if len_records>0: + logger.info("Gener-9") + pool = multiprocessing.Pool(num_threads) + try: + records_done_ = pool.map_async( + parallel_generation, [[map_args_records[i_split:i_split+(len_records//num_threads)],matrix_base_pad, chrom_lengths, 
tumor_count_bed, normal_count_bed] + for i_split in range(0, len_records, len_records//num_threads)]).get() + pool.close() + except Exception as inst: + logger.error(inst) + pool.close() + traceback.print_exc() + raise Exception + + for o in records_done_: + if o is None: + raise Exception("parallel_generation failed!") + records_done.extend(o) + + shuffle(records_done) + logger.info("Gener-10") cnt_ims = 0 tsv_idx = [] with open(candidates_tsv_file, "w") as b_o: for x in records_done: if x: - tag, compressed_candidate_mat, record, ann = x - vv.write("\t".join([record[0], str(record[1]), ".", record[2], record[ - 3], ".", ".", "TAG={};".format(tag), ".", "."]) + "\n") - tsv_idx.append(b_o.tell()) - b_o.write("\t".join([str(cnt_ims), "1", tag, compressed_candidate_mat] + list(map( - lambda x: str(np.round(x, 4)), ann))) + "\n") - cnt_ims += 1 - for x in none_records_done: - if x: - tag, compressed_candidate_mat, record, ann = x - nv.write("\t".join([record[0], str(record[1]), ".", record[2], record[ - 3], ".", ".", "TAG={};".format(tag), ".", "."]) + "\n") + tag, compressed_candidate_mat, record, ann, is_none = x + if not is_none: + vv.write("\t".join([record[0], str(record[1]), ".", record[2], record[ + 3], ".", ".", "TAG={};".format(tag), ".", "."]) + "\n") + else: + nv.write("\t".join([record[0], str(record[1]), ".", record[2], record[ + 3], ".", ".", "TAG={};".format(tag), ".", "."]) + "\n") tsv_idx.append(b_o.tell()) b_o.write("\t".join([str(cnt_ims), "1", tag, compressed_candidate_mat] + list(map( lambda x: str(np.round(x, 4)), ann))) + "\n") @@ -1862,9 +2224,14 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be with open(done_flag, "w") as d_f: d_f.write("Done") + # logger.info(["rrr-4",time.time()-t1]) + t1 = time.time() + shutil.rmtree(bed_tempdir) tempfile.tempdir = original_tempdir + # logger.info(["rrr-5",time.time()-t1]) + logger.info("Generating dataset is Done.") if __name__ == '__main__': diff --git a/neusomatic/python/preprocess.py b/neusomatic/python/preprocess.py index 6c5860f..2ae1bf5 100755 --- a/neusomatic/python/preprocess.py +++ b/neusomatic/python/preprocess.py @@ -34,13 +34,15 @@ def process_split_region(tn, work, region, reference, mode, alignment_bam, ins_merge_min_af, merge_r, merge_d_for_scan, report_all_alleles, + report_count_for_all_positions, scan_alignments_binary, restart, num_splits, num_threads, calc_qual, regions=[]): logger = logging.getLogger(process_split_region.__name__) logger.info("Scan bam.") scan_outputs = scan_alignments(work, merge_d_for_scan, scan_alignments_binary, alignment_bam, region, reference, num_splits, num_threads, scan_window_size, scan_maf, - min_mapq, max_dp, report_all_alleles, filter_duplicate, restart=restart, split_region_files=regions, + min_mapq, max_dp, report_all_alleles, report_count_for_all_positions, + filter_duplicate, restart=restart, split_region_files=regions, calc_qual=calc_qual) if filtered_candidates_vcf: logger.info("Filter candidates.") @@ -302,6 +304,7 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, ins_merge_min_af, merge_r, merge_d_for_scan, report_all_alleles, + False, scan_alignments_binary, restart, num_splits, num_threads, calc_qual=False) tumor_counts_without_q, split_regions, filtered_candidates_vcfs_without_q = tumor_outputs_without_q @@ -329,6 +332,7 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, ins_merge_min_af, merge_r, merge_d_for_scan, report_all_alleles, + False, scan_alignments_binary, 
restart, num_splits, num_threads, calc_qual=True, regions=candidates_split_regions) @@ -358,6 +362,7 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, ins_merge_min_af, merge_r, merge_d_for_scan, report_all_alleles, + True, scan_alignments_binary, restart, num_splits, num_threads, calc_qual=True, regions=candidates_split_regions) diff --git a/neusomatic/python/scan_alignments.py b/neusomatic/python/scan_alignments.py index 49b282f..60c31d1 100755 --- a/neusomatic/python/scan_alignments.py +++ b/neusomatic/python/scan_alignments.py @@ -25,7 +25,7 @@ def run_scan_alignments(record): work, reference, merge_d_for_scan, scan_alignments_binary, split_region_file, \ - input_bam, window_size, maf, min_mapq, max_dp, report_all_alleles, filter_duplicate, calc_qual = record + input_bam, window_size, maf, min_mapq, max_dp, report_all_alleles, report_count_for_all_positions, filter_duplicate, calc_qual = record if filter_duplicate: filter_duplicate_str = "--filter_duplicate" @@ -35,6 +35,10 @@ def run_scan_alignments(record): report_all_alleles_str = "--report_all_alleles" else: report_all_alleles_str = "" + if report_count_for_all_positions: + report_count_for_all_positions_str = "--report_count_for_all_positions" + else: + report_count_for_all_positions_str = "" thread_logger = logging.getLogger( "{} ({})".format(run_scan_alignments.__name__, multiprocessing.current_process().name)) try: @@ -54,9 +58,10 @@ def run_scan_alignments(record): if os.path.getsize(split_region_file_) > 0: cmd = "{} --ref {} -b {} -L {} --out_vcf_file {}/candidates.vcf --out_count_file {}/count.bed \ - --window_size {} --min_af {} --min_mapq {} --max_depth {} {} {}".format( + --window_size {} --min_af {} --min_mapq {} --max_depth {} {} {} {}".format( scan_alignments_binary, reference, input_bam, split_region_file_, - work, work, window_size, maf, min_mapq, max_dp * window_size / 100.0, report_all_alleles_str, filter_duplicate_str) + work, work, window_size, maf, min_mapq, max_dp * window_size / 100.0, report_all_alleles_str, + report_count_for_all_positions_str, filter_duplicate_str) if calc_qual: cmd += " --calculate_qual_stat" run_shell_command(cmd, stdout=os.path.join(work, "scan.out"), @@ -84,7 +89,8 @@ def run_scan_alignments(record): def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam, regions_bed_file, reference, num_splits, - num_threads, window_size, maf, min_mapq, max_dp, report_all_alleles, filter_duplicate, restart=True, + num_threads, window_size, maf, min_mapq, max_dp, report_all_alleles, + report_count_for_all_positions, filter_duplicate, restart=True, split_region_files=[], calc_qual=True): logger = logging.getLogger(scan_alignments.__name__) @@ -156,7 +162,7 @@ def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam, shutil.rmtree(work_) map_args.append((os.path.join(work, "work.{}".format(i)), reference, merge_d_for_scan, scan_alignments_binary, split_region_file, - input_bam, window_size, maf, min_mapq, max_dp, report_all_alleles, filter_duplicate, calc_qual)) + input_bam, window_size, maf, min_mapq, max_dp, report_all_alleles, report_count_for_all_positions, filter_duplicate, calc_qual)) not_done.append(i) else: all_outputs[i] = [os.path.join(work, "work.{}".format(i), "candidates.vcf"), @@ -216,6 +222,9 @@ def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam, parser.add_argument('--report_all_alleles', help='report all alleles per position', action="store_true") + 
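# --- [illustrative sketch, not part of the patch series] -------------------
# Every boolean option threaded through run_scan_alignments above follows
# the same plumbing: a store_true argparse flag becomes either the literal
# "--option" token or an empty string before being spliced into the scanner
# command line. A hedged restatement of that pattern; flag() and the option
# values in the example are illustrative only.
def flag(name, enabled):
    # render a store_true option for the shell command, or nothing
    return "--{}".format(name) if enabled else ""

extra_args = " ".join(s for s in [
    flag("report_all_alleles", True),
    flag("report_count_for_all_positions", False),
    flag("filter_duplicate", True),
] if s)
assert extra_args == "--report_all_alleles --filter_duplicate"
# ---------------------------------------------------------------------------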
parser.add_argument('--report_count_for_all_positions', + help='report count for all positions', + action="store_true") parser.add_argument('--num_splits', type=int, help='number of region splits', default=None) parser.add_argument('--num_threads', type=int, @@ -227,7 +236,8 @@ def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam, outputs = scan_alignments(args.work, args.merge_d_for_scan, args.scan_alignments_binary, args.input_bam, args.regions_bed_file, args.reference, args.num_splits, args.num_threads, args.window_size, args.maf, - args.min_mapq, args.max_dp, args.report_all_alleles, args.filter_duplicate) + args.min_mapq, args.max_dp, args.report_all_alleles, args.report_count_for_all_positions, + args.filter_duplicate) except Exception as e: logger.error(traceback.format_exc()) logger.error("Aborting!") From 0c5a35283f1597753c1542332a66eac0af741242 Mon Sep 17 00:00:00 2001 From: Sahraeian Date: Mon, 12 Apr 2021 17:19:09 -0700 Subject: [PATCH 03/10] add more options to scan_alignments --- neusomatic/cpp/scan_alignments.cpp | 47 ++++++++++--- neusomatic/include/Options.h | 66 ++++++++++++++++-- neusomatic/include/msa_utils.hpp | 98 --------------------------- neusomatic/python/generate_dataset.py | 20 +++--- neusomatic/python/preprocess.py | 9 ++- neusomatic/python/scan_alignments.py | 60 +++++++++++++--- 6 files changed, 161 insertions(+), 139 deletions(-) diff --git a/neusomatic/cpp/scan_alignments.cpp b/neusomatic/cpp/scan_alignments.cpp index e459431..bea87da 100644 --- a/neusomatic/cpp/scan_alignments.cpp +++ b/neusomatic/cpp/scan_alignments.cpp @@ -53,10 +53,12 @@ int main(int argc, char **argv) { const std::string& vcf_out = opts.vcf_out(); const std::string& count_out = opts.count_out(); int window_size = opts.window_size(); - float min_af = opts.min_allele_freq(); - float ins_min_af=min_af; - float del_min_af=min_af; - float snp_min_af=min_af; + float snp_min_af = opts.snp_min_allele_freq(); + float ins_min_af = opts.ins_min_allele_freq(); + float del_min_af = opts.del_min_allele_freq(); + int snp_min_ao = opts.snp_min_ao(); + int snp_min_bq = opts.snp_min_bq(); + int min_depth = opts.min_depth(); const bool calculate_qual_stat = opts.calculate_qual_stat(); const bool report_all_alleles = opts.report_all_alleles(); const bool report_count_for_all_positions = opts.report_count_for_all_positions(); @@ -138,6 +140,9 @@ int main(int argc, char **argv) { } ++cnt_region; if (records.empty()) continue; + // if (records.size() < min_depth) { + // continue; + // } if (records.size() > opts.max_depth()) { records.resize(opts.max_depth()); } @@ -260,9 +265,10 @@ int main(int argc, char **argv) { pileup_counts[base] = base_freq_[base]; total_count+=base_freq_[base]; } - - + if (total_count==0) {continue;} + if ((ref_base!='-') and ( total_count 0)){ auto af = alt_cnt/float(alt_cnt+ref_count); + if ((alt_cnt >= ref_count) or ((row == 4 and af > del_min_af ) or - (row != 4 and ref_base != '-' and af > snp_min_af ) or + (row != 4 and ref_base != '-' and ((af >= snp_min_af) or (alt_cnt >= snp_min_ao))) or (ref_base =='-' and af > ins_min_af))){ + if (row != 4 and ref_base != '-'){ + bool passed_bq = true; + if (calculate_qual_stat){ + auto bqual_mean_ = condensed_array.GetBQMean(i); + passed_bq = int(round(bqual_mean_[row])) >= snp_min_bq; + } + if (not passed_bq){ + continue; + } + } alt_counts.insert(std::pair(row, alt_cnt)); dp += alt_cnt; } @@ -334,8 +351,18 @@ int main(int argc, char **argv) { var_code = major; var_count = major_count; } else if (minor !=
ref_code and ( (minor == 4 and af > del_min_af ) or - (minor != 4 and ref_base != '-' and af > snp_min_af ) or + (minor != 4 and ref_base != '-' and ((af >= snp_min_af) or (minor_count >= snp_min_ao))) or (ref_base =='-' and af > ins_min_af))){ + if (minor != 4 and ref_base != '-'){ + bool passed_bq = true; + if (calculate_qual_stat){ + auto bqual_mean_ = condensed_array.GetBQMean(i); + passed_bq = int(round(bqual_mean_[minor])) >= snp_min_bq; + } + if (not passed_bq){ + continue; + } + } var_code = minor; var_count = minor_count; } @@ -456,7 +483,6 @@ int main(int argc, char **argv) { } var_cols.push_back(std::max(0,int(ncol)-1)); - // for (auto i=std::max(processed_col+1,j-matrix_base_pad-1); i < std::min(int(ncol),j+matrix_base_pad+1); ++i){ std::vector count_cols; if (report_count_for_all_positions){ @@ -547,6 +573,7 @@ int main(int argc, char **argv) { } auto base_freq_ = condensed_array.GetBaseFreq(i); + base_freq_.erase(base_freq_.begin() + 5); std::vector pileup_counts(base_freq_.size()); int total_count=0; @@ -554,8 +581,6 @@ int main(int argc, char **argv) { pileup_counts[base] = base_freq_[base]; total_count+=base_freq_[base]; } - - if (total_count==0) {continue;} if (calculate_qual_stat){ diff --git a/neusomatic/include/Options.h b/neusomatic/include/Options.h index 9227ed1..727c811 100644 --- a/neusomatic/include/Options.h +++ b/neusomatic/include/Options.h @@ -16,13 +16,18 @@ namespace neusomatic { {"ref", required_argument, 0, 'r'}, {"calculate_qual_stat", no_argument, 0, 'c'}, {"min_mapq", required_argument, 0, 'q'}, - {"min_af", required_argument, 0, 'a'}, + {"snp_min_bq", required_argument, 0, 'n'}, + {"snp_min_af", required_argument, 0, 'a'}, + {"ins_min_af", required_argument, 0, 'i'}, + {"del_min_af", required_argument, 0, 'l'}, + {"snp_min_ao", required_argument, 0, 'M'}, {"out_vcf_file", required_argument, 0, 'f'}, {"out_count_file", required_argument, 0, 'o'}, {"fully_contained", no_argument, 0, 'y'}, {"window_size", required_argument, 0, 'w'}, {"num_threads", required_argument, 0, 't'}, {"max_depth", required_argument, 0, 'd'}, + {"min_depth", required_argument, 0, 'h'}, {"include_secondary", no_argument, 0, 's'}, {"filter_duplicate", no_argument, 0, 'D'}, {"filter_QCfailed", no_argument, 0, 'Q'}, @@ -47,13 +52,18 @@ namespace neusomatic { std::cerr<< "-r/--ref, reference file path, required.\n"; std::cerr<< "-c/--calculate_qual_stat, calculating base quality and other stats, default False.\n"; std::cerr<< "-q/--min_mapq, minimum mapping quality, default 0.\n"; - std::cerr<< "-a/--min_af, minimum allele freq, default 0.1.\n"; + std::cerr<< "-n/--snp_min_bq, SNP minimum base quality, default 10.\n"; + std::cerr<< "-a/--snp_min_af, SNP minimum allele freq, default 0.01.\n"; + std::cerr<< "-i/--ins_min_af, INS minimum allele freq, default 0.01.\n"; + std::cerr<< "-l/--del_min_af, DEL minimum allele freq, default 0.01.\n"; + std::cerr<< "-M/--snp_min_ao, SNP minimum alternate count for low AF candidates, default 3.\n"; std::cerr<< "-f/--out_vcf_file, output vcf file path, required.\n"; std::cerr<< "-o/--out_count_file, output count file path, required.\n"; std::cerr<< "-w/--window_size, window size to scan the variants, default is 15.\n"; std::cerr<< "-y/--fully_contained, if this option is on. 
A read has to be fully contained in the region, default is False.\n"; // std::cerr<< "-t/--num_threads, number or thread used for building the count matrix, default is 4.\n"; std::cerr<< "-d/--max_depth, maximum depth for building the count matrix, default is 40,000.\n"; + std::cerr<< "-h/--min_depth, minimum depth for building the count matrix, default is 0.\n"; std::cerr<< "-s/--include_secondary, consider secondary alignments, default is False.\n"; std::cerr<< "-D/--filter_duplicate, filter duplicate reads if the flag is set, default is False.\n"; std::cerr<< "-Q/--filter_QCfailed, filter QC failed reads if the flag is set, default is False.\n"; @@ -134,12 +144,18 @@ namespace neusomatic { case 'q': opt.min_mapq() = parseInt(optarg, 0, "-q/--min_mapq must be at least 0", print_help); break; + case 'n': + opt.snp_min_bq() = parseInt(optarg, 0, "-n/--snp_min_bq must be at least 0", print_help); + break; case 'f': opt.vcf_out() = optarg; break; case 'd': opt.max_depth() = parseInt(optarg, 1, "-d/--max_depth must be at least 1", print_help); break; + case 'h': + opt.min_depth() = parseInt(optarg, 1, "-h/--min_depth must be at least 1", print_help); + break; case 'o': opt.count_out() = optarg; break; @@ -174,7 +190,16 @@ namespace neusomatic { opt.report_count_for_all_positions() = true; break; case 'a': - opt.min_allele_freq() = parseFloat(optarg, 0.0, 1.0, "-a/--min_af must be between 0 and 1", print_help); + opt.snp_min_allele_freq() = parseFloat(optarg, 0.0, 1.0, "-a/--snp_min_af must be between 0 and 1", print_help); + break; + case 'i': + opt.ins_min_allele_freq() = parseFloat(optarg, 0.0, 1.0, "-i/--ins_min_af must be between 0 and 1", print_help); + break; + case 'l': + opt.del_min_allele_freq() = parseFloat(optarg, 0.0, 1.0, "-l/--del_min_af must be between 0 and 1", print_help); + break; + case 'M': + opt.snp_min_ao() = parseInt(optarg, 1, "-M/--snp_min_ao must be at least 1", print_help); break; case 't': //opt.num_threads() = parseInt(optarg, 1, "-t/--num_threads must be at least 1", print_help); @@ -267,10 +292,18 @@ struct Options { return (min_mapq_); } + decltype(auto) snp_min_bq() const { + return (snp_min_bq_); + } + decltype(auto) min_mapq() { return (min_mapq_); } + decltype(auto) snp_min_bq() { + return (snp_min_bq_); + } + decltype(auto) calculate_qual_stat() { return (calculate_qual_stat_); } @@ -279,8 +312,20 @@ struct Options { return (window_size_); } - decltype(auto) min_allele_freq() { - return (min_allele_freq_); + decltype(auto) snp_min_allele_freq() { + return (snp_min_allele_freq_); + } + + decltype(auto) ins_min_allele_freq() { + return (ins_min_allele_freq_); + } + + decltype(auto) del_min_allele_freq() { + return (del_min_allele_freq_); + } + + decltype(auto) snp_min_ao() { + return (snp_min_ao_); } decltype(auto) fully_contained() { @@ -291,6 +336,10 @@ struct Options { return (max_depth_); } + decltype(auto) min_depth() { + return (min_depth_); + } + decltype(auto) include_secondary() { return (include_secondary_); } @@ -365,11 +414,16 @@ struct Options { std::string ref_; bool calculate_qual_stat_ = false; bool fully_contained_ = false; - float min_allele_freq_ = 0.01; + float snp_min_allele_freq_ = 0.01; + float ins_min_allele_freq_ = 0.01; + float del_min_allele_freq_ = 0.01; + int snp_min_ao_ = 3; int min_mapq_ = 0; + int snp_min_bq_ = 0; int window_size_ = 500; int num_threads_ = 1; int max_depth_ = 5000000; + int min_depth_ = 0; bool include_secondary_ = false; bool filter_duplicate_ = false; bool filter_QCfailed_ = false; diff --git 
a/neusomatic/include/msa_utils.hpp b/neusomatic/include/msa_utils.hpp index da68f4a..d96960a 100644 --- a/neusomatic/include/msa_utils.hpp +++ b/neusomatic/include/msa_utils.hpp @@ -201,14 +201,6 @@ class CondensedArray{ ++cspace_[pp].base_freq_[DnaCharToDnaCode(seq[pp])]; } - - // for (int pp = s; pp <= e; ++pp) { - // std::map alt_counts; - // int dp = 0; - // auto ref_base = ref[pp]; - // Compute_ALTs(alt_counts, cspace_[pp].base_freq_, dp, ref_base, report_all_alleles, del_min_af, snp_min_af, ins_min_af); - - // auto end_3p0p1 = std::chrono::system_clock::now(); // std::chrono::duration elapsed_seconds_3p0p1 = end_3p0p1-start_3p0p1; // std::time_t end_time_3p0p1 = std::chrono::system_clock::to_time_t(end_3p0p1); @@ -639,96 +631,6 @@ std::string add_tag_col(auto & data_array, bool is_int, int idx, const std::str } -// void Compute_ALTs(auto & alt_counts, auto & base_freq_, auto & dp, auto ref_base, bool report_all_alleles){ -// ref_base = std::toupper(ref_base); -// auto ref_code = DnaCharToDnaCode(ref_base); - -// if (ref_base == 'N') { -// ref_base = '-'; -// } - -// base_freq_.erase(base_freq_.begin() + 5); - -// std::vector pileup_counts(base_freq_.size()); -// int total_count=0; -// for (int base = 0; base < (int) base_freq_.size(); ++base) { -// pileup_counts[base] = base_freq_[base]; -// total_count+=base_freq_[base]; -// } - -// if (total_count==0) { -// return; -// } -// auto ref_count = base_freq_[ref_code]; -// auto var_code = ref_code; -// int var_count = 0; -// dp = ref_count; -// if (report_all_alleles and ref_base != '-'){ -// for (int row = 0; row < base_freq_.size(); ++row) { -// auto alt_cnt = base_freq_[row]; -// if (( row != ref_code) and (alt_cnt > 0)){ -// auto af = alt_cnt/float(alt_cnt+ref_count); -// if ((alt_cnt >= ref_count) or ((row == 4 and af > del_min_af ) or -// (row != 4 and ref_base != '-' and af > snp_min_af ) or -// (ref_base =='-' and af > ins_min_af))){ -// alt_counts.insert(std::pair(row, alt_cnt)); -// dp += alt_cnt; -// } -// } -// } -// }else{ -// int major = -1; -// int major_count = 0; -// int minor = -1; -// int minor_count = 0; -// int minor2 = -1; -// int minor2_count = 0; - -// for (int row = 0; row < base_freq_.size(); ++row) { -// if (base_freq_[row] > major_count) { -// minor2 = minor; -// minor2_count = minor_count; -// minor_count = major_count; -// minor = major; -// major_count = base_freq_[row]; -// major = row; -// } else if (base_freq_[row] > minor_count) { -// minor2 = minor; -// minor2_count = minor_count; -// minor_count = base_freq_[row]; -// minor = row; -// } else if (base_freq_[row] > minor2_count) { -// minor2_count = base_freq_[row]; -// minor2 = row; -// } -// } - -// if (minor != -1 and major != -1){ -// if (minor2 != -1 and ref_code == major and minor == 4 and ref_code != 4 ){ -// if (minor2_count>0.5*minor_count){ -// minor = minor2; -// minor_count = minor2_count; -// } -// } -// } -// auto af = minor_count/float(major_count+minor_count); -// if (major != ref_code){ -// var_code = major; -// var_count = major_count; -// } else if (minor != ref_code and ( (minor == 4 and af > del_min_af ) or -// (minor != 4 and ref_base != '-' and af > snp_min_af ) or -// (ref_base =='-' and af > ins_min_af))){ -// var_code = minor; -// var_count = minor_count; -// } -// if (var_count > 0) { -// alt_counts.insert(std::pair(var_code,var_count)); -// dp += var_count; -// } -// } -// } - - }// end neusomatic #endif diff --git a/neusomatic/python/generate_dataset.py b/neusomatic/python/generate_dataset.py index 9aca780..2bf8b3b 100755 
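# --- [illustrative sketch, not part of the patch series] -------------------
# A Python restatement of the per-column candidate test that PATCH 03 wires
# into scan_alignments.cpp above, using the defaults added to Options.h
# (snp/ins/del min AF = 0.01, snp_min_ao = 3); row code 4 is the gap base in
# the C++ nucleotide ordering. keep_alt() and its argument names are
# hypothetical, and the extra snp_min_bq gate applied when
# --calculate_qual_stat is on is omitted here for brevity.
def keep_alt(alt_cnt, ref_cnt, alt_is_gap, ref_is_gap,
             snp_min_af=0.01, ins_min_af=0.01, del_min_af=0.01,
             snp_min_ao=3):
    if alt_cnt == 0:
        return False
    af = alt_cnt / float(alt_cnt + ref_cnt)
    if alt_cnt >= ref_cnt:      # alternate dominates the reference
        return True
    if alt_is_gap:              # deletion candidate
        return af > del_min_af
    if ref_is_gap:              # insertion column
        return af > ins_min_af
    # SNP: AF threshold, or the raw-count rescue for low-AF candidates
    return af >= snp_min_af or alt_cnt >= snp_min_ao
# ---------------------------------------------------------------------------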
--- a/neusomatic/python/generate_dataset.py +++ b/neusomatic/python/generate_dataset.py @@ -1988,9 +1988,10 @@ def parallel_generation(inputs): del tumor_tabix_records, normal_tabix_records # thread_logger.info(["ffff-3",time.time()-t2]) - thread_logger.info("Gener-8") - for i_ in range(len(map_args)): - w = map_args[i_] + thread_logger.info(["Gener-8",len(map_args)]) + + records_done=[] + for w in map_args: record = w[3] chrom = record[0] pos = int(record[1]) @@ -1998,11 +1999,8 @@ def parallel_generation(inputs): e_pos = min(pos + matrix_base_pad, chrom_lengths[chrom] - 2) tumor_counts ={x_pos:tumor_tabix_records_dict[chrom][x_pos] for x_pos in range(s_pos,e_pos+1) if x_pos in tumor_tabix_records_dict[chrom]} normal_counts ={x_pos:normal_tabix_records_dict[chrom][x_pos] for x_pos in range(s_pos,e_pos+1) if x_pos in normal_tabix_records_dict[chrom]} - map_args[i_][1] = tumor_counts - map_args[i_][2] = normal_counts - - records_done=[] - for w in map_args: + w[1] = tumor_counts + w[2] = normal_counts o=prep_data_single_tabix(w) if o is None: raise Exception("prep_data_single_tabix failed!") @@ -2142,7 +2140,6 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be logger.info("Write {}/{} split to {} for cnts ({}..{})/{}".format( is_ + 1, candidates_split, candidates_tsv_file, is_current, is_end, total_ims)) - pool = multiprocessing.Pool(num_threads) map_args_records = [] for records_r, none_records, vtype, record_len, record_center, chroms_order, anns in records_data: if len(records_r) + len(none_records) + cnt < is_current: @@ -2182,9 +2179,10 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be logger.info("Gener-9") pool = multiprocessing.Pool(num_threads) try: + split_len=max(1,len_records//num_threads) records_done_ = pool.map_async( - parallel_generation, [[map_args_records[i_split:i_split+(len_records//num_threads)],matrix_base_pad, chrom_lengths, tumor_count_bed, normal_count_bed] - for i_split in range(0, len_records, len_records//num_threads)]).get() + parallel_generation, [[map_args_records[i_split:i_split+(split_len)],matrix_base_pad, chrom_lengths, tumor_count_bed, normal_count_bed] + for i_split in range(0, len_records, split_len)]).get() pool.close() except Exception as inst: logger.error(inst) diff --git a/neusomatic/python/preprocess.py index 2ae1bf5..4f9f2f8 100755 --- a/neusomatic/python/preprocess.py +++ b/neusomatic/python/preprocess.py @@ -40,8 +40,11 @@ def process_split_region(tn, work, region, reference, mode, alignment_bam, logger = logging.getLogger(process_split_region.__name__) logger.info("Scan bam.") scan_outputs = scan_alignments(work, merge_d_for_scan, scan_alignments_binary, alignment_bam, - region, reference, num_splits, num_threads, scan_window_size, scan_maf, - min_mapq, max_dp, report_all_alleles, report_count_for_all_positions, + region, reference, num_splits, num_threads, scan_window_size, + snp_min_ao, + snp_min_af, scan_maf, scan_maf, + min_mapq, snp_min_bq, max_dp, min_dp, + report_all_alleles, report_count_for_all_positions, filter_duplicate, restart=restart, split_region_files=regions, calc_qual=calc_qual) if filtered_candidates_vcf: @@ -355,7 +358,7 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, logger.info("Scan normal bam (and extracting quality scores).") normal_counts, _, _ = process_split_region("normal", work_normal, region_bed, reference, mode, normal_bam, scan_window_size, 0.2, min_mapq, - None, min_dp, max_dp, + None, 1, max_dp, filter_duplicate,
good_ao, min_ao, snp_min_af, snp_min_bq, snp_min_ao, ins_min_af, del_min_af, del_merge_min_af, diff --git a/neusomatic/python/scan_alignments.py b/neusomatic/python/scan_alignments.py index 60c31d1..3e7b56a 100755 --- a/neusomatic/python/scan_alignments.py +++ b/neusomatic/python/scan_alignments.py @@ -25,7 +25,11 @@ def run_scan_alignments(record): work, reference, merge_d_for_scan, scan_alignments_binary, split_region_file, \ - input_bam, window_size, maf, min_mapq, max_dp, report_all_alleles, report_count_for_all_positions, filter_duplicate, calc_qual = record + input_bam, window_size, \ + snp_min_ao, \ + snp_min_af, ins_min_af, del_min_af, \ + min_mapq, snp_min_bq, max_dp, min_dp, \ + report_all_alleles, report_count_for_all_positions, filter_duplicate, calc_qual = record if filter_duplicate: filter_duplicate_str = "--filter_duplicate" @@ -58,10 +62,17 @@ def run_scan_alignments(record): if os.path.getsize(split_region_file_) > 0: cmd = "{} --ref {} -b {} -L {} --out_vcf_file {}/candidates.vcf --out_count_file {}/count.bed \ - --window_size {} --min_af {} --min_mapq {} --max_depth {} {} {} {}".format( + --window_size {} \ + --snp_min_ao {} \ + --snp_min_af {} --ins_min_af {} --del_min_af {} \ + --min_mapq {} --snp_min_bq {} --max_depth {} --min_depth {} \ + {} {} {}".format( scan_alignments_binary, reference, input_bam, split_region_file_, - work, work, window_size, maf, min_mapq, max_dp * window_size / 100.0, report_all_alleles_str, - report_count_for_all_positions_str, filter_duplicate_str) + work, work, window_size, + snp_min_ao, + snp_min_af, ins_min_af, del_min_af, + min_mapq, snp_min_bq, max_dp * window_size / 100.0, min_dp, + report_all_alleles_str, report_count_for_all_positions_str, filter_duplicate_str) if calc_qual: cmd += " --calculate_qual_stat" run_shell_command(cmd, stdout=os.path.join(work, "scan.out"), @@ -86,10 +97,23 @@ def run_scan_alignments(record): "Please check error log at {}".format(stderr_file)) return None + outputs = scan_alignments(args.work, args.merge_d_for_scan, args.scan_alignments_binary, args.input_bam, + args.regions_bed_file, args.reference, args.num_splits, + args.num_threads, args.window_size, + args.snp_min_ao, + args.snp_min_af, args.ins_min_af, args.del_min_af, + args.min_mapq, args.snp_min_bq, args.max_dp, args.min_dp, + args.report_all_alleles, args.report_count_for_all_positions, + args.filter_duplicate) + def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam, regions_bed_file, reference, num_splits, - num_threads, window_size, maf, min_mapq, max_dp, report_all_alleles, + num_threads, window_size, + snp_min_ao, + snp_min_af, ins_min_af, del_min_af, + min_mapq, snp_min_bq, max_dp, min_dp, + report_all_alleles, report_count_for_all_positions, filter_duplicate, restart=True, split_region_files=[], calc_qual=True): @@ -162,7 +186,11 @@ def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam, shutil.rmtree(work_) map_args.append((os.path.join(work, "work.{}".format(i)), reference, merge_d_for_scan, scan_alignments_binary, split_region_file, - input_bam, window_size, maf, min_mapq, max_dp, report_all_alleles, report_count_for_all_positions, filter_duplicate, calc_qual)) + input_bam, window_size, + snp_min_ao, + snp_min_af, ins_min_af, del_min_af, + min_mapq, snp_min_bq, max_dp, min_dp, + report_all_alleles, report_count_for_all_positions, filter_duplicate, calc_qual)) not_done.append(i) else: all_outputs[i] = [os.path.join(work, "work.{}".format(i), "candidates.vcf"), @@ -207,12 +235,21 @@ def 
scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam, help='binary for scanning alignment bam', default="../bin/scan_alignments") parser.add_argument('--window_size', type=int, help='window size to scan the variants', default=2000) - parser.add_argument('--maf', type=float, - help='minimum allele freq', default=0.01) + parser.add_argument('--snp_min_ao', type=float, + help='SNP min alternate count for low AF candidates', default=3) + parser.add_argument('--snp_min_af', type=float, + help='SNP min allele freq', default=0.05) + parser.add_argument('--ins_min_af', type=float, + help='INS min allele freq', default=0.01) + parser.add_argument('--del_min_af', type=float, + help='DEL min allele freq', default=0.01) parser.add_argument('--min_mapq', type=int, help='minimum mapping quality', default=1) + parser.add_argument('--snp_min_bq', type=float, + help='SNP min base quality', default=10) parser.add_argument('--max_dp', type=float, help='max depth', default=100000) + parser.add_argument('--min_dp', type=float, help='min depth', default=1) parser.add_argument('--filter_duplicate', help='filter duplicate reads when preparing pileup information', action="store_true") @@ -235,8 +272,11 @@ def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam, try: outputs = scan_alignments(args.work, args.merge_d_for_scan, args.scan_alignments_binary, args.input_bam, args.regions_bed_file, args.reference, args.num_splits, - args.num_threads, args.window_size, args.maf, - args.min_mapq, args.max_dp, args.report_all_alleles, args.report_count_for_all_positions, + args.num_threads, args.window_size, + args.snp_min_ao, + args.snp_min_af, args.ins_min_af, args.del_min_af, + args.min_mapq, args.snp_min_bq, args.max_dp, args.min_dp, + args.report_all_alleles, args.report_count_for_all_positions, args.filter_duplicate) except Exception as e: logger.error(traceback.format_exc()) From 9b41956eb2e7974823a7e3d26444f0959f0e6b20 Mon Sep 17 00:00:00 2001 From: Sahraeian Date: Fri, 16 Apr 2021 20:55:03 -0700 Subject: [PATCH 04/10] small fix --- neusomatic/python/generate_dataset.py | 67 +++++++++++++++------------ neusomatic/python/preprocess.py | 57 +++++++++++++++++++++-- 2 files changed, 90 insertions(+), 34 deletions(-) diff --git a/neusomatic/python/generate_dataset.py b/neusomatic/python/generate_dataset.py index 2bf8b3b..4a9e93d 100755 --- a/neusomatic/python/generate_dataset.py +++ b/neusomatic/python/generate_dataset.py @@ -1950,7 +1950,7 @@ def parallel_generation(inputs): chrom_pos[chrom] = [min(s_pos,chrom_pos[chrom][0]), max(e_pos,chrom_pos[chrom][1])] thread_logger.info(chrom_pos) - thread_logger.info("Gener-7") + # thread_logger.info("Gener-7") tb_tumor = pysam.TabixFile(tumor_count_bed, parser=pysam.asTuple()) tb_normal = pysam.TabixFile(normal_count_bed, parser=pysam.asTuple()) @@ -2026,7 +2026,7 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be logger.info(tumor_count_bed) t1=time.time() - logger.info("Gener-0") + # logger.info("Gener-0") if not os.path.exists(work): os.mkdir(work) @@ -2093,21 +2093,25 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be # logger.info(["rrr-1",time.time()-t1]) t1 = time.time() - logger.info("Gener-1") - pool = multiprocessing.Pool(num_threads) + # logger.info("Gener-1") map_args = [] for i, split_region_file in enumerate(split_region_files): map_args.append((work, split_region_file, truth_vcf_file, tumor_pred_vcf_file, ref_file, ensemble_bed, num_ens_features, 
strict_labeling, i)) - try: - records_data = pool.map_async(find_records, map_args).get() - pool.close() - except Exception as inst: - logger.error(inst) - pool.close() - traceback.print_exc() - raise Exception - + if num_threads == 1: + records_data = [] + for w in map_args: + records_data.append(find_records(w)) + else: + pool = multiprocessing.Pool(num_threads) + try: + records_data = pool.map_async(find_records, map_args).get() + pool.close() + except Exception as inst: + logger.error(inst) + pool.close() + traceback.print_exc() + raise Exception for o in records_data: if o is None: raise Exception("find_records failed!") @@ -2122,14 +2126,14 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be for records_r, none_records, vtype, record_len, record_center, chroms_order, anns in records_data: total_ims += len(records_r) + len(none_records) - logger.info("Gener-2") + # logger.info("Gener-2") candidates_split = int(total_ims // tsv_batch_size) + 1 is_split = total_ims // candidates_split with open(var_vcf, "w") as vv, open(none_vcf, "w") as nv: is_current = 0 is_ = -1 while is_ < candidates_split: - logger.info("Gener-3") + # logger.info("Gener-3") is_ += 1 cnt = -1 if is_ < candidates_split - 1: @@ -2171,24 +2175,27 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be if cnt >= is_end: break - logger.info("Gener-4") + # logger.info("Gener-4") len_records = len(map_args_records) records_done = [] if len_records>0: - logger.info("Gener-9") - pool = multiprocessing.Pool(num_threads) - try: - split_len=max(1,len_records//num_threads) - records_done_ = pool.map_async( - parallel_generation, [[map_args_records[i_split:i_split+(split_len)],matrix_base_pad, chrom_lengths, tumor_count_bed, normal_count_bed] - for i_split in range(0, len_records, split_len)]).get() - pool.close() - except Exception as inst: - logger.error(inst) - pool.close() - traceback.print_exc() - raise Exception + # logger.info("Gener-9") + if num_threads == 1: + records_done_ = [parallel_generation([map_args_records, matrix_base_pad, chrom_lengths, tumor_count_bed, normal_count_bed])] + else: + pool = multiprocessing.Pool(num_threads) + try: + split_len=max(1,len_records//num_threads) + records_done_ = pool.map_async( + parallel_generation, [[map_args_records[i_split:i_split+(split_len)],matrix_base_pad, chrom_lengths, tumor_count_bed, normal_count_bed] + for i_split in range(0, len_records, split_len)]).get() + pool.close() + except Exception as inst: + logger.error(inst) + pool.close() + traceback.print_exc() + raise Exception for o in records_done_: if o is None: @@ -2196,7 +2203,7 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be records_done.extend(o) shuffle(records_done) - logger.info("Gener-10") + # logger.info("Gener-10") cnt_ims = 0 tsv_idx = [] with open(candidates_tsv_file, "w") as b_o: diff --git a/neusomatic/python/preprocess.py b/neusomatic/python/preprocess.py index 4f9f2f8..bc5aa9a 100755 --- a/neusomatic/python/preprocess.py +++ b/neusomatic/python/preprocess.py @@ -105,6 +105,7 @@ def generate_dataset_region(work, truth_vcf, mode, filtered_candidates_vcf, regi matrix_dtype, strict_labeling, tsv_batch_size) + return True def get_ensemble_region(record): @@ -206,6 +207,38 @@ def extract_candidate_split_regions( return candidates_split_regions +def generate_dataset_region_parallel(record): + work_dataset_split, truth_vcf, mode, filtered_vcf, \ + candidates_split_region, tumor_count, normal_count, reference, \ + 
matrix_width, matrix_base_pad, min_ev_frac_per_col, min_dp, \ + ensemble_bed_i, \ + ensemble_custom_header, \ + no_seq_complexity, no_feature_recomp_for_ensemble, \ + zero_vscore, \ + matrix_dtype, \ + strict_labeling, \ + tsv_batch_size = record + thread_logger = logging.getLogger( + "{} ({})".format(generate_dataset_region_parallel.__name__, multiprocessing.current_process().name)) + try: + ret = generate_dataset_region(work_dataset_split, truth_vcf, mode, filtered_vcf, + candidates_split_region, tumor_count, normal_count, reference, + matrix_width, matrix_base_pad, min_ev_frac_per_col, min_dp, 1, + ensemble_bed_i, + ensemble_custom_header, + no_seq_complexity, no_feature_recomp_for_ensemble, + zero_vscore, + matrix_dtype, + strict_labeling, + tsv_batch_size) + return ret + + except Exception as ex: + thread_logger.error(traceback.format_exc()) + thread_logger.error(ex) + return None + + def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, scan_window_size, scan_maf, min_mapq, min_dp, max_dp, good_ao, min_ao, snp_min_af, snp_min_bq, snp_min_ao, @@ -374,6 +407,7 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, if restart or not os.path.exists(work_dataset): os.mkdir(work_dataset) logger.info("Generate dataset.") + map_args_gen = [] for i, (tumor_count, normal_count, filtered_vcf, candidates_split_region) in enumerate(zip(tumor_counts, normal_counts, filtered_candidates_vcfs, candidates_split_regions)): logger.info("Dataset for region {}".format(candidates_split_region)) work_dataset_split = os.path.join(work_dataset, "work.{}".format(i)) @@ -602,17 +636,32 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, ensemble_bed_i = merged_features_bed else: ensemble_bed_i = extra_features_bed - - generate_dataset_region(work_dataset_split, truth_vcf, mode, filtered_vcf, + map_args_gen.append([work_dataset_split, truth_vcf, mode, filtered_vcf, candidates_split_region, tumor_count, normal_count, reference, - matrix_width, matrix_base_pad, min_ev_frac_per_col, min_dp, num_threads, + matrix_width, matrix_base_pad, min_ev_frac_per_col, min_dp, ensemble_bed_i, ensemble_custom_header, no_seq_complexity, no_feature_recomp_for_ensemble, zero_vscore, matrix_dtype, strict_labeling, - tsv_batch_size) + tsv_batch_size]) + + pool = multiprocessing.Pool(num_threads) + try: + done_gen = pool.map_async( + generate_dataset_region_parallel, map_args_gen).get() + pool.close() + except Exception as inst: + logger.error(inst) + pool.close() + traceback.print_exc() + raise Exception + + for o in done_gen: + if o is None: + raise Exception("Generate dataset failed!") + shutil.rmtree(bed_tempdir) tempfile.tempdir = original_tempdir From 4a99dc4c6b472185ea75f3eefc0bad209b210dd5 Mon Sep 17 00:00:00 2001 From: Sahraeian Date: Tue, 29 Jun 2021 17:31:14 -0700 Subject: [PATCH 05/10] small fix --- neusomatic/python/generate_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neusomatic/python/generate_dataset.py b/neusomatic/python/generate_dataset.py index 4a9e93d..76949f8 100755 --- a/neusomatic/python/generate_dataset.py +++ b/neusomatic/python/generate_dataset.py @@ -1988,7 +1988,7 @@ def parallel_generation(inputs): del tumor_tabix_records, normal_tabix_records # thread_logger.info(["ffff-3",time.time()-t2]) - thread_logger.info(["Gener-8",len(map_args)]) + # thread_logger.info(["Gener-8",len(map_args)]) records_done=[] for w in map_args: From 239c99c6ac405a7d67638026e0252867ded4a410 Mon Sep 17 00:00:00 2001 
From: Sahraeian Date: Tue, 13 Jul 2021 17:32:00 -0700 Subject: [PATCH 06/10] fix for is_none --- neusomatic/python/generate_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neusomatic/python/generate_dataset.py b/neusomatic/python/generate_dataset.py index 76949f8..b51561c 100755 --- a/neusomatic/python/generate_dataset.py +++ b/neusomatic/python/generate_dataset.py @@ -2149,7 +2149,7 @@ def generate_dataset(work, truth_vcf_file, mode, tumor_pred_vcf_file, region_be if len(records_r) + len(none_records) + cnt < is_current: cnt += len(records_r)+len(none_records) else: - for is_none, records in [["False", records_r],["True",none_records]]: + for is_none, records in [[False, records_r],[True,none_records]]: for record in records: cnt += 1 if is_current <= cnt < is_end: From 344a0e5284ddc95c29417e5f3a0e6f18f79df911 Mon Sep 17 00:00:00 2001 From: Sahraeian Date: Mon, 19 Jul 2021 18:47:20 -0700 Subject: [PATCH 07/10] fix for missed ensemble positions --- neusomatic/python/preprocess.py | 352 ++++++++++++++++++++------- neusomatic/python/scan_alignments.py | 122 +++++----- 2 files changed, 330 insertions(+), 144 deletions(-) diff --git a/neusomatic/python/preprocess.py b/neusomatic/python/preprocess.py index bc5aa9a..1aa4b5a 100755 --- a/neusomatic/python/preprocess.py +++ b/neusomatic/python/preprocess.py @@ -19,9 +19,9 @@ from filter_candidates import filter_candidates from generate_dataset import generate_dataset, extract_ensemble -from scan_alignments import scan_alignments +from scan_alignments import scan_alignments, split_my_region from extend_features import extend_features -from utils import concatenate_vcfs, run_bedtools_cmd, bedtools_sort, bedtools_merge, bedtools_intersect, bedtools_slop, get_tmp_file, skip_empty, vcf_2_bed +from utils import concatenate_vcfs, run_bedtools_cmd, bedtools_sort, bedtools_merge, bedtools_intersect, bedtools_slop, bedtools_window, get_tmp_file, skip_empty, vcf_2_bed from defaults import MAT_DTYPES @@ -40,11 +40,11 @@ def process_split_region(tn, work, region, reference, mode, alignment_bam, logger = logging.getLogger(process_split_region.__name__) logger.info("Scan bam.") scan_outputs = scan_alignments(work, merge_d_for_scan, scan_alignments_binary, alignment_bam, - region, reference, num_splits, num_threads, scan_window_size, + region, reference, num_splits, num_threads, scan_window_size, snp_min_ao, snp_min_af, scan_maf, scan_maf, min_mapq, snp_min_bq, max_dp, min_dp, - report_all_alleles, report_count_for_all_positions, + report_all_alleles, report_count_for_all_positions, filter_duplicate, restart=restart, split_region_files=regions, calc_qual=calc_qual) if filtered_candidates_vcf: @@ -157,15 +157,17 @@ def get_ensemble_beds(work, reference, ensemble_bed, split_regions, matrix_base_ def extract_candidate_split_regions( - work, filtered_candidates_vcfs, split_regions, ensemble_beds, + filtered_candidates_vcfs, split_regions, ensemble_beds, + tags, reference, matrix_base_pad, merge_d_for_short_read): logger = logging.getLogger(extract_candidate_split_regions.__name__) candidates_split_regions = [] - for i, (filtered_vcf, split_region_) in enumerate(zip(filtered_candidates_vcfs, - split_regions)): + for i, (filtered_vcf, split_region_, tags_i) in enumerate(zip(filtered_candidates_vcfs, + split_regions, tags)): + work = os.path.dirname(filtered_vcf) candidates_region_file = os.path.join( - work, "candidates_region_{}.bed".format(i)) + work, "candidates_region_{}.bed".format(tags_i)) is_empty = True with open(filtered_vcf) as 
f_: @@ -209,28 +211,28 @@ def extract_candidate_split_regions( def generate_dataset_region_parallel(record): work_dataset_split, truth_vcf, mode, filtered_vcf, \ - candidates_split_region, tumor_count, normal_count, reference, \ - matrix_width, matrix_base_pad, min_ev_frac_per_col, min_dp, \ - ensemble_bed_i, \ - ensemble_custom_header, \ - no_seq_complexity, no_feature_recomp_for_ensemble, \ - zero_vscore, \ - matrix_dtype, \ - strict_labeling, \ - tsv_batch_size = record + candidates_split_region, tumor_count, normal_count, reference, \ + matrix_width, matrix_base_pad, min_ev_frac_per_col, min_dp, \ + ensemble_bed_i, \ + ensemble_custom_header, \ + no_seq_complexity, no_feature_recomp_for_ensemble, \ + zero_vscore, \ + matrix_dtype, \ + strict_labeling, \ + tsv_batch_size = record thread_logger = logging.getLogger( "{} ({})".format(generate_dataset_region_parallel.__name__, multiprocessing.current_process().name)) try: ret = generate_dataset_region(work_dataset_split, truth_vcf, mode, filtered_vcf, - candidates_split_region, tumor_count, normal_count, reference, - matrix_width, matrix_base_pad, min_ev_frac_per_col, min_dp, 1, - ensemble_bed_i, - ensemble_custom_header, - no_seq_complexity, no_feature_recomp_for_ensemble, - zero_vscore, - matrix_dtype, - strict_labeling, - tsv_batch_size) + candidates_split_region, tumor_count, normal_count, reference, + matrix_width, matrix_base_pad, min_ev_frac_per_col, min_dp, 1, + ensemble_bed_i, + ensemble_custom_header, + no_seq_complexity, no_feature_recomp_for_ensemble, + zero_vscore, + matrix_dtype, + strict_labeling, + tsv_batch_size) return ret except Exception as ex: @@ -239,6 +241,58 @@ def generate_dataset_region_parallel(record): return None +def process_missed_ensemble_positions(ensemble_bed_i, filtered_vcf, matrix_base_pad, reference): + logger = logging.getLogger(process_missed_ensemble_positions.__name__) + logger.info([ensemble_bed_i, filtered_vcf]) + tmp_e = get_tmp_file() + header = [] + with open(ensemble_bed_i) as i_f: + with open(tmp_e, "w") as o_f: + for line in i_f: + if line.startswith("#"): + header.append(line) + continue + x = line.strip().split() + chrom, st, en = x[0:3] + o_f.write("\t".join([chrom, st, en]) + "\t" + line) + tmp_e = bedtools_slop( + tmp_e, reference + ".fai", args=" -b {}".format(matrix_base_pad), run_logger=logger) + tmp_f = get_tmp_file() + vcf_2_bed(filtered_vcf, tmp_f, + len_ref=True, keep_ref_alt=False) + tmp_f = bedtools_sort(tmp_f, run_logger=logger) + tmp_f = bedtools_slop( + tmp_f, reference + ".fai", args=" -b {}".format(matrix_base_pad), + run_logger=logger) + tmp_f = bedtools_merge(tmp_f, run_logger=logger) + done_e = bedtools_intersect(tmp_e, tmp_f, args="-f 1 -u") + missed_e = bedtools_intersect(tmp_e, tmp_f, args="-f 1 -v") + with open(ensemble_bed_i, "w") as o_f: + with open(done_e) as i_f: + for line in header: + o_f.write(line) + for line in skip_empty(i_f): + x = line.strip().split() + chrom, st, en = x[0:3] + o_f.write("\t".join(x[3:]) + "\n") + missed_ensemble_bed_i = ensemble_bed_i + ".missed.bed" + with open(missed_ensemble_bed_i, "w") as o_f: + with open(missed_e) as i_f: + for line in header: + o_f.write(line) + for line in skip_empty(i_f): + x = line.strip().split() + chrom, st, en = x[0:3] + o_f.write("\t".join(x[3:]) + "\n") + tmp_m = bedtools_sort(missed_ensemble_bed_i, run_logger=logger) + tmp_m = bedtools_slop( + tmp_m, reference + ".fai", args=" -b {}".format(matrix_base_pad + 1), + run_logger=logger) + missed_ensemble_beds_region_i = ensemble_bed_i + ".missed.region.bed" +
bedtools_merge(tmp_m, run_logger=logger, + output_fn=missed_ensemble_beds_region_i) + return missed_ensemble_beds_region_i, missed_ensemble_bed_i + + def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, scan_window_size, scan_maf, min_mapq, min_dp, max_dp, good_ao, min_ao, snp_min_af, snp_min_bq, snp_min_ao, @@ -319,9 +373,18 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, zero_vscore=zero_vscore, is_extend=False) - merge_d_for_short_read = 100 - candidates_split_regions = [] + split_region_files = split_my_region(work, region_bed, num_threads, num_splits, reference, scan_window_size, + restart) + ensemble_beds = [] + if ensemble_tsv and not ensemble_beds: + ensemble_beds = get_ensemble_beds( + work, reference, ensemble_bed, split_region_files, matrix_base_pad, num_threads) + + tags = ["{}".format(i) for i in range(len(split_region_files))] + + merge_d_for_short_read = 100 + candidates_split_regions = split_region_files if not long_read and first_do_without_qual: logger.info("Scan tumor bam (first without quality scores).") work_tumor_without_q = os.path.join(work, "work_tumor_without_q") @@ -330,26 +393,43 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, filtered_candidates_vcf_without_q = os.path.join( work_tumor_without_q, "filtered_candidates.vcf") - tumor_outputs_without_q = process_split_region("tumor", work_tumor_without_q, region_bed, reference, mode, - tumor_bam, scan_window_size, scan_maf, min_mapq, - filtered_candidates_vcf_without_q, min_dp, max_dp, - filter_duplicate, - good_ao, min_ao, - snp_min_af, -10000, snp_min_ao, - ins_min_af, del_min_af, del_merge_min_af, - ins_merge_min_af, merge_r, - merge_d_for_scan, - report_all_alleles, - False, - scan_alignments_binary, restart, num_splits, num_threads, - calc_qual=False) - tumor_counts_without_q, split_regions, filtered_candidates_vcfs_without_q = tumor_outputs_without_q - - if ensemble_tsv: - ensemble_beds = get_ensemble_beds( - work, reference, ensemble_bed, split_regions, matrix_base_pad, num_threads) + tumor_outputs_without_q = process_split_region(tn="tumor", + work=work_tumor_without_q, + region=region_bed, + reference=reference, + mode=mode, + alignment_bam=tumor_bam, + scan_window_size=scan_window_size, + scan_maf=scan_maf, + min_mapq=min_mapq, + filtered_candidates_vcf=filtered_candidates_vcf_without_q, + min_dp=min_dp, + max_dp=max_dp, + filter_duplicate=filter_duplicate, + good_ao=good_ao, + min_ao=min_ao, + snp_min_af=snp_min_af, + snp_min_bq=-10000, + snp_min_ao=snp_min_ao, + ins_min_af=ins_min_af, + del_min_af=del_min_af, + del_merge_min_af=del_merge_min_af, + ins_merge_min_af=ins_merge_min_af, + merge_r=merge_r, + merge_d_for_scan=merge_d_for_scan, + report_all_alleles=report_all_alleles, + report_count_for_all_positions=False, + scan_alignments_binary=scan_alignments_binary, + restart=restart, + num_splits=num_splits, + num_threads=num_threads, + calc_qual=False, + regions=split_region_files) + tumor_counts_without_q, _, filtered_candidates_vcfs_without_q = tumor_outputs_without_q + candidates_split_regions = extract_candidate_split_regions( - work_tumor_without_q, filtered_candidates_vcfs_without_q, split_regions, ensemble_beds, + filtered_candidates_vcfs_without_q, split_region_files, ensemble_beds, + tags, reference, matrix_base_pad, merge_d_for_short_read) work_tumor = os.path.join(work, "work_tumor") if restart or not os.path.exists(work_tumor): @@ -358,59 +438,154 @@ def preprocess(work, mode, reference, 
region_bed, tumor_bam, normal_bam, dbsnp, work_tumor, "filtered_candidates.vcf") logger.info("Scan tumor bam (and extracting quality scores).") - tumor_outputs = process_split_region("tumor", work_tumor, region_bed, reference, mode, - tumor_bam, scan_window_size, scan_maf, min_mapq, - filtered_candidates_vcf, min_dp, max_dp, - filter_duplicate, - good_ao, min_ao, - snp_min_af, snp_min_bq, snp_min_ao, - ins_min_af, del_min_af, del_merge_min_af, - ins_merge_min_af, merge_r, - merge_d_for_scan, - report_all_alleles, - False, - scan_alignments_binary, restart, num_splits, num_threads, + + tumor_outputs = process_split_region(tn="tumor", + work=work_tumor, + region=region_bed, + reference=reference, + mode=mode, + alignment_bam=tumor_bam, + scan_window_size=scan_window_size, + scan_maf=scan_maf, + min_mapq=min_mapq, + filtered_candidates_vcf=filtered_candidates_vcf, + min_dp=min_dp, + max_dp=max_dp, + filter_duplicate=filter_duplicate, + good_ao=good_ao, + min_ao=min_ao, + snp_min_af=snp_min_af, + snp_min_bq=snp_min_bq, + snp_min_ao=snp_min_ao, + ins_min_af=ins_min_af, + del_min_af=del_min_af, + del_merge_min_af=del_merge_min_af, + ins_merge_min_af=ins_merge_min_af, + merge_r=merge_r, + merge_d_for_scan=merge_d_for_scan, + report_all_alleles=report_all_alleles, + report_count_for_all_positions=False, + scan_alignments_binary=scan_alignments_binary, + restart=restart, + num_splits=num_splits, + num_threads=num_threads, calc_qual=True, regions=candidates_split_regions) - tumor_counts, split_regions, filtered_candidates_vcfs = tumor_outputs - - if ensemble_tsv and not ensemble_beds: - ensemble_beds = get_ensemble_beds( - work, reference, ensemble_bed, split_regions, matrix_base_pad, num_threads) + tumor_counts, _, filtered_candidates_vcfs = tumor_outputs + + work_tumor_missed = os.path.join(work, "work_tumor_missed") + if restart or not os.path.exists(work_tumor_missed): + os.mkdir(work_tumor_missed) + filtered_candidates_vcf_missed = os.path.join( + work_tumor_missed, "filtered_candidates.vcf") + + if ensemble_beds: + missed_ensemble_beds_region = [] + missed_ensemble_beds = [] + for i, (filtered_vcf, ensemble_bed_i) in enumerate(zip(filtered_candidates_vcfs, ensemble_beds)): + missed_ensemble_beds_region_i, missed_ensemble_beds_i = process_missed_ensemble_positions( + ensemble_bed_i, filtered_vcf, matrix_base_pad, reference) + missed_ensemble_beds_region.append(missed_ensemble_beds_region_i) + missed_ensemble_beds.append(missed_ensemble_beds_i) + + tumor_outputs_missed = process_split_region(tn="tumor", + work=work_tumor_missed, + region=region_bed, + reference=reference, + mode=mode, + alignment_bam=tumor_bam, + scan_window_size=scan_window_size, + scan_maf=scan_maf, + min_mapq=min_mapq, + filtered_candidates_vcf=filtered_candidates_vcf_missed, + min_dp=min_dp, + max_dp=max_dp, + filter_duplicate=filter_duplicate, + good_ao=good_ao, + min_ao=min_ao, + snp_min_af=snp_min_af, + snp_min_bq=snp_min_bq, + snp_min_ao=snp_min_ao, + ins_min_af=ins_min_af, + del_min_af=del_min_af, + del_merge_min_af=del_merge_min_af, + ins_merge_min_af=ins_merge_min_af, + merge_r=merge_r, + merge_d_for_scan=merge_d_for_scan, + report_all_alleles=report_all_alleles, + report_count_for_all_positions=True, + scan_alignments_binary=scan_alignments_binary, + restart=restart, + num_splits=num_splits, + num_threads=num_threads, + calc_qual=True, + regions=missed_ensemble_beds_region) + tumor_counts_missed, _, filtered_candidates_vcfs_missed = tumor_outputs_missed + + tumor_counts += tumor_counts_missed + 
filtered_candidates_vcfs += filtered_candidates_vcfs_missed + ensemble_beds += missed_ensemble_beds + split_region_files += missed_ensemble_beds_region + tags += ["m.{}".format(i) + for i in range(len(filtered_candidates_vcfs_missed))] if (not long_read): candidates_split_regions = extract_candidate_split_regions( - work_tumor, filtered_candidates_vcfs, split_regions, ensemble_beds, + filtered_candidates_vcfs, split_region_files, ensemble_beds, + tags, reference, matrix_base_pad, merge_d_for_short_read) if not candidates_split_regions: - candidates_split_regions = split_regions + candidates_split_regions = split_region_files work_normal = os.path.join(work, "work_normal") if restart or not os.path.exists(work_normal): os.mkdir(work_normal) logger.info("Scan normal bam (and extracting quality scores).") - normal_counts, _, _ = process_split_region("normal", work_normal, region_bed, reference, mode, normal_bam, - scan_window_size, 0.2, min_mapq, - None, 1, max_dp, - filter_duplicate, - good_ao, min_ao, snp_min_af, snp_min_bq, snp_min_ao, - ins_min_af, del_min_af, del_merge_min_af, - ins_merge_min_af, merge_r, - merge_d_for_scan, - report_all_alleles, - True, - scan_alignments_binary, restart, num_splits, num_threads, + + normal_counts, _, _ = process_split_region(tn="normal", + work=work_normal, + region=region_bed, + reference=reference, + mode=mode, + alignment_bam=normal_bam, + scan_window_size=scan_window_size, + scan_maf=0.2, + min_mapq=min_mapq, + filtered_candidates_vcf=None, + min_dp=1, + max_dp=max_dp, + filter_duplicate=filter_duplicate, + good_ao=good_ao, + min_ao=min_ao, + snp_min_af=snp_min_af, + snp_min_bq=snp_min_bq, + snp_min_ao=snp_min_ao, + ins_min_af=ins_min_af, + del_min_af=del_min_af, + del_merge_min_af=del_merge_min_af, + ins_merge_min_af=ins_merge_min_af, + merge_r=merge_r, + merge_d_for_scan=merge_d_for_scan, + report_all_alleles=report_all_alleles, + report_count_for_all_positions=True, + scan_alignments_binary=scan_alignments_binary, + restart=restart, + num_splits=num_splits, + num_threads=num_threads, calc_qual=True, regions=candidates_split_regions) work_dataset = os.path.join(work, "dataset") if restart or not os.path.exists(work_dataset): os.mkdir(work_dataset) + logger.info("Generate dataset.") map_args_gen = [] - for i, (tumor_count, normal_count, filtered_vcf, candidates_split_region) in enumerate(zip(tumor_counts, normal_counts, filtered_candidates_vcfs, candidates_split_regions)): + for i, (tumor_count, normal_count, filtered_vcf, candidates_split_region, tags_i) in enumerate(zip(tumor_counts, normal_counts, filtered_candidates_vcfs, candidates_split_regions, tags)): logger.info("Dataset for region {}".format(candidates_split_region)) - work_dataset_split = os.path.join(work_dataset, "work.{}".format(i)) + work_dataset_split = os.path.join( + work_dataset, "work.{}".format(tags_i)) if restart or not os.path.exists("{}/done.txt".format(work_dataset_split)): if os.path.exists(work_dataset_split): shutil.rmtree(work_dataset_split) @@ -637,15 +812,15 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, else: ensemble_bed_i = extra_features_bed map_args_gen.append([work_dataset_split, truth_vcf, mode, filtered_vcf, - candidates_split_region, tumor_count, normal_count, reference, - matrix_width, matrix_base_pad, min_ev_frac_per_col, min_dp, - ensemble_bed_i, - ensemble_custom_header, - no_seq_complexity, no_feature_recomp_for_ensemble, - zero_vscore, - matrix_dtype, - strict_labeling, -
tsv_batch_size]) + candidates_split_region, tumor_count, normal_count, reference, + matrix_width, matrix_base_pad, min_ev_frac_per_col, min_dp, + ensemble_bed_i, + ensemble_custom_header, + no_seq_complexity, no_feature_recomp_for_ensemble, + zero_vscore, + matrix_dtype, + strict_labeling, + tsv_batch_size]) pool = multiprocessing.Pool(num_threads) try: @@ -662,7 +837,6 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp, if o is None: raise Exception("Generate dataset failed!") - shutil.rmtree(bed_tempdir) tempfile.tempdir = original_tempdir diff --git a/neusomatic/python/scan_alignments.py b/neusomatic/python/scan_alignments.py index 3e7b56a..7c5ecbf 100755 --- a/neusomatic/python/scan_alignments.py +++ b/neusomatic/python/scan_alignments.py @@ -23,6 +23,64 @@ from split_bed import split_region +def split_my_region(work, region_bed, num_threads, num_splits, reference, window_size, restart): + logger = logging.getLogger(split_my_region.__name__) + split_region_files = [] + work_regions = os.path.join(work, "regions") + if not os.path.exists(work_regions): + os.mkdir(work_regions) + + split_len_ratio = 0.98 + if region_bed: + tmp_regions_bed = get_tmp_file() + with open(region_bed) as i_f, open(tmp_regions_bed, "w") as o_f: + for line in skip_empty(i_f): + chrom, st, en = line.strip().split()[0:3] + o_f.write("\t".join([chrom, st, en]) + "\n") + tmp_regions_bed = bedtools_sort(tmp_regions_bed, run_logger=logger) + tmp_regions_bed = bedtools_merge( + tmp_regions_bed, args=" -d 0", run_logger=logger) + else: + tmp_regions_bed = get_tmp_file() + with pysam.FastaFile(reference) as ref_file: + with open(tmp_regions_bed, "w") as tmpfile: + for chrom, length in zip(ref_file.references, ref_file.lengths): + tmpfile.write("{}\t{}\t{}\t.\t.\t.\n".format( + chrom, 1, length - 1)) + + total_len = 0 + with open(tmp_regions_bed) as r_f: + for line in skip_empty(r_f): + chrom, st, en = line.strip().split("\t")[0:3] + total_len += int(en) - int(st) + 1 + if not restart: + split_region_files = glob.glob( + os.path.join(work_regions, "region_*.bed")) + spilt_total_len = 0 + for split_file in split_region_files: + with open(split_file) as s_f: + for line in skip_empty(s_f): + chrom, st, en = line.strip().split("\t")[0:3] + spilt_total_len += int(en) - int(st) + if spilt_total_len >= split_len_ratio * total_len: + split_region_files = sorted(split_region_files, + key=lambda x: int( + os.path.basename(x).split(".bed")[0].split( + "_")[1])) + if not split_region_files: + all_region_bed = os.path.join(work_regions, "all_regions.bed") + shutil.move(tmp_regions_bed, all_region_bed) + + if num_splits is not None: + num_split = num_splits + else: + num_split = max(int(np.ceil((total_len // 10000000) // + num_threads) * num_threads), num_threads) + split_region_files = split_region(work_regions, all_region_bed, num_split, + min_region=window_size, max_region=1e20) + return split_region_files + + def run_scan_alignments(record): work, reference, merge_d_for_scan, scan_alignments_binary, split_region_file, \ input_bam, window_size, \ @@ -99,21 +157,21 @@ def run_scan_alignments(record): outputs = scan_alignments(args.work, args.merge_d_for_scan, args.scan_alignments_binary, args.input_bam, args.regions_bed_file, args.reference, args.num_splits, - args.num_threads, args.window_size, + args.num_threads, args.window_size, args.snp_min_ao, args.snp_min_af, args.ins_min_af, args.del_min_af, - args.min_mapq, args.snp_min_bq, args.max_dp, args.min_dp, + args.min_mapq, args.snp_min_bq, 
@@ -99,21 +157,21 @@ def run_scan_alignments(record):
     outputs = scan_alignments(args.work, args.merge_d_for_scan, args.scan_alignments_binary,
                               args.input_bam, args.regions_bed_file,
                               args.reference, args.num_splits,
-                              args.num_threads, args.window_size,
+                              args.num_threads, args.window_size,
                               args.snp_min_ao,
                               args.snp_min_af, args.ins_min_af, args.del_min_af,
-                              args.min_mapq, args.snp_min_bq, args.max_dp, args.min_dp,
+                              args.min_mapq, args.snp_min_bq, args.max_dp, args.min_dp,
                               args.report_all_alleles, args.report_count_for_all_positions,
                               args.filter_duplicate)
 
 
 def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam,
                     regions_bed_file, reference, num_splits,
-                    num_threads, window_size,
+                    num_threads, window_size,
                     snp_min_ao,
                     snp_min_af, ins_min_af, del_min_af, min_mapq,
                     snp_min_bq, max_dp, min_dp,
-                    report_all_alleles,
+                    report_all_alleles, report_count_for_all_positions,
                     filter_duplicate,
                     restart=True, split_region_files=[],
                     calc_qual=True):
@@ -121,55 +179,9 @@ def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam,
 
     logger.info("-------------------Scan Alignment BAM----------------------")
 
-    split_len_ratio = 0.98
     if not split_region_files:
-        if regions_bed_file:
-            regions_bed = get_tmp_file()
-            with open(regions_bed_file) as i_f, open(regions_bed, "w") as o_f:
-                for line in skip_empty(i_f):
-                    chrom, st, en = line.strip().split()[0:3]
-                    o_f.write("\t".join([chrom, st, en]) + "\n")
-            regions_bed = bedtools_sort(regions_bed, run_logger=logger)
-            regions_bed = bedtools_merge(
-                regions_bed, args=" -d 0", run_logger=logger)
-        else:
-            regions_bed = get_tmp_file()
-            with pysam.AlignmentFile(input_bam, "rb") as samfile:
-                with open(regions_bed, "w") as tmpfile:
-                    for chrom, length in zip(samfile.references, samfile.lengths):
-                        tmpfile.write("{}\t{}\t{}\t.\t.\t.\n".format(
-                            chrom, 1, length - 1))
-        if not os.path.exists(work):
-            os.mkdir(work)
-        total_len = 0
-        with open(regions_bed) as r_f:
-            for line in skip_empty(r_f):
-                chrom, st, en = line.strip().split("\t")[0:3]
-                total_len += int(en) - int(st) + 1
-        if not restart:
-            split_region_files = glob.glob(os.path.join(work, "region_*.bed"))
-            spilt_total_len = 0
-            for split_file in split_region_files:
-                with open(split_file) as s_f:
-                    for line in skip_empty(s_f):
-                        chrom, st, en = line.strip().split("\t")[0:3]
-                        spilt_total_len += int(en) - int(st)
-            if spilt_total_len >= split_len_ratio * total_len:
-                split_region_files = sorted(split_region_files,
-                                            key=lambda x: int(
-                                                os.path.basename(x).split(".bed")[0].split(
-                                                    "_")[1]))
-        if not split_region_files:
-            regions_bed_file = os.path.join(work, "all_regions.bed")
-            shutil.move(regions_bed, regions_bed_file)
-
-            if num_splits is not None:
-                num_split = num_splits
-            else:
-                num_split = max(int(np.ceil((total_len // 10000000) //
-                                            num_threads) * num_threads), num_threads)
-            split_region_files = split_region(work, regions_bed_file, num_split,
-                                              min_region=window_size, max_region=1e20)
+        split_region_files = split_my_region(
+            work, regions_bed_file, num_threads, num_splits, reference, window_size, restart)
     else:
         logger.info("split_regions to be used (will ignore region_bed): {}".format(
             " ".join(split_region_files)))
@@ -186,7 +198,7 @@ def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam,
                 shutil.rmtree(work_)
         map_args.append((os.path.join(work, "work.{}".format(i)), reference,
                          merge_d_for_scan, scan_alignments_binary, split_region_file,
-                         input_bam, window_size,
+                         input_bam, window_size,
                          snp_min_ao,
                          snp_min_af, ins_min_af, del_min_af, min_mapq,
                          snp_min_bq, max_dp, min_dp,
@@ -272,10 +284,10 @@ def scan_alignments(work, merge_d_for_scan, scan_alignments_binary, input_bam,
     try:
         outputs = scan_alignments(args.work, args.merge_d_for_scan, args.scan_alignments_binary,
                                   args.input_bam, args.regions_bed_file,
                                   args.reference, args.num_splits,
-                                  args.num_threads, args.window_size,
+                                  args.num_threads, args.window_size,
                                   args.snp_min_ao,
                                   args.snp_min_af, args.ins_min_af, args.del_min_af,
-                                  args.min_mapq, args.snp_min_bq, args.max_dp, args.min_dp,
+                                  args.min_mapq, args.snp_min_bq, args.max_dp, args.min_dp,
                                   args.report_all_alleles, args.report_count_for_all_positions,
                                   args.filter_duplicate)
 except Exception as e:
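One behavior of the refactor above worth spelling out: on a rerun with restart disabled, split_my_region reuses existing region_*.bed files only if together they cover at least split_len_ratio (98%) of the merged target length, and it orders them by the integer in the file name so that region_10.bed does not sort before region_2.bed. A small sketch of that sort key (file names are illustrative):

    import os

    files = ["region_10.bed", "region_2.bed", "region_1.bed"]
    # Lexical order would give region_1, region_10, region_2; keying on the
    # integer between "region_" and ".bed" restores the original split order.
    files = sorted(files, key=lambda x: int(
        os.path.basename(x).split(".bed")[0].split("_")[1]))
    print(files)  # ['region_1.bed', 'region_2.bed', 'region_10.bed']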
From 625469fdc7ac2a0a8e5197c3a72561cadead5235 Mon Sep 17 00:00:00 2001
From: Sahraeian
Date: Mon, 19 Jul 2021 22:20:48 -0700
Subject: [PATCH 08/10] write valid VCF headers for missed-candidate VCFs

---
 neusomatic/python/preprocess.py | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/neusomatic/python/preprocess.py b/neusomatic/python/preprocess.py
index 1aa4b5a..ec8ffd0 100755
--- a/neusomatic/python/preprocess.py
+++ b/neusomatic/python/preprocess.py
@@ -22,7 +22,7 @@
 from scan_alignments import scan_alignments, split_my_region
 from extend_features import extend_features
 from utils import concatenate_vcfs, run_bedtools_cmd, bedtools_sort, bedtools_merge, bedtools_intersect, bedtools_slop, bedtools_window, get_tmp_file, skip_empty, vcf_2_bed
-from defaults import MAT_DTYPES
+from defaults import MAT_DTYPES, VCF_HEADER
 
 
 def process_split_region(tn, work, region, reference, mode, alignment_bam,
@@ -473,13 +473,12 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp,
                                         regions=candidates_split_regions)
     tumor_counts, _, filtered_candidates_vcfs = tumor_outputs
 
-    work_tumor_missed = os.path.join(work, "work_tumor_missed")
-    if restart or not os.path.exists(work_tumor_missed):
-        os.mkdir(work_tumor_missed)
-    filtered_candidates_vcf_missed = os.path.join(
-        work_tumor_missed, "filtered_candidates.vcf")
-
     if ensemble_beds:
+        work_tumor_missed = os.path.join(work, "work_tumor_missed")
+        if restart or not os.path.exists(work_tumor_missed):
+            os.mkdir(work_tumor_missed)
+        filtered_candidates_vcf_missed = os.path.join(
+            work_tumor_missed, "filtered_candidates.vcf")
         missed_ensemble_beds_region = []
         missed_ensemble_beds = []
         for i, (filtered_vcf, ensemble_bed_i) in enumerate(zip(filtered_candidates_vcfs, ensemble_beds)):
@@ -521,6 +520,11 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp,
                                                calc_qual=True,
                                                regions=missed_ensemble_beds_region)
         tumor_counts_missed, _, filtered_candidates_vcfs_missed = tumor_outputs_missed
+        for filtered_vcf in filtered_candidates_vcfs_missed + [filtered_candidates_vcf_missed]:
+            with open(filtered_vcf, "w") as o_f:
+                o_f.write("{}\n".format(VCF_HEADER))
+                o_f.write(
+                    "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tSAMPLE\n")
         tumor_counts += tumor_counts_missed
         filtered_candidates_vcfs += filtered_candidates_vcfs_missed

From 7071d4e3fbfdf5baa8d2d7ceaca2c8e588075d37 Mon Sep 17 00:00:00 2001
From: Sahraeian
Date: Wed, 21 Jul 2021 20:28:32 -0700
Subject: [PATCH 09/10] tighten missed-ensemble window; extract split regions earlier

---
 neusomatic/python/generate_dataset.py |  2 +-
 neusomatic/python/preprocess.py       | 18 ++++++++++--------
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/neusomatic/python/generate_dataset.py b/neusomatic/python/generate_dataset.py
index b51561c..be3c0e6 100755
--- a/neusomatic/python/generate_dataset.py
+++ b/neusomatic/python/generate_dataset.py
@@ -1225,7 +1225,7 @@ def find_records(input_record):
             keep_in_region(input_file=tmp_, region_bed=split_region_file,
                            output_fn=split_ensemble_bed_file)
             tmp_ = bedtools_window(
-                split_ensemble_bed_file, split_pred_vcf_file, args=" -w 5 -v", run_logger=thread_logger)
+                split_ensemble_bed_file, split_pred_vcf_file, args=" -w 1 -v", run_logger=thread_logger)
             vcf_2_bed(tmp_, split_missed_ensemble_bed_file,
                       add_fields=[".", ".", ".", ".", "."])
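The -w 5 to -w 1 change above tightens what counts as a covered ensemble call: bedtools window -v keeps the entries of -a that have no overlap in -b after padding each -a interval by -w bp, so an ensemble record now has to sit within 1 bp of a predicted variant to stay off the missed list. Roughly the command the bedtools_window wrapper runs under these args (the file names are illustrative):

    import subprocess

    # -w 1: pad each ensemble interval by 1 bp on each side before intersecting;
    # -v:   report only ensemble entries with no predicted variant in that window.
    with open("missed_ensemble.bed", "w") as out:
        subprocess.run(["bedtools", "window",
                        "-a", "split_ensemble.bed",
                        "-b", "split_pred.vcf",
                        "-w", "1", "-v"],
                       stdout=out, check=True)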
diff --git a/neusomatic/python/preprocess.py b/neusomatic/python/preprocess.py
index ec8ffd0..de63b94 100755
--- a/neusomatic/python/preprocess.py
+++ b/neusomatic/python/preprocess.py
@@ -473,6 +473,15 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp,
                                         regions=candidates_split_regions)
     tumor_counts, _, filtered_candidates_vcfs = tumor_outputs
 
+    if (not long_read):
+        candidates_split_regions = extract_candidate_split_regions(
+            filtered_candidates_vcfs, split_region_files, ensemble_beds,
+            tags,
+            reference, matrix_base_pad, merge_d_for_short_read)
+
+        if not candidates_split_regions:
+            candidates_split_regions = split_region_files
+
     if ensemble_beds:
         work_tumor_missed = os.path.join(work, "work_tumor_missed")
         if restart or not os.path.exists(work_tumor_missed):
@@ -530,18 +539,11 @@ def preprocess(work, mode, reference, region_bed, tumor_bam, normal_bam, dbsnp,
         filtered_candidates_vcfs += filtered_candidates_vcfs_missed
         ensemble_beds += missed_ensemble_beds
-        ensemble_beds += missed_ensemble_beds
+        candidates_split_regions += missed_ensemble_beds_region
         split_region_files += missed_ensemble_beds_region
         tags += ["m.{}".format(i)
                  for i in range(len(filtered_candidates_vcfs_missed))]
 
-    if (not long_read):
-        candidates_split_regions = extract_candidate_split_regions(
-            filtered_candidates_vcfs, split_region_files, ensemble_beds,
-            tags,
-            reference, matrix_base_pad, merge_d_for_short_read)
-
-        if not candidates_split_regions:
-            candidates_split_regions = split_region_files
 
     work_normal = os.path.join(work, "work_normal")
     if restart or not os.path.exists(work_normal):
         os.mkdir(work_normal)

From c0af977e2c2a358439b9f5dd1aec4ded604b3f90 Mon Sep 17 00:00:00 2001
From: Sahraeian
Date: Wed, 11 Aug 2021 15:14:57 -0700
Subject: [PATCH 10/10] uppercase the fetched reference base in push_left_var

---
 neusomatic/python/resolve_variants.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/neusomatic/python/resolve_variants.py b/neusomatic/python/resolve_variants.py
index 4e7c009..bbc518f 100755
--- a/neusomatic/python/resolve_variants.py
+++ b/neusomatic/python/resolve_variants.py
@@ -103,7 +103,7 @@ def push_left_var(ref_fasta, chrom, pos, ref, alt):
     logger = logging.getLogger(push_left_var.__name__)
     pos = int(pos)
     while ref[-1] == alt[-1] and pos > 1:
-        prev_base = ref_fasta.fetch(chrom, pos - 2, pos - 1)
+        prev_base = ref_fasta.fetch(chrom, pos - 2, pos - 1).upper()
         pos -= 1
         ref = prev_base + ref[:-1]
         alt = prev_base + alt[:-1]
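For context on the last hunk: reference FASTAs are often soft-masked, so pysam's fetch returns repeat regions in lowercase; without the .upper(), a left shift that crosses a masked base leaks lowercase into the returned alleles, and downstream exact-match comparisons against uppercase alleles can fail. A self-contained sketch of the shift with a mocked fetch (sequence and coordinates are made up):

    def push_left_var(fetch, chrom, pos, ref, alt):
        # Shift an indel left while ref and alt end in the same base,
        # pulling in the previous (upper-cased) reference base each step.
        while ref[-1] == alt[-1] and pos > 1:
            prev_base = fetch(chrom, pos - 2, pos - 1).upper()
            pos -= 1
            ref = prev_base + ref[:-1]
            alt = prev_base + alt[:-1]
        return pos, ref, alt

    seq = "ggACACAC"  # 1-based positions 1..8; soft-masked "gg" prefix

    def mock_fetch(chrom, start, end):
        return seq[start:end]

    # An AC insertion right-aligned at position 8 walks left through the
    # repeat and stops at position 2, with the masked 'g' normalized to 'G'.
    print(push_left_var(mock_fetch, "chr1", 8, "C", "CAC"))  # (2, 'G', 'GAC')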