From ba22dacee09e363fbb0898993ad9689ba09e616a Mon Sep 17 00:00:00 2001 From: Oliver Schwengers Date: Thu, 7 Nov 2024 17:39:31 +0100 Subject: [PATCH] implement feature type figure --- bakta/io/insdc.py | 12 +- bakta/main.py | 4 +- bakta/plot.py | 297 +++++++++++++++++++++++++++++----------------- 3 files changed, 200 insertions(+), 113 deletions(-) diff --git a/bakta/io/insdc.py b/bakta/io/insdc.py index 49d7427..35bb755 100644 --- a/bakta/io/insdc.py +++ b/bakta/io/insdc.py @@ -18,9 +18,7 @@ log = logging.getLogger('INSDC') -def write_features(data: dict, features: Sequence[dict], genbank_output_path: Path, embl_output_path: Path): - log.debug('prepare: genbank=%s, embl=%s', genbank_output_path, embl_output_path) - +def build_biopython_sequence_list(data: dict, features: Sequence[dict]): sequence_list = [] for seq in data['sequences']: sequence_features = [feat for feat in features if feat['sequence'] == seq['id']] @@ -275,7 +273,13 @@ def write_features(data: dict, features: Sequence[dict], genbank_output_path: Pa seq_feature_list.append(acc_feature) sequence_record.features = seq_feature_list sequence_list.append(sequence_record) + return sequence_list + +def write_features(data: dict, features: Sequence[dict], genbank_output_path: Path, embl_output_path: Path): + log.debug('prepare: genbank=%s, embl=%s', genbank_output_path, embl_output_path) + + sequence_list = build_biopython_sequence_list(data, features) with genbank_output_path.open('wt', encoding='utf-8') as fh: log.info('write GenBank: path=%s', genbank_output_path) SeqIO.write(sequence_list, fh, format='genbank') @@ -283,8 +287,6 @@ def write_features(data: dict, features: Sequence[dict], genbank_output_path: Pa with embl_output_path.open('wt', encoding='utf-8') as fh: log.info('write EMBL: path=%s', embl_output_path) SeqIO.write(sequence_list, fh, format='embl') - - return sequence_list def select_ncrna_class(feature: dict) -> str: diff --git a/bakta/main.py b/bakta/main.py index 6d76d59..c89a394 100755 --- a/bakta/main.py +++ b/bakta/main.py @@ -553,7 +553,7 @@ def main(): print('\tINSDC GenBank & EMBL...') genbank_path = cfg.output_path.joinpath(f'{cfg.prefix}.gbff') embl_path = cfg.output_path.joinpath(f'{cfg.prefix}.embl') - biopython_sequences = insdc.write_features(data, features, genbank_path, embl_path) + insdc.write_features(data, features, genbank_path, embl_path) print('\tgenome sequences...') fna_path = cfg.output_path.joinpath(f'{cfg.prefix}.fna') @@ -575,7 +575,7 @@ def main(): print('\tskip generation of circular genome plot...') else: print('\tcircular genome plot...') - plot.write(biopython_sequences, sequences, cfg.output_path) + plot.write(data, features, cfg.output_path) if(cfg.skip_cds is False): hypotheticals = [feat for feat in features if feat['type'] == bc.FEATURE_CDS and 'hypothetical' in feat] diff --git a/bakta/plot.py b/bakta/plot.py index f4beef1..d9155f6 100644 --- a/bakta/plot.py +++ b/bakta/plot.py @@ -1,8 +1,8 @@ import atexit +import copy import json import logging import os -import subprocess as sp import sys from pathlib import Path @@ -12,15 +12,16 @@ import numpy as np from Bio import SeqUtils +from Bio.SeqFeature import FeatureLocation, CompoundLocation, AfterPosition, BeforePosition from matplotlib.patches import Patch from matplotlib.lines import Line2D from pycirclize import Circos -from pycirclize.utils import load_prokaryote_example_file import bakta import bakta.utils as bu import bakta.config as cfg import bakta.constants as bc +import bakta.io.insdc as insdc COLORS = { @@ -30,8 +31,8 @@ 'gc-skew-positive': '#fdbf6f', 'gc-skew-negative': '#1f78b4', 'features': { # feature type colors - bc.FEATURE_CDS: 'cccccc', - bc.FEATURE_SORF: 'cccccc', + bc.FEATURE_CDS: '#cccccc', + bc.FEATURE_SORF: '#cccccc', bc.FEATURE_T_RNA: '#b2df8a', bc.FEATURE_TM_RNA: '#b2df8a', bc.FEATURE_R_RNA: '#fb8072', @@ -174,9 +175,9 @@ def main(): # load genome annotations print('Parse genome annotations...') with annotation_path.open('r') as fh: - annotation = json.load(fh) - features = annotation['features'] - sequences = annotation['sequences'] + data = json.load(fh) + features = data['features'] + sequences = data['sequences'] # load colors if specified colors = COLORS @@ -187,7 +188,7 @@ def main(): print('Draw plots...') if args.sequences == 'all': # write whole genome plot print(f'\tdraw circular genome plot (type={plot_type}) containing all sequences...') - write(features, sequences, output_path, colors, plot_type=plot_type) + write(data, features, output_path, colors, plot_type=plot_type) else: # write genome plot containing provided sequences only plot_sequences = [] sequence_identifiers = [] @@ -204,110 +205,194 @@ def main(): print(f'\tdraw circular genome plot (type={plot_type}) containing sequences: {sequence_identifiers}...') plot_name_suffix = '_'.join(sequence_identifiers) plot_sequence_ids = [seq['id'] for seq in plot_sequences] - features = [feat for feat in features if feat['sequence'] in plot_sequence_ids] - write(features, plot_sequences, output_path, colors, plot_name_suffix=plot_name_suffix, plot_type=plot_type) - - -def write(features, sequences, output_path, colors=COLORS, plot_name_suffix=None, plot_type=bc.PLOT_FEATURES): - # config paths - circos_path = cfg.tmp_path.joinpath(f'circos') - circos_path.mkdir(parents=True, exist_ok=True) + data['features'] = [feat for feat in features if feat['sequence'] in plot_sequence_ids] # reduce feature list in data object + data['sequences'] = [seq for seq in sequences if seq['id'] in plot_sequence_ids] # reduce sequence list in data object + write(data, features, output_path, colors, plot_name_suffix=plot_name_suffix, plot_type=plot_type) + + +def write(data, features, output_path, colors=COLORS, plot_name_suffix=None, plot_type=bc.PLOT_FEATURES): + sequence_list = insdc.build_biopython_sequence_list(data, features) + for seq in sequence_list: # fix edge features because PyCirclize cannot handle them correctly + seq.features = [feat for feat in seq.features if feat.type != 'gene' and feat.type != 'source'] + # seq.features = [feat for feat in seq.features if isinstance(feat.location, FeatureLocation)]# and isinstance(feat.location.start, str) and isinstance(feat.location.end, str)] + for feat in seq.features: + feat_loc = feat.location + if isinstance(feat_loc, CompoundLocation): + log.debug('split edge feature: seq=%s, start=%i, stop=%i, strand=%s', seq.id, feat_loc.start, feat_loc.end, '+' if feat.strand==1 else '-') + feat.location = FeatureLocation(feat_loc.parts[0].start, len(seq.seq), strand=feat.strand) + feat_2 = copy.deepcopy(feat) + feat_2.location = FeatureLocation(0, feat_loc.parts[1].end, strand=feat.strand) + seq.features.append(feat_2) + elif isinstance(feat_loc.start, AfterPosition) or isinstance(feat_loc.start, BeforePosition): + feat.location = FeatureLocation(int(str(feat_loc.start)[1:]), feat_loc.end, strand=feat.strand) + elif isinstance(feat_loc.end, AfterPosition) or isinstance(feat_loc.end, BeforePosition): + feat.location = FeatureLocation(feat_loc.start, int(str(feat_loc.end)[1:]), strand=feat.strand) # select style if plot_type == bc.PLOT_COG: - plot = write_features_type_cog(features, sequences, circos_path, colors) + print('ÄTSCH') + # plot = write_features_type_cog(sequence_list, colors) else: - plot = write_features_type_feature(features, sequences, circos_path, colors) - - max_gc, max_gc_skew = write_gc_content_skew(sequences, circos_path, colors) + plot = write_features_type_feature(data, sequence_list, colors) + file_name = cfg.prefix if plot_name_suffix is None else f'{cfg.prefix}_{plot_name_suffix}' + for file_type in ['png', 'svg']: + file_path = output_path.joinpath(f'{file_name}.{file_type}') + plot.savefig(file_path) + + +def write_features_type_feature(data, sequence_list, colors): + # Get contig genome seqid & size, features dict + seqid2seq = {rec.id:rec.seq for rec in sequence_list} + seqid2size = {rec.id:len(rec.seq) for rec in sequence_list} + seqid2features = {rec.id:rec.features for rec in sequence_list} + + circos = Circos(seqid2size, space=2) + taxon = data['genome']['taxon'] if data['genome']['taxon'] else '' + circos.text(f"{taxon}", r=5, size=15) + for sector in circos.sectors: + # Plot outer track + outer_track = sector.add_track((98, 100)) + outer_track.axis(fc="lightgrey") + major_interval = 500_000 + minor_interval = int(major_interval / 5) + if sector.size > minor_interval: + outer_track.xticks_by_interval(major_interval, label_formatter=lambda v: f"{v / 1000000:.1f} Mbp") + outer_track.xticks_by_interval(minor_interval, tick_length=1, show_label=False) + + # Plot feature tracks + feature_forward_track = sector.add_track((85, 95), r_pad_ratio=0.1) + feature_reverse_track = sector.add_track((75, 85), r_pad_ratio=0.1) + for feature in seqid2features[sector.name]: + track = feature_forward_track if feature.location.strand == 1 else feature_reverse_track + if feature.type == bc.INSDC_FEATURE_CDS: + track.genomic_features([feature], fc=colors['features'][bc.FEATURE_CDS]) + elif feature.type == bc.INSDC_FEATURE_T_RNA: + track.genomic_features([feature], fc=colors['features'][bc.FEATURE_T_RNA], lw=0.1) + elif feature.type == bc.INSDC_FEATURE_TM_RNA: + track.genomic_features([feature], fc=colors['features'][bc.FEATURE_TM_RNA], lw=0.1) + elif feature.type == bc.INSDC_FEATURE_R_RNA: + track.genomic_features([feature], fc=colors['features'][bc.FEATURE_R_RNA]) + elif feature.type == bc.INSDC_FEATURE_NC_RNA: + track.genomic_features([feature], fc=colors['features'][bc.FEATURE_NC_RNA], lw=0.1) + elif feature.type == bc.INSDC_FEATURE_REGULATORY: + track.genomic_features([feature], fc=colors['features'][bc.FEATURE_NC_RNA_REGION], lw=0.1) + elif feature.type == bc.INSDC_FEATURE_REPEAT_REGION: + track.genomic_features([feature], fc=colors['features'][bc.FEATURE_CRISPR], lw=0.1) + elif feature.type == bc.INSDC_FEATURE_GAP: + track.genomic_features([feature], fc=colors['features'][bc.FEATURE_GAP], lw=0.1) + else: + track.genomic_features([feature], fc=colors['features']['misc'], lw=0.1) - + seq = str(seqid2seq[sector.name]) + + # GC content + gc_content_track = sector.add_track((55, 63)) + pos_list, gc_contents = calc_gc_content(seq) + gc_contents = gc_contents - gc_fraction(seq) * 100 + positive_gc_contents = np.where(gc_contents > 0, gc_contents, 0) + negative_gc_contents = np.where(gc_contents < 0, gc_contents, 0) + abs_max_gc_content = np.max(np.abs(gc_contents)) + gc_content_track.fill_between(pos_list, positive_gc_contents, 0, vmin=-abs_max_gc_content, vmax=abs_max_gc_content, color=colors['gc-positive']) + gc_content_track.fill_between(pos_list, negative_gc_contents, 0, vmin=-abs_max_gc_content, vmax=abs_max_gc_content, color=colors['gc-negative']) + + # Plot GC skew + gc_skew_track = sector.add_track((45, 53)) + pos_list, gc_skews = calc_gc_skew(seq) + positive_gc_skews = np.where(gc_skews > 0, gc_skews, 0) + negative_gc_skews = np.where(gc_skews < 0, gc_skews, 0) + abs_max_gc_skew = np.max(np.abs(gc_skews)) + gc_skew_track.fill_between(pos_list, positive_gc_skews, 0, vmin=-abs_max_gc_skew, vmax=abs_max_gc_skew, color=colors['gc-skew-positive']) + gc_skew_track.fill_between(pos_list, negative_gc_skews, 0, vmin=-abs_max_gc_skew, vmax=abs_max_gc_skew, color=colors['gc-skew-negative']) - - -def write_features_type_feature(features, sequences, circos_path, colors): - features_plus = [] - features_minus = [] - sequence_ids = set([seq['id'] for seq in sequences]) - for feat in features: - if feat['sequence'] not in sequence_ids: - continue - seq, start, stop, type = feat['sequence'], feat['start'], feat['stop'], feat['type'] - color = colors['features'].get(type, colors['features']['misc']) - if feat['strand'] == bc.STRAND_FORWARD: - features_plus.append(f"{seq} {start} {stop} {bc.STRAND_FORWARD} color={hex_to_rgb(color)}") - else: - features_minus.append(f"{seq} {start} {stop} {bc.STRAND_REVERSE} color={hex_to_rgb(color)}") - return [features_plus_path, features_minus_path] - - -def write_features_type_cog(features, sequences, circos_path, colors): - features_plus = [] - features_minus = [] - features_extra = [] - sequence_ids = set([seq['id'] for seq in sequences]) - for feat in features: - if feat['sequence'] not in sequence_ids: - continue - seq, start, stop = feat['sequence'], feat['start'], feat['stop'] - if feat['type'] == bc.FEATURE_CDS: - color = colors['features'][bc.FEATURE_CDS] - psc = feat.get('psc', None) - if psc is not None: - cog = psc.get('cog_category', None) - if cog is not None: - if len(cog) != 1: - cog = cog[:1] - color = colors['cog-classes'].get(cog.upper(), colors['cog-classes']['S']) - if feat['strand'] == bc.STRAND_FORWARD: - features_plus.append(f"{seq} {start} {stop} {feat['strand']} color={hex_to_rgb(color)}") - else: - features_minus.append(f"{seq} {start} {stop} {feat['strand']} color={hex_to_rgb(color)}") - else: - features_extra.append(f"{seq} {start} {stop} {feat['strand']} color={hex_to_rgb(colors['features']['misc'])}") - - -def write_gc_content_skew(sequences, circos_path, colors): - sequence_length = sum([seq['length'] for seq in sequences]) - step_size = int(sequence_length / 3600) # 10 * 360° - if step_size < 3: - step_size = 3 - window_size = 2 * step_size - if window_size < 50: - window_size = 50 - gc_contents = [] - gc_skews = [] - max_gc = 0 - max_gc_skew = 0 - if float(bp.__version__) >= 1.80: - gc_mean = SeqUtils.gc_fraction(''.join([seq['nt'] for seq in sequences])) - else: - gc_mean = SeqUtils.GC(''.join([seq['nt'] for seq in sequences])) / 100 - for seq in sequences: - nt = seq['nt'] - for w in range(0, len(nt), step_size): - start = w - window_size - if start < 0: - start += len(nt) - stop = w + window_size - if stop > len(nt): - stop -= len(nt) - nt_subseq = nt[start:stop] if start < stop else nt[start:] + nt[:stop] - if float(bp.__version__) >= 1.80: - gc_value = gc_mean - SeqUtils.gc_fraction(nt_subseq) - else: - gc_value = gc_mean - (SeqUtils.GC(nt_subseq) / 100) - if max_gc < abs(gc_value): - max_gc = abs(gc_value) - g, c = nt_subseq.count('G'), nt_subseq.count('C') - gc_skew = gc_skew = (g - c) / float(g + c) if (g + c) > 0 else 0.0 - if max_gc_skew < abs(gc_skew): - max_gc_skew = abs(gc_skew) - - log.debug('write gc config: seq-length=%i, step-size=%i, window-size=%i, max-gc=%i, max-gc-skew=%i', sequence_length, step_size, window_size, max_gc, max_gc_skew) - return max_gc, max_gc_skew - - + fig = circos.plotfig(dpi=600) + handles=[ + Patch(color=colors['features'][bc.FEATURE_CDS], label='CDS'), + Patch(color=colors['features'][bc.FEATURE_T_RNA], label='tRNA'), + Patch(color=colors['features'][bc.FEATURE_R_RNA], label='rRNA'), + Patch(color=colors['features'][bc.FEATURE_NC_RNA], label='ncRNA'), + Patch(color=colors['features'][bc.FEATURE_CRISPR], label='CRISPR'), + Line2D([], [], color=colors['gc-positive'], label="+ GC", marker="^", ms=6, ls="None"), + Line2D([], [], color=colors['gc-negative'], label="- GC", marker="v", ms=6, ls="None"), + Line2D([], [], color=colors['gc-skew-positive'], label="+ GC Skew", marker="^", ms=6, ls="None"), + Line2D([], [], color=colors['gc-skew-negative'], label="- GC Skew", marker="v", ms=6, ls="None") + ] + _ = circos.ax.legend( + handles=handles, + bbox_to_anchor=(0.5, 0.45), + loc='center', + ncols=2, + fontsize=6 + ) + return fig + + +# def write_features_type_cog(features, sequences, circos_path, colors): +# features_plus = [] +# features_minus = [] +# features_extra = [] +# sequence_ids = set([seq['id'] for seq in sequences]) +# for feat in features: +# if feat['sequence'] not in sequence_ids: +# continue +# seq, start, stop = feat['sequence'], feat['start'], feat['stop'] +# if feat['type'] == bc.FEATURE_CDS: +# color = colors['features'][bc.FEATURE_CDS] +# psc = feat.get('psc', None) +# if psc is not None: +# cog = psc.get('cog_category', None) +# if cog is not None: +# if len(cog) != 1: +# cog = cog[:1] +# color = colors['cog-classes'].get(cog.upper(), colors['cog-classes']['S']) +# if feat['strand'] == bc.STRAND_FORWARD: +# features_plus.append(f"{seq} {start} {stop} {feat['strand']} color={hex_to_rgb(color)}") +# else: +# features_minus.append(f"{seq} {start} {stop} {feat['strand']} color={hex_to_rgb(color)}") +# else: +# features_extra.append(f"{seq} {start} {stop} {feat['strand']} color={hex_to_rgb(colors['features']['misc'])}") + + +def calc_gc_content(seq: str): + pos_list, gc_content_list = [], [] + window_size = int(len(seq) / 500) + step_size = int(len(seq) / 1000) + if window_size == 0 or step_size == 0: + window_size, step_size = len(seq), int(len(seq) / 2) + pos_list = list(range(0, len(seq), step_size)) + [len(seq)] + for pos in pos_list: + window_start_pos = pos - int(window_size / 2) + window_end_pos = pos + int(window_size / 2) + window_start_pos = 0 if window_start_pos < 0 else window_start_pos + window_end_pos = len(seq) if window_end_pos > len(seq) else window_end_pos + subseq = seq[window_start_pos:window_end_pos] + gc_content = gc_fraction(subseq) * 100 + gc_content_list.append(gc_content) + return np.array(pos_list).astype(np.int64), np.array(gc_content_list).astype(np.float64) + + +def calc_gc_skew(seq: str): + pos_list, gc_skew_list = [], [] + window_size = int(len(seq) / 500) + step_size = int(len(seq) / 1000) + if window_size == 0 or step_size == 0: + window_size, step_size = len(seq), int(len(seq) / 2) + pos_list = list(range(0, len(seq), step_size)) + [len(seq)] + for pos in pos_list: + window_start_pos = pos - int(window_size / 2) + window_end_pos = pos + int(window_size / 2) + window_start_pos = 0 if window_start_pos < 0 else window_start_pos + window_end_pos = len(seq) if window_end_pos > len(seq) else window_end_pos + subseq = seq[window_start_pos:window_end_pos] + g, c = subseq.count('G'), subseq.count('C') + gc_skew = (g - c) / float(g + c) if (g + c) > 0 else 0.0 + gc_skew_list.append(gc_skew) + return np.array(pos_list).astype(np.int64), np.array(gc_skew_list).astype(np.float64) + + +def gc_fraction(seq: str): + gc = sum(seq.count(x) for x in 'CGS') + length = len(seq) + return 0 if length == 0 else gc / length if __name__ == '__main__': main()