-
Notifications
You must be signed in to change notification settings - Fork 0
/
Acc2tree.py
163 lines (129 loc) · 6.99 KB
/
Acc2tree.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import csv
import time
import os
import warnings
import subprocess
from Bio.Blast import NCBIWWW, NCBIXML
from Bio import Entrez, SeqIO
from Bio.SeqFeature import BiopythonParserWarning
def perform_blast_search(accession_id, max_target_seqs):
Entrez.email = input("Please Enter your email: ")
print("Performing BLAST search...")
result_handle = NCBIWWW.qblast(
"blastp",
"nr",
accession_id,
hitlist_size=max_target_seqs,
expect=10,
matrix_name="BLOSUM62",
)
print("BLAST search completed.")
return result_handle
def retrieve_protein_sequence(accession):
try:
handle = Entrez.efetch(db="protein", id=accession, rettype="gb", retmode="text", timeout=30)
record = SeqIO.read(handle, "genbank")
handle.close()
return record
except Exception as e:
print(f"Error retrieving protein sequence for {accession}: {e}")
return None
def main():
warnings.filterwarnings("ignore", category=BiopythonParserWarning, message="Dropping bond qualifier in feature location")
accession_id = input("Enter the accession ID: ")
max_target_seqs = input("Enter your desired Maximum Target Sequences: ")
while True:
preferred_tree = input("What's your preferred program for making the tree? 1 for RAxML 2 for IQtree: ")
if preferred_tree in ['1', '2']:
break
else:
print(" Invalid input. Please enter 1 or 2.")
result_handle = perform_blast_search(accession_id, max_target_seqs)
blast_records = NCBIXML.parse(result_handle)
selected_records = []
fasta_sequence_dict = {}
for blast_record in blast_records:
try:
taxon_records = {}
for alignment in blast_record.alignments:
accession = alignment.accession
e_value = alignment.hsps[0].expect
definition = alignment.hit_def
scientific_name = definition.split("[")[1].split("]")[0]
taxon = scientific_name.lower()
if accession.startswith("WP_"):
if taxon not in taxon_records:
taxon_records[taxon] = [(accession, e_value)]
else:
taxon_records[taxon].append((accession, e_value))
else:
if taxon not in taxon_records:
taxon_records[taxon] = [(accession, e_value)]
selected_records.append((taxon, accession, e_value))
else:
if e_value < taxon_records[taxon][0][1]:
taxon_records[taxon][0] = (accession, e_value)
selected_records = [rec for rec in selected_records if rec[0] != taxon]
for taxon, records in taxon_records.items():
if len(records) == 1:
selected_records.append((taxon, records[0][0], records[0][1]))
except ValueError as e:
print(f"Error processing record: {e}")
continue
if selected_records:
selected_records.sort(key=lambda x: (x[1].startswith("WP_"), x[2]))
fasta_sequences = []
fasta_records = {}
for i, record in enumerate(selected_records):
protein_record = retrieve_protein_sequence(record[1])
if protein_record:
fasta_sequences.append(protein_record.format("fasta"))
fasta_records[record[1]] = protein_record.format("fasta")
fasta_sequence_dict[record[1]] = protein_record.seq
organism = protein_record.annotations.get('organism', '')
print(f"\rRecord {i+1}/{len(selected_records)} processed", end="", flush=True)
directory_path = f'C:\\ACC2TREE\\{accession_id}'
if not os.path.exists(directory_path):
os.makedirs(directory_path)
with open(f'{directory_path}\\combined_fasta_sequences.fasta', "w") as fasta_file:
for accession, fasta_record in fasta_records.items():
fasta_file.write(fasta_record)
print("\nCombined FASTA sequences saved to combined_fasta_sequences.fasta")
print("Writing results to CSV...")
with open(f'{directory_path}\\blast_results.csv', "w", newline="") as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow(["DEFINITION", "Accession ID", "e value", "FASTA Sequence", "Link"])
for i in range(len(selected_records)):
taxon, accession, e_value = selected_records[i]
fasta_sequence = fasta_sequence_dict.get(accession, "N/A")
link = f"https://www.ncbi.nlm.nih.gov/protein/{accession}"
csv_writer.writerow([taxon, accession, e_value, fasta_sequence, link])
print(f"\rRecord {i+1}/{len(selected_records)} written to CSV", end="", flush=True)
time.sleep(1)
try:
print("\nMuscle is Aligning the sequences...")
subprocess.run(['muscle', '-align', f'{directory_path}\\combined_fasta_sequences.fasta', '-output', f'{directory_path}\\aligned_output_file'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
print('\r' + 'Alignment completed.', end="", flush=True)
except subprocess.CalledProcessError:
print("Alignment failed.")
def perform_RAxML(directory_path):
print("\nRAxML is making the tree...")
try:
subprocess.run(['Raxml', "-s", f'{directory_path}\\aligned_output_file', "-w", f'{directory_path}', "-n", "output", "-m", "PROTGAMMAAUTO", "-f", "a", "-x", "1000", "-p", "1988", "-k", "-N", "autoMRE"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
print('\r' + 'Making the tree completed.', end="", flush=True)
except subprocess.CalledProcessError:
print("Making the tree failed.")
def perform_IQtree(directory_path):
print("\nIQtree is making the tree...")
try:
subprocess.run(['iqtree', "-s", f'{directory_path}\\aligned_output_file', "-m", "MFP", "-nt", "AUTO", "-bb", "1000", "-alrt", "1000", "-quiet"])
print('\r' + 'Making the tree completed.', end="", flush=True)
except subprocess.CalledProcessError:
print("Making the tree failed.")
if preferred_tree == "1":
perform_RAxML(directory_path)
else: perform_IQtree(directory_path)
print("\nProcess completed.")
current_directory = os.path.dirname(os.path.abspath(__file__))
if __name__ == "__main__":
main()