From 4ca441d33aaa87d5404075412985952b8195a084 Mon Sep 17 00:00:00 2001
From: kedhammar
Date: Mon, 16 Sep 2024 15:43:36 +0200
Subject: [PATCH 1/2] add methods for finding and copying LIMS-generated
 manifests

---
 taca/element/Element_Runs.py | 90 ++++++++++++++++++++++++++++++++----
 1 file changed, 81 insertions(+), 9 deletions(-)

diff --git a/taca/element/Element_Runs.py b/taca/element/Element_Runs.py
index 78f6763e..0a7cab61 100644
--- a/taca/element/Element_Runs.py
+++ b/taca/element/Element_Runs.py
@@ -1,8 +1,11 @@
 import json
 import logging
 import os
+import re
 import shutil
+import zipfile
 from datetime import datetime
+from glob import glob
 
 from taca.utils import misc
 from taca.utils.filesystem import chdir
@@ -28,12 +31,6 @@ def __init__(self, run_dir, configuration):
             self.demux_dir,
             "RunStats.json",  # Assumes demux is finished when this file is created
         )
-        self.run_manifest_file = os.path.join(self.run_dir, "RunManifest.csv")
-        self.run_manifest_zip_file = os.path.join(
-            self.CONFIG.get("Aviti").get("manifest_zip_location"),
-            self.flowcell_id + ".tar.gz",
-        )  # TODO: change and add to taca.yaml
-        # TODO, need to be real careful when using the flowcell_id as it is manually entered and can mean three different things
 
         # Instrument generated files
         self.run_parameters_file = os.path.join(self.run_dir, "RunParameters.json")
@@ -47,6 +44,10 @@
 
         # Fields to be set by TACA
         self.status = None
+        self.lims_step_id = None
+        self.lims_full_manifest = None
+        self.lims_start_manifest = None
+        self.lims_demux_manifests = None
 
         # Fields that will be set when parsing run parameters
         self.run_name = None
@@ -166,9 +167,79 @@ def update_statusdb(self):
     def manifest_exists(self):
         return os.path.isfile(self.run_manifest_zip_file)
 
-    def copy_manifests(self):
-        shutil.copy(self.run_manifest_zip_file, self.run_dir)
-        # TODO: unzip
+    def get_lims_step_id(self) -> str | None:
+        """If the run was started using a LIMS-generated manifest,
+        the ID of the LIMS step can be extracted from it.
+        """
+        assert self.manifest_exists(), "Run manifest not found"
+        with open(self.run_manifest_file_from_instrument) as csv_file:
+            manifest_lines = csv_file.readlines()
+        for line in manifest_lines:
+            if "lims_step_id" in line:
+                lims_step_id = line.split(",")[1]
+                return lims_step_id
+        return None
+
+    def copy_manifests(self) -> bool:
+        """Fetch the LIMS-generated run manifests from ngi-nas-ns and unzip them into a run subdir."""
+
+        # Specify dir in which LIMS drops the manifest zip files
+        dir_to_search = os.path.join(
+            self.CONFIG.get("Aviti").get(
+                "manifest_zip_location"
+            ),  # TODO: change and add to taca.yaml
+            str(datetime.now().year),  # os.path.join() needs a str, not an int
+        )
+
+        # Use LIMS step ID if available, else flowcell ID, to make a query pattern
+        if self.lims_step_id:
+            logging.info(
+                f"Using LIMS step ID '{self.lims_step_id}' to find LIMS run manifests."
+            )
+            glob_pattern = f"{dir_to_search}/*{self.lims_step_id}*.zip"
+        else:
+            logging.warning(
+                "LIMS step ID not available, using flowcell ID to find LIMS run manifests."
+            )
+            glob_pattern = f"{dir_to_search}/*{self.flowcell_id}*.zip"
+
+        # Find paths matching the pattern
+        glob_results = glob(glob_pattern)
+        if len(glob_results) == 0:
+            logger.warning(
+                f"No manifest found for run '{self.run_dir}' with pattern '{glob_pattern}'."
+            )
+            return False  # TODO determine whether to raise an error here instead
+        elif len(glob_results) > 1:
+            logger.warning(
+                f"Multiple manifests found for run '{self.run_dir}' with pattern '{glob_pattern}', using latest one."
+            )
+            glob_results.sort()
+            zip_src_path = glob_results[-1]
+        else:
+            zip_src_path = glob_results[0]
+
+        # Make a run subdir named after the zip file and extract manifests there
+        zip_name = os.path.basename(zip_src_path)
+        zip_dst_path = os.path.join(self.run_dir, zip_name)
+        os.mkdir(zip_dst_path)
+
+        with zipfile.ZipFile(zip_src_path, "r") as zip_ref:
+            zip_ref.extractall(zip_dst_path)
+
+        # Set the paths of the different manifests as attributes
+        manifests = os.listdir(zip_dst_path)
+        self.lims_full_manifest = [
+            m for m in manifests if re.match(r".*_untrimmed\.csv$", m)
+        ][0]
+        self.lims_start_manifest = [
+            m for m in manifests if re.match(r".*_trimmed\.csv$", m)
+        ][0]
+        self.lims_demux_manifests = [
+            m for m in manifests if re.match(r".*_\d+\.csv$", m)
+        ]
+
+        return True
 
     def generate_demux_command(self, run_manifest, demux_dir):
         command = [
@@ -184,6 +255,7 @@ def generate_demux_command(self, run_manifest, demux_dir):
     def start_demux(self, run_manifest, demux_dir):
        with chdir(self.run_dir):
             cmd = self.generate_demux_command(run_manifest, demux_dir)
+            # TODO handle multiple composite manifests for demux
             misc.call_external_command_detached(
                 cmd, with_log_files=True, prefix="demux_"
             )
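Note: get_lims_step_id() and copy_manifests() land here without tests (both are marked "TODO test me" in the follow-up commit). As a starting point, a minimal pytest sketch for the manifest parsing is outlined below. It assumes the class is importable as Run from taca.element.Element_Runs and that the instrument-generated RunManifest.csv carries a "lims_step_id,<value>,..." row; the real manifest layout may differ.

    # Minimal pytest sketch; the class name "Run" and the manifest row format are assumptions.
    from unittest import mock

    from taca.element.Element_Runs import Run


    def test_get_lims_step_id(tmp_path):
        # Hypothetical instrument manifest containing a lims_step_id row
        manifest = tmp_path / "RunManifest.csv"
        manifest.write_text("[RUNVALUES]\nKeyName, Value\nlims_step_id,24-123456,,\n")

        # Mock the run object to bypass __init__ (config, statusdb connection)
        run = mock.Mock()
        run.manifest_exists.return_value = True
        run.run_manifest_file_from_instrument = str(manifest)

        assert Run.get_lims_step_id(run) == "24-123456"

Mocking the run object keeps the test focused on the parsing logic. Note also that the returned value is simply whatever sits in the second CSV column, so a manifest line without trailing commas would come back with a trailing newline attached.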
From c840ea8f749513b4d0fc452ecbff4ebc548cf52c Mon Sep 17 00:00:00 2001
From: kedhammar
Date: Mon, 16 Sep 2024 16:48:43 +0200
Subject: [PATCH 2/2] add very rushed function for deriving new composite
 demux manifests

---
 taca/element/Element_Runs.py | 115 +++++++++++++++++++++++++++++++++++
 1 file changed, 115 insertions(+)

diff --git a/taca/element/Element_Runs.py b/taca/element/Element_Runs.py
index c0c2c0c2..a338d691 100644
--- a/taca/element/Element_Runs.py
+++ b/taca/element/Element_Runs.py
@@ -7,6 +7,8 @@
 from datetime import datetime
 from glob import glob
 
+import pandas as pd
+
 from taca.utils import misc
 from taca.utils.filesystem import chdir
 from taca.utils.statusdb import ElementRunsConnection
@@ -174,6 +176,9 @@ def get_lims_step_id(self) -> str | None:
         """If the run was started using a LIMS-generated manifest,
         the ID of the LIMS step can be extracted from it.
         """
+
+        # TODO test me
+
         assert self.manifest_exists(), "Run manifest not found"
         with open(self.run_manifest_file_from_instrument) as csv_file:
             manifest_lines = csv_file.readlines()
@@ -186,6 +191,8 @@ def get_lims_step_id(self) -> str | None:
     def copy_manifests(self) -> bool:
         """Fetch the LIMS-generated run manifests from ngi-nas-ns and unzip them into a run subdir."""
 
+        # TODO test me
+
         # Specify dir in which LIMS drops the manifest zip files
         dir_to_search = os.path.join(
             self.CONFIG.get("Aviti").get(
@@ -244,6 +251,114 @@ def copy_manifests(self) -> bool:
 
         return True
 
+    def make_demux_manifests(
+        self, manifest_to_split: os.PathLike, outdir: os.PathLike | None = None
+    ) -> list[os.PathLike]:
+        """Derive composite demultiplexing manifests (grouped by lane and index lengths)
+        from a single information-rich manifest.
+        """
+
+        # TODO test me
+
+        # Read specified manifest
+        with open(manifest_to_split) as f:
+            manifest_contents = f.read()
+
+        # Get '[SAMPLES]' section
+        split_contents = manifest_contents.split("[SAMPLES]")
+        assert (
+            len(split_contents) == 2
+        ), f"Could not split sample rows out of manifest {manifest_contents}"
+        sample_section = split_contents[1].split("\n")
+
+        # Split into header and rows
+        header = sample_section[0]
+        sample_rows = sample_section[1:]
+
+        # Convert to list of dicts
+        sample_dicts = []
+        for row in sample_rows:
+            row_dict = dict(zip(header.split(","), row.split(",")))
+            sample_dicts.append(row_dict)
+
+        # Convert to dataframe
+        df = pd.DataFrame.from_dict(sample_dicts)
+
+        # Separate samples from controls
+        df_samples = df[df["Project"] != "Control"].copy()
+        df_controls = df[df["Project"] == "Control"].copy()
+
+        # Apply default dir path for output
+        if outdir is None:
+            outdir = self.run_dir
+
+        ## Build composite manifests
+
+        manifest_root_name = f"{self.NGI_run_id}_demux"
+
+        # Get idx lengths for calculations
+        df_samples.loc[:, "len_idx1"] = df_samples["Index1"].apply(len)
+        df_samples.loc[:, "len_idx2"] = df_samples["Index2"].apply(len)
+
+        # Break down by index lengths and lane, creating composite manifests
+        manifests = []
+        n = 0
+        for (len_idx1, len_idx2, lane), group in df_samples.groupby(
+            ["len_idx1", "len_idx2", "Lane"]
+        ):
+            file_name = f"{manifest_root_name}_{n}.csv"
+            runValues_section = "\n".join(
+                [
+                    "[RUNVALUES]",
+                    "KeyName, Value",
+                    f'manifest_file, "{file_name}"',
+                    f"manifest_group, {n+1}/{len(df_samples.groupby(['len_idx1', 'len_idx2', 'Lane']))}",
+                    f"grouped_by, len_idx1:{len_idx1} len_idx2:{len_idx2} lane:{lane}",
+                ]
+            )
+
+            settings_section = "\n".join(
+                [
+                    "[SETTINGS]",
+                    "SettingName, Value",
+                ]
+            )
+
+            # Add PhiX stratified by index length
+            if group["phix_loaded"].any():
+                # Subset controls by lane
+                group_controls = df_controls[df_controls["Lane"] == lane].copy()
+
+                # Trim PhiX indexes to match group
+                group_controls.loc[:, "Index1"] = group_controls.loc[:, "Index1"].apply(
+                    lambda x: x[:len_idx1]
+                )
+                group_controls.loc[:, "Index2"] = group_controls.loc[:, "Index2"].apply(
+                    lambda x: x[:len_idx2]
+                )
+
+                # Add PhiX to group
+                group = pd.concat([group, group_controls], axis=0, ignore_index=True)
+
+            samples_section = (
+                f"[SAMPLES]\n{group.iloc[:, 0:6].to_csv(index=None, header=True)}"
+            )
+
+            manifest_contents = "\n\n".join(
+                [runValues_section, settings_section, samples_section]
+            )
+
+            file_path = os.path.join(outdir, file_name)
+            manifests.append((file_path, manifest_contents))
+            n += 1
+
+        for manifest_path, manifest_contents in manifests:
+            with open(manifest_path, "w") as f:
+                f.write(manifest_contents)
+
+        manifest_paths = [t[0] for t in manifests]
+        return manifest_paths
+
     def generate_demux_command(self, run_manifest, demux_dir):
         command = [
             self.CONFIG.get(self.software)[