From 83cab6d0c46f116cf5bc51cfc35fa8aef3bb2584 Mon Sep 17 00:00:00 2001 From: Terazus Date: Wed, 20 Mar 2024 14:04:09 +0000 Subject: [PATCH] removed old code --- isatools/isatab/load/__init__.py | 9 +- isatools/isatab/load/core.py | 391 ++++--------------------------- isatools/isatab/load/read.py | 175 -------------- 3 files changed, 52 insertions(+), 523 deletions(-) delete mode 100644 isatools/isatab/load/read.py diff --git a/isatools/isatab/load/__init__.py b/isatools/isatab/load/__init__.py index 28e0d128..c35bb98e 100644 --- a/isatools/isatab/load/__init__.py +++ b/isatools/isatab/load/__init__.py @@ -1,3 +1,8 @@ -from isatools.isatab.load.read import read_investigation_file, read_tfile from isatools.isatab.load.ProcessSequenceFactory import ProcessSequenceFactory, preprocess -from isatools.isatab.load.core import load, merge_study_with_assay_tables, load_table +from isatools.isatab.load.core import ( + load, + merge_study_with_assay_tables, + load_table, + read_investigation_file, + read_tfile +) diff --git a/isatools/isatab/load/core.py b/isatools/isatab/load/core.py index 9eaeb3c5..1619d285 100644 --- a/isatools/isatab/load/core.py +++ b/isatools/isatab/load/core.py @@ -12,10 +12,9 @@ from numpy import nan from isatools.utils import utf8_text_file_open -from isatools.isatab.load.read import read_tfile, read_investigation_file from isatools.isatab.load.ProcessSequenceFactory import ProcessSequenceFactory from isatools.isatab.defaults import _RX_COMMENT, log -from isatools.isatab.utils import strip_comments +from isatools.isatab.utils import strip_comments, IsaTabDataFrame from isatools.model import ( OntologyAnnotation, Publication, @@ -673,354 +672,21 @@ def load_tables(self): self.update_protocols(process, self.__study, self.protocol_map) -def load(isatab_path_or_ifile: object, skip_load_tables: object = False) -> object: +def load(isatab_path_or_ifile: TextIO, skip_load_tables: bool = False) -> Investigation: """Load an ISA-Tab into ISA Data Model objects - :rtype: object :param isatab_path_or_ifile: Full path to an ISA-Tab directory or file-like buffer object pointing to an investigation file - :param skip_load_tables: Whether or not to skip loading the table files + :param skip_load_tables: Whether to skip loading the table files :return: Investigation objects """ + investigation_loader: ISATabInvestigationLoader = ISATabInvestigationLoader( + file=isatab_path_or_ifile, skip_load_table=skip_load_tables + ) + return investigation_loader.investigation - # from DF of investigation file - def get_ontology_source(term_source_ref): - try: - current_onto_source = ontology_source_map[term_source_ref] - except KeyError: - current_onto_source = None - return current_onto_source - - def get_oa(val, accession, ts_ref): - """Gets a OntologyAnnotation for a give value, accession and - term source REF - - :param val: Value of the OA - :param accession: Term Accession Number of the OA - :param ts_ref: Term Source REF of the OA - :return: An OntologyAnnotation object - """ - if val == '' and accession == '': - return None - else: - return OntologyAnnotation( - term=val, - term_accession=accession, - term_source=get_ontology_source(ts_ref) - ) - - def get_oa_list_from_semi_c_list(vals, accessions, ts_refs): - """Gets a list of OntologyAnnotations from semi-colon delimited lists - - :param vals: A list of values, separated by semi-colons - :param accessions: A list of accessions, separated by semi-colons - :param ts_refs: A list of term source REFs, separated by semi-colons - :return: A list of 
OntologyAnnotation objects - """ - oa_list = [] - accession_split = accessions.split(';') - ts_refs_split = ts_refs.split(';') - # if no acc or ts_refs - if accession_split == [''] and ts_refs_split == ['']: - for val in vals.split(';'): - oa_list.append(OntologyAnnotation(term=val, )) - else: # try parse all three sections - for _, val in enumerate(vals.split(';')): - oa = get_oa(val, accessions.split(';')[_], ts_refs.split(';')[_]) - if oa is not None: - oa_list.append(oa) - return oa_list - - def get_publications(section_df): - """Get a list of Publications from the relevant investigation file - section - - :param section_df: A PUBLICATIONS section DataFrame - :return: A list of Publication objects - """ - if 'Investigation PubMed ID' in section_df.columns: - prefix = 'Investigation ' - elif 'Study PubMed ID' in section_df.columns: - prefix = 'Study ' - else: - raise KeyError - - publications = [] - - for _, current_row in section_df.iterrows(): - publication = Publication(pubmed_id=current_row[prefix + 'PubMed ID'], - doi=current_row[prefix + 'Publication DOI'], - author_list=current_row[ - prefix + 'Publication Author List'], - title=current_row[prefix + 'Publication Title']) - - publication.status = get_oa( - current_row[prefix + 'Publication Status'], - current_row[prefix + 'Publication Status Term Accession Number'], - current_row[prefix + 'Publication Status Term Source REF']) - publication.comments = get_comments_row(section_df.columns, current_row) - publications.append(publication) - - return publications - - def get_contacts(section_df): - """Get a list of Person objects from the relevant investigation file - section - - :param section_df: A CONTACTS section DataFrame - :return: A list of Person objects - """ - if 'Investigation Person Last Name' in section_df.columns: - prefix = 'Investigation ' - elif 'Study Person Last Name' in section_df.columns: - prefix = 'Study ' - else: - raise KeyError - - contacts = [] - - for _, current_row in section_df.iterrows(): - person = Person(last_name=current_row[prefix + 'Person Last Name'], - first_name=current_row[prefix + 'Person First Name'], - mid_initials=current_row[prefix + 'Person Mid Initials'], - email=current_row[prefix + 'Person Email'], - phone=current_row[prefix + 'Person Phone'], - fax=current_row[prefix + 'Person Fax'], - address=current_row[prefix + 'Person Address'], - affiliation=current_row[prefix + 'Person Affiliation']) - - person.roles = get_oa_list_from_semi_c_list( - current_row[prefix + 'Person Roles'], - current_row[prefix + 'Person Roles Term Accession Number'], - current_row[prefix + 'Person Roles Term Source REF']) - person.comments = get_comments_row(section_df.columns, current_row) - contacts.append(person) - - return contacts - - def get_comments(section_df): - """Get Comments from a section DataFrame - - :param section_df: A section DataFrame - :return: A list of Comment objects as found in the section - """ - comments = [] - for col in [x for x in section_df.columns if _RX_COMMENT.match(str(x))]: - for _, current_row in section_df.iterrows(): - comment = Comment( - name=next(iter(_RX_COMMENT.findall(col))), value=current_row[col]) - comments.append(comment) - return comments - - def get_comments_row(cols, row): - """Get Comments in a given DataFrame row - - :param cols: List of DataFrame columns - :param row: DataFrame row as a Series object - :return: A list of Comment objects - """ - comments = [] - for col in [x for x in cols if _RX_COMMENT.match(str(x))]: - comment = Comment( - 
name=next(iter(_RX_COMMENT.findall(col))), value=row[col]) - comments.append(comment) - return comments - - def get_ontology_sources(r): - ontology_source = OntologySource( - name=r['Term Source Name'], - file=r['Term Source File'], - version=r['Term Source Version'], - description=r['Term Source Description']) - ontology_source.comments = get_comments_row(df_dict['ontology_sources'].columns, r) - investigation.ontology_source_references.append(ontology_source) - - FP = None - - if isinstance(isatab_path_or_ifile, str): - if path.isdir(isatab_path_or_ifile): - fnames = glob(path.join(isatab_path_or_ifile, "i_*.txt")) - assert len(fnames) == 1 - FP = utf8_text_file_open(fnames[0]) - elif hasattr(isatab_path_or_ifile, 'read'): - FP = isatab_path_or_ifile - else: - raise IOError("Cannot resolve input file") - - try: - df_dict = read_investigation_file(FP) - investigation = Investigation() - - df_dict['ontology_sources'].apply(lambda x: get_ontology_sources(x), axis=1) - ontology_source_map = dict(map(lambda x: (x.name, x), investigation.ontology_source_references)) - - if not df_dict['investigation'].empty: - row = df_dict['investigation'].iloc[0] - investigation.identifier = str(row['Investigation Identifier']) - investigation.title = row['Investigation Title'] - investigation.description = row['Investigation Description'] - investigation.submission_date = row['Investigation Submission Date'] - investigation.public_release_date = row['Investigation Public Release Date'] - investigation.publications = get_publications(df_dict['i_publications']) - investigation.contacts = get_contacts(df_dict['i_contacts']) - investigation.comments = get_comments(df_dict['investigation']) - - for i in range(0, len(df_dict['studies'])): - row = df_dict['studies'][i].iloc[0] - study = Study() - study.identifier = str(row['Study Identifier']) - study.title = row['Study Title'] - study.description = row['Study Description'] - study.submission_date = row['Study Submission Date'] - study.public_release_date = row['Study Public Release Date'] - study.filename = row['Study File Name'] - - study.publications = get_publications(df_dict['s_publications'][i]) - study.contacts = get_contacts(df_dict['s_contacts'][i]) - study.comments = get_comments(df_dict['studies'][i]) - - for _, row in df_dict['s_design_descriptors'][i].iterrows(): - design_descriptor = get_oa( - row['Study Design Type'], - row['Study Design Type Term Accession Number'], - row['Study Design Type Term Source REF']) - these_comments = get_comments_row(df_dict['s_design_descriptors'][i].columns, row) - design_descriptor.comments = these_comments - study.design_descriptors.append(design_descriptor) - - for _, row in df_dict['s_factors'][i].iterrows(): - factor = StudyFactor(name=row['Study Factor Name']) - factor.factor_type = get_oa( - row['Study Factor Type'], - row['Study Factor Type Term Accession Number'], - row['Study Factor Type Term Source REF']) - factor.comments = get_comments_row(df_dict['s_factors'][i].columns, row) - study.factors.append(factor) - - protocol_map = {} - for _, row in df_dict['s_protocols'][i].iterrows(): - protocol = Protocol() - protocol.name = row['Study Protocol Name'] - protocol.description = row['Study Protocol Description'] - protocol.uri = row['Study Protocol URI'] - protocol.version = row['Study Protocol Version'] - protocol.protocol_type = get_oa( - row['Study Protocol Type'], - row['Study Protocol Type Term Accession Number'], - row['Study Protocol Type Term Source REF']) - params = get_oa_list_from_semi_c_list( - 
row['Study Protocol Parameters Name'], - row['Study Protocol Parameters Name Term Accession Number'], - row['Study Protocol Parameters Name Term Source REF']) - for param in params: - protocol_param = ProtocolParameter(parameter_name=param) - protocol.parameters.append(protocol_param) - protocol.comments = get_comments_row(df_dict['s_protocols'][i].columns, row) - study.protocols.append(protocol) - protocol_map[protocol.name] = protocol - study.protocols = list(protocol_map.values()) - if skip_load_tables: - pass - else: - study_tfile_df = read_tfile(path.join(path.dirname(FP.name), study.filename)) - iosrs = investigation.ontology_source_references - sources, samples, _, __, processes, characteristic_categories, unit_categories = \ - ProcessSequenceFactory( - ontology_sources=iosrs, - study_protocols=study.protocols, - study_factors=study.factors - ).create_from_df(study_tfile_df) - study.sources = sorted(list(sources.values()), key=lambda x: x.name, reverse=False) - study.samples = sorted(list(samples.values()), key=lambda x: x.name, reverse=False) - study.process_sequence = list(processes.values()) - study.characteristic_categories = sorted( - list(characteristic_categories.values()), - key=lambda x: x.term, - reverse=False) - study.units = sorted(list(unit_categories.values()), key=lambda x: x.term, reverse=False) - - for process in study.process_sequence: - try: - process.executes_protocol = protocol_map[process.executes_protocol] - except KeyError: - try: - unknown_protocol = protocol_map['unknown'] - except KeyError: - description = "This protocol was auto-generated where a protocol could not be determined." - protocol_map['unknown'] = Protocol(name="unknown protocol", description=description) - unknown_protocol = protocol_map['unknown'] - study.protocols.append(unknown_protocol) - process.executes_protocol = unknown_protocol - - for _, row in df_dict['s_assays'][i].iterrows(): - assay_dict = { - "filename": row['Study Assay File Name'], - "measurement_type": get_oa( - row['Study Assay Measurement Type'], - row['Study Assay Measurement Type Term Accession Number'], - row['Study Assay Measurement Type Term Source REF'] - ), - "technology_type": get_oa( - row['Study Assay Technology Type'], - row['Study Assay Technology Type Term Accession Number'], - row['Study Assay Technology Type Term Source REF'] - ), - "technology_platform": row['Study Assay Technology Platform'], - "comments": get_comments_row(df_dict['s_assays'][i].columns, row) - } - assay = Assay(**assay_dict) - - if skip_load_tables: - pass - else: - iosrs = investigation.ontology_source_references - assay_tfile_df = read_tfile(path.join(path.dirname(FP.name), assay.filename)) - _, samples, other, data, processes, characteristic_categories, unit_categories = \ - ProcessSequenceFactory( - ontology_sources=iosrs, - study_samples=study.samples, - study_protocols=study.protocols, - study_factors=study.factors).create_from_df( - assay_tfile_df) - assay.samples = sorted( - list(samples.values()), key=lambda x: x.name, - reverse=False) - assay.other_material = sorted( - list(other.values()), key=lambda x: x.name, - reverse=False) - assay.data_files = sorted( - list(data.values()), key=lambda x: x.filename, - reverse=False) - assay.process_sequence = list(processes.values()) - assay.characteristic_categories = sorted( - list(characteristic_categories.values()), - key=lambda x: x.term, reverse=False) - assay.units = sorted( - list(unit_categories.values()), key=lambda x: x.term, - reverse=False) - - for process in 
assay.process_sequence:
-                        try:
-                            process.executes_protocol = protocol_map[process.executes_protocol]
-                        except KeyError:
-                            try:
-                                unknown_protocol = protocol_map['unknown']
-                            except KeyError:
-                                description = "This protocol was auto-generated where a protocol could not be determined."
-                                protocol_map['unknown'] = Protocol(name="unknown protocol", description=description)
-                                unknown_protocol = protocol_map['unknown']
-                                study.protocols.append(unknown_protocol)
-                            process.executes_protocol = unknown_protocol
-
-                study.assays.append(assay)
-            investigation.studies.append(study)
-    finally:
-        FP.close()
-    return investigation
-
-
-def merge_study_with_assay_tables(study_file_path, assay_file_path, target_file_path):
+def merge_study_with_assay_tables(study_file_path: str, assay_file_path: str, target_file_path: str):
     """
     Utility function to merge a study table file with an assay table file. The merge uses the Sample Name as the
@@ -1034,14 +700,15 @@ def merge_study_with_assay_tables(study_file_path, assay_file_path, target_file_
                                         '/path/to/assay.txt', '/path/to/merged.txt')
     """
     log.info("Reading study file %s into DataFrame", study_file_path)
-    study_DF = read_tfile(study_file_path)
+    study_dataframe = read_tfile(study_file_path)
     log.info("Reading assay file %s into DataFrame", assay_file_path)
-    assay_DF = read_tfile(assay_file_path)
+    assay_dataframe = read_tfile(assay_file_path)
     log.info("Merging DataFrames...")
-    merged_DF = merge(study_DF, assay_DF, on='Sample Name')
+    merged_dataframe = merge(study_dataframe, assay_dataframe, on='Sample Name')
     log.info("Writing merged DataFrame to file %s", target_file_path)
+    headers = study_dataframe.isatab_header + assay_dataframe.isatab_header[1:]
     with open(target_file_path, 'w', encoding='utf-8') as fp:
-        merged_DF.to_csv(fp, sep='\t', index=False, header=study_DF.isatab_header + assay_DF.isatab_header[1:])
+        merged_dataframe.to_csv(fp, sep='\t', index=False, header=headers)
 
 
 def load_table(fp):
@@ -1081,3 +748,42 @@ def load_table(fp):
         new_labels.append(label)
     df.columns = new_labels
     return df
+
+
+def read_tfile(tfile_path: str, index_col=None, factor_filter=None) -> IsaTabDataFrame:
+    """Read a table file into a DataFrame
+
+    :param tfile_path: Path to a table file to load
+    :param index_col: The column to use as the DataFrame index
+    :param factor_filter: Factor filter tuple, e.g. ('Gender', 'Male') will
+    filter on Factor Value[Gender] == Male
+    :return: A table file DataFrame
+    """
+    with utf8_text_file_open(tfile_path) as tfile_fp:
+        tfile_fp.seek(0)
+        tfile_fp = strip_comments(tfile_fp)
+        csv = read_csv(tfile_fp, dtype=str, sep='\t', index_col=index_col, encoding='utf-8').fillna('')
+        tfile_df = IsaTabDataFrame(csv)
+        if factor_filter:
+            log.debug("Filtering DataFrame contents on Factor Value %s", factor_filter)
+            return tfile_df[tfile_df['Factor Value[{}]'.format(factor_filter[0])] == factor_filter[1]]
+        return tfile_df
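+
+
+# A minimal usage sketch for the reader below (hypothetical file name),
+# assuming ISATabReader.run() returns the same section keys as the reader
+# it replaces, e.g. 'investigation', 'studies' and 's_assays':
+#
+#     with open('i_investigation.txt', encoding='utf-8') as fp:
+#         sections = read_investigation_file(fp)
+#     first_study_df = sections['studies'][0]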
+def read_investigation_file(fp):
+    """Reads an investigation file into a dictionary of DataFrames, one
+    DataFrame per section of the investigation file, e.g. one DataFrame
+    for the INVESTIGATION PUBLICATIONS section
+
+    :param fp: A file-like buffer object of the investigation file
+    :return: A dictionary holding a set of DataFrames for each section of the
+    investigation file. See the ISATabReader implementation for details.
+    """
+    return ISATabReader(fp).run()
\ No newline at end of file
diff --git a/isatools/isatab/load/read.py b/isatools/isatab/load/read.py
deleted file mode 100644
index b454b82c..00000000
--- a/isatools/isatab/load/read.py
+++ /dev/null
@@ -1,175 +0,0 @@
-from __future__ import annotations
-from io import StringIO
-
-from pandas import read_csv
-from numpy import nan
-
-from isatools.utils import utf8_text_file_open
-from isatools.isatab.defaults import log
-from isatools.isatab.utils import strip_comments, IsaTabDataFrame
-
-
-def read_investigation_file(fp):
-    """Reads an investigation file into a dictionary of DataFrames, each
-    DataFrame being each section of the investigation file. e.g. One DataFrame
-    for the INVESTIGATION PUBLICATIONS section
-
-    :param fp: A file-like buffer object of the investigation file
-    :return: A dictionary holding a set of DataFrames for each section of the
-    investigation file. See below implementation for detail
-    """
-
-    def _peek(f):
-        """Peek at the next line without moving to the next line. This function
-        get the position of the next line, reads the next line, then resets the
-        file pointer to the original position
-
-        :param f: A file-like buffer object
-        :return: The next line past the current line
-        """
-        position = f.tell()
-        line = f.readline()
-        f.seek(position)
-        return line
-
-    def _read_tab_section(f, sec_key, next_sec_key=None):
-        """Slices a file by section delimited by section keys
-
-        :param f: A file-like buffer object
-        :param sec_key: Delimiter key of beginning of section
-        :param next_sec_key: Delimiter key of end of section
-        :return: A memory file of the section slice, as a string buffer object
-        """
-        fileline = f.readline()
-        normed_line = fileline.rstrip()
-        if normed_line[0] == '"':
-            normed_line = normed_line[1:]
-        if normed_line[len(normed_line) - 1] == '"':
-            normed_line = normed_line[:len(normed_line) - 1]
-        if not normed_line == sec_key:
-            raise IOError("Expected: " + sec_key + " section, but got: "
-                          + normed_line)
-        memf = StringIO()
-        while not _peek(f=f).rstrip() == next_sec_key:
-            fileline = f.readline()
-            if not fileline:
-                break
-            memf.write(fileline.rstrip() + '\n')
-        memf.seek(0)
-        return memf
-
-    def _build_section_df(f: StringIO):
-        """Reads a file section into a DataFrame
-
-        :param f: A file-like buffer object
-        :return: A DataFrame corresponding to the file section
-        """
-        df = read_csv(f, names=range(0, 128), sep='\t', engine='python',
-                      encoding='utf-8').dropna(axis=1, how='all')
-        df = df.T
-        df.replace(nan, '', regex=True, inplace=True)
-        # Strip out the nan entries
-        df.reset_index(inplace=True)
-        # Reset study_index so it is accessible as column
-        df.columns = df.iloc[0]
-        # If all was OK, promote this row to the column headers
-        df = df.reindex(df.index.drop(0))
-        # Reindex the DataFrame
-        return df
-
-    memory_file = StringIO()
-    line = True
-    while line:
-        line = fp.readline()
-        if not line.lstrip().startswith('#'):
-            memory_file.write(line)
-    memory_file.seek(0)
-
-    df_dict = dict()
-
-    # Read in investigation file into DataFrames first
-    df_dict['ontology_sources'] = _build_section_df(_read_tab_section(
-        f=memory_file,
-        sec_key='ONTOLOGY SOURCE REFERENCE',
-        next_sec_key='INVESTIGATION'
-    ))
-    df_dict['investigation'] = _build_section_df(_read_tab_section(
-        f=memory_file,
-        sec_key='INVESTIGATION',
-        next_sec_key='INVESTIGATION PUBLICATIONS'
-    ))
-    df_dict['i_publications'] = _build_section_df(_read_tab_section(
-        f=memory_file,
-        sec_key='INVESTIGATION 
PUBLICATIONS', - next_sec_key='INVESTIGATION CONTACTS' - )) - df_dict['i_contacts'] = _build_section_df(_read_tab_section( - f=memory_file, - sec_key='INVESTIGATION CONTACTS', - next_sec_key='STUDY' - )) - df_dict['studies'] = list() - df_dict['s_design_descriptors'] = list() - df_dict['s_publications'] = list() - df_dict['s_factors'] = list() - df_dict['s_assays'] = list() - df_dict['s_protocols'] = list() - df_dict['s_contacts'] = list() - while _peek(memory_file): # Iterate through STUDY blocks until end of file - df_dict['studies'].append(_build_section_df(_read_tab_section( - f=memory_file, - sec_key='STUDY', - next_sec_key='STUDY DESIGN DESCRIPTORS' - ))) - df_dict['s_design_descriptors'].append( - _build_section_df(_read_tab_section( - f=memory_file, - sec_key='STUDY DESIGN DESCRIPTORS', - next_sec_key='STUDY PUBLICATIONS' - ))) - df_dict['s_publications'].append(_build_section_df(_read_tab_section( - f=memory_file, - sec_key='STUDY PUBLICATIONS', - next_sec_key='STUDY FACTORS' - ))) - df_dict['s_factors'].append(_build_section_df(_read_tab_section( - f=memory_file, - sec_key='STUDY FACTORS', - next_sec_key='STUDY ASSAYS' - ))) - df_dict['s_assays'].append(_build_section_df(_read_tab_section( - f=memory_file, - sec_key='STUDY ASSAYS', - next_sec_key='STUDY PROTOCOLS' - ))) - df_dict['s_protocols'].append(_build_section_df(_read_tab_section( - f=memory_file, - sec_key='STUDY PROTOCOLS', - next_sec_key='STUDY CONTACTS' - ))) - df_dict['s_contacts'].append(_build_section_df(_read_tab_section( - f=memory_file, - sec_key='STUDY CONTACTS', - next_sec_key='STUDY' - ))) - return df_dict - - -def read_tfile(tfile_path, index_col=None, factor_filter=None) -> IsaTabDataFrame: - """Read a table file into a DataFrame - - :param tfile_path: Path to a table file to load - :param index_col: The column to use as study_index - :param factor_filter: Factor filter tuple, e.g. ('Gender', 'Male') will - filter on FactorValue[Gender] == Male - :return: A table file DataFrame - """ - with utf8_text_file_open(tfile_path) as tfile_fp: - tfile_fp.seek(0) - tfile_fp = strip_comments(tfile_fp) - csv = read_csv(tfile_fp, dtype=str, sep='\t', index_col=index_col, encoding='utf-8').fillna('') - tfile_df = IsaTabDataFrame(csv) - if factor_filter: - log.debug("Filtering DataFrame contents on Factor Value %s", factor_filter) - return tfile_df[tfile_df['Factor Value[{}]'.format(factor_filter[0])] == factor_filter[1]] - return tfile_df
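
-- 
A minimal usage sketch for the consolidated loader API (hypothetical paths;
load() expects the ISA-Tab directory to contain exactly one i_*.txt
investigation file, and the factor filter assumes the study declares a
'Gender' factor):

    from isatools.isatab.load import (
        load,
        merge_study_with_assay_tables,
        read_tfile,
    )

    # Parse the whole ISA-Tab archive into ISA Data Model objects.
    investigation = load('/path/to/isatab/')
    study = investigation.studies[0]

    # Re-read the study table on its own, keeping only rows where
    # Factor Value[Gender] == Male.
    males = read_tfile('/path/to/isatab/' + study.filename,
                       factor_filter=('Gender', 'Male'))

    # Merge the study table with its first assay table on Sample Name.
    merge_study_with_assay_tables('/path/to/isatab/' + study.filename,
                                  '/path/to/isatab/' + study.assays[0].filename,
                                  '/path/to/merged.txt')

Passing skip_load_tables=True to load() skips parsing the study and assay
tables and returns only the investigation-level metadata.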