diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 796444ad..0550f01b 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -12,7 +12,12 @@ "features": {}, "customizations": { "vscode": { - "extensions": ["ms-python.python", "eamodio.gitlens"] + "extensions": [ + "ms-python.python", + "eamodio.gitlens", + "charliermarsh.ruff", + "ms-python.mypy-type-checker" + ] } }, // Features to add to the dev container. More info: https://containers.dev/features. diff --git a/.github/workflows/lint-code.yml b/.github/workflows/lint-code.yml index 59536920..bbb74445 100644 --- a/.github/workflows/lint-code.yml +++ b/.github/workflows/lint-code.yml @@ -11,7 +11,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.11.5" - name: Install dependencies run: | python -m pip install --upgrade pip @@ -29,7 +29,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.11.5" - name: Install dependencies run: | python -m pip install --upgrade pip @@ -46,7 +46,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.11.5" - name: Install dependencies run: | python -m pip install --upgrade pip @@ -67,7 +67,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.11.5" - name: Install dependencies run: | diff --git a/.github/workflows/test-code.yml b/.github/workflows/test-code.yml index 1ff360e3..7ac1ed0d 100644 --- a/.github/workflows/test-code.yml +++ b/.github/workflows/test-code.yml @@ -12,7 +12,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.11.5" - name: Install dependencies run: | python -m pip install --upgrade pip @@ -21,8 +21,8 @@ jobs: - name: Install TACA run: pip install -e . - name: pytest - # Options are configured in pyproject.toml - run: pytest --cov=genologics --cov-report=xml + # Default options are configured in pyproject.toml + run: pytest --cov=./taca --cov-report=xml --cov-report term-missing -vv - name: CodeCov uses: codecov/codecov-action@v4 with: diff --git a/Dockerfile b/Dockerfile index db69561c..5f9e41f0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.10 AS base +FROM python:3.11.5 AS base # Update pip to latest version RUN python -m pip install --upgrade pip @@ -23,7 +23,6 @@ COPY requirements-dev.txt requirements-dev.txt RUN python -m pip install -r requirements-dev.txt RUN mkdir /root/.taca/ -COPY tests/data/taca_test_cfg.yaml /root/.taca/taca.yaml FROM base AS testing COPY . 
/taca diff --git a/VERSIONLOG.md b/VERSIONLOG.md index 80739235..bda399a1 100644 --- a/VERSIONLOG.md +++ b/VERSIONLOG.md @@ -1,4 +1,7 @@ # TACA Version Log +## 20241025.1 + +Add support for processing Element Aviti data ## 20241016.1 diff --git a/pyproject.toml b/pyproject.toml index cf0d04c8..0fc1fcb9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,3 @@ -title = "taca" - # === LINTING ================================================================ [tool.ruff] @@ -28,6 +26,7 @@ ignore = [ [tool.mypy] ignore_missing_imports = true follow_imports = 'skip' +exclude = "build" # === Testing ================================================================ @@ -37,7 +36,8 @@ filterwarnings = [ 'ignore::DeprecationWarning:couchdb.*', 'ignore::DeprecationWarning:pkg_resources.*', ] -addopts = "--cov=./taca --cov-report term-missing -vv --cache-clear tests/" +# Default addopts +addopts = "--ignore tests_old/" [tool.coverage.run] # The comment "# pragma: no cover" can be used to exclude a line from coverage diff --git a/requirements-dev.txt b/requirements-dev.txt index 0ef1b795..8126d039 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,13 +1,13 @@ -r requirements.txt - -nose +dirhash +ipdb +ipython mock -sphinx -sphinx-rtd-theme +mypy +nose +pipreqs pytest pytest-cov -ipython -ipdb ruff -mypy -pipreqs +sphinx +sphinx-rtd-theme diff --git a/setup.py b/setup.py index e278a522..da2b5026 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,7 @@ keywords="bioinformatics", author="NGI-stockholm", author_email="ngi_pipeline_operators@scilifelab.se", + python_requires=">=3.11.5", url="http://taca.readthedocs.org/en/latest/", license="MIT", packages=find_packages(exclude=["ez_setup", "examples", "tests"]), diff --git a/taca/__init__.py b/taca/__init__.py index b85b2cf5..c516d006 100644 --- a/taca/__init__.py +++ b/taca/__init__.py @@ -1,3 +1,3 @@ """Main TACA module""" -__version__ = "1.0.0" +__version__ = "1.1.0" diff --git a/taca/analysis/analysis_element.py b/taca/analysis/analysis_element.py new file mode 100755 index 00000000..9f71efb5 --- /dev/null +++ b/taca/analysis/analysis_element.py @@ -0,0 +1,163 @@ +"""Analysis methods for sequencing runs produced by Element instruments.""" + +import glob +import logging +import os + +from taca.element.Aviti_Runs import Aviti_Run +from taca.utils.config import CONFIG +from taca.utils.misc import send_mail + +logger = logging.getLogger(__name__) + + +def run_preprocessing(given_run): + """Run demultiplexing in all data directories. + + :param str given_run: Process a particular run instead of looking for runs + """ + + def _process(run): + """Process a run/flowcell and transfer to analysis server. + + :param taca.element.Run run: Run to be processed and transferred + """ + try: + run.parse_run_parameters() + except FileNotFoundError: + logger.warning( + f"Cannot reliably set NGI_run_id for {run} due to missing RunParameters.json. Aborting run processing" + ) + email_subject = f"Issues processing {run}" + email_message = ( + f"RunParameters.json missing for {run}. Processing was aborted." 
+            )
+            send_mail(email_subject, email_message, CONFIG["mail"]["recipients"])
+            raise
+
+        #### Sequencing status ####
+        sequencing_done = run.check_sequencing_status()
+        if not sequencing_done:
+            run.status = "sequencing"
+            if run.status_changed():
+                run.update_statusdb()
+            return
+
+        #### Demultiplexing status ####
+        demultiplexing_status = run.get_demultiplexing_status()
+        if demultiplexing_status == "not started":
+            lims_zip_path = run.find_lims_zip()
+            if lims_zip_path is not None:
+                os.mkdir(run.demux_dir)
+                run.copy_manifests(lims_zip_path)
+                demux_manifests = run.make_demux_manifests(
+                    manifest_to_split=run.lims_manifest
+                )
+                sub_demux_count = 0
+                for demux_manifest in sorted(demux_manifests):
+                    sub_demux_dir = os.path.join(
+                        run.run_dir, f"Demultiplexing_{sub_demux_count}"
+                    )
+                    os.mkdir(sub_demux_dir)
+                    run.start_demux(demux_manifest, sub_demux_dir)
+                    sub_demux_count += 1
+                run.status = "demultiplexing"
+                if run.status_changed():
+                    run.update_statusdb()
+                return
+            else:
+                logger.warning(
+                    f"Run manifest is missing for {run}, demultiplexing aborted"
+                )
+                email_subject = f"Issues processing {run}"
+                email_message = (
+                    f"Run manifest is missing for {run}, demultiplexing aborted"
+                )
+                send_mail(email_subject, email_message, CONFIG["mail"]["recipients"])
+                return
+        elif demultiplexing_status == "ongoing":
+            run.status = "demultiplexing"
+            if run.status_changed():
+                run.update_statusdb()
+            return
+
+        elif demultiplexing_status != "finished":
+            logger.warning(
+                f"Unknown demultiplexing status {demultiplexing_status} of run {run}. Please investigate."
+            )
+            email_subject = f"Issues processing {run}"
+            email_message = f"Unknown demultiplexing status {demultiplexing_status} of run {run}. Please investigate."
+            send_mail(email_subject, email_message, CONFIG["mail"]["recipients"])
+            return
+
+        #### Transfer status ####
+        transfer_status = run.get_transfer_status()
+        if transfer_status == "not started":
+            demux_results_dirs = glob.glob(
+                os.path.join(run.run_dir, "Demultiplexing_*")
+            )
+            run.aggregate_demux_results(demux_results_dirs)
+            run.sync_metadata()
+            run.make_transfer_indicator()
+            run.status = "transferring"
+            if run.status_changed():
+                run.update_statusdb()
+                # TODO: Also update statusdb with a timestamp of when the transfer started
+            run.transfer()
+            return
+        elif transfer_status == "ongoing":
+            run.status = "transferring"
+            if run.status_changed():
+                run.update_statusdb()
+            logger.info(
+                f"{run} is being transferred. Skipping."
+            )  # TODO: fix formatting, currently prints "ElementRun(20240910_AV242106_B2403418431) is being transferred"
+            return
+        elif transfer_status == "rsync done":
+            if run.rsync_successful():
+                run.remove_transfer_indicator()
+                run.update_transfer_log()
+                run.status = "transferred"
+                if run.status_changed():
+                    run.update_statusdb()
+                run.move_to_nosync()
+                run.status = "processed"
+
+                if run.status_changed():
+                    run.update_statusdb()
+            else:
+                run.status = "transfer failed"
+                logger.warning(
+                    f"An issue occurred while transferring {run} to the analysis cluster."
+                )
+                email_subject = f"Issues processing {run}"
+                email_message = f"An issue occurred while transferring {run} to the analysis cluster."
+                send_mail(email_subject, email_message, CONFIG["mail"]["recipients"])
+                return
+        else:
+            logger.warning(
+                f"Unknown transfer status {transfer_status} of run {run}, please investigate."
+            )
+            email_subject = f"Issues processing {run}"
+            email_message = f"Unknown transfer status {transfer_status} of run {run}, please investigate."
+            send_mail(email_subject, email_message, CONFIG["mail"]["recipients"])
+            return
+
+    if given_run:
+        run = Aviti_Run(given_run, CONFIG)
+        _process(run)
+    else:
+        data_dirs = CONFIG.get("element_analysis").get("data_dirs")
+        for data_dir in data_dirs:
+            # Run folder looks like DATE_*_*, the last section is the FC side (A/B) and name
+            runs = glob.glob(os.path.join(data_dir, "[1-9]*_*_*"))
+            for run in runs:
+                runObj = Aviti_Run(run, CONFIG)
+                try:
+                    _process(runObj)
+                except Exception:
+                    # This function might throw an exception,
+                    # it is better to continue processing other runs
+                    logger.warning(f"There was an error processing the run {run}")
+                    # TODO: Think about how to avoid silent errors (email?)
+                    pass
diff --git a/taca/analysis/cli.py b/taca/analysis/cli.py
index 342a8b1c..131e78e0 100644
--- a/taca/analysis/cli.py
+++ b/taca/analysis/cli.py
@@ -3,7 +3,7 @@
 import click
 
 from taca.analysis import analysis as an
-from taca.analysis import analysis_nanopore
+from taca.analysis import analysis_element, analysis_nanopore
 
 
 @click.group()
@@ -72,6 +72,29 @@ def updatedb(rundir, software):
     an.upload_to_statusdb(rundir, software)
 
 
+# Element analysis subcommands
+
+
+@analysis.command()
+@click.option(
+    "-r",
+    "--run",
+    type=click.Path(exists=True),
+    default=None,
+    help="Demultiplex only a particular run",
+)
+def demultiplex_element(run):
+    """Demultiplex and transfer all runs present in the data directories."""
+    analysis_element.run_preprocessing(run)
+
+
+@analysis.command()
+@click.argument("run")
+def element_updatedb(run):
+    """Save the run to statusdb."""
+    analysis_element.upload_to_statusdb(run)
+
+
 # Nanopore analysis subcommands
diff --git a/taca/element/Aviti_Runs.py b/taca/element/Aviti_Runs.py
new file mode 100644
index 00000000..63b01bf7
--- /dev/null
+++ b/taca/element/Aviti_Runs.py
@@ -0,0 +1,8 @@
+from taca.element.Element_Runs import Run
+
+
+class Aviti_Run(Run):
+    def __init__(self, run_dir, configuration):
+        self.sequencer_type = "Aviti"
+        self.demux_dir = "Demultiplexing"
+        super().__init__(run_dir, configuration)
diff --git a/taca/element/Element_Runs.py b/taca/element/Element_Runs.py
new file mode 100644
index 00000000..3fa697a0
--- /dev/null
+++ b/taca/element/Element_Runs.py
@@ -0,0 +1,1302 @@
+import csv
+import glob
+import json
+import logging
+import os
+import re
+import shutil
+import subprocess
+import zipfile
+from datetime import datetime
+from pathlib import Path
+
+import pandas as pd
+
+from taca.utils.filesystem import chdir
+from taca.utils.statusdb import ElementRunsConnection
+
+logger = logging.getLogger(__name__)
+
+
+def get_mask(
+    seq: str,
+    keep_Ns: bool,
+    prefix: str,
+    cycles_used: int,
+) -> str:
+    """
+    Inputs:
+        seq             Sequence string to make mask from
+        keep_Ns         Whether Ns should be "Y" or "N" in the mask, vice versa for ACGT
+        prefix          Prefix to add to the mask
+        cycles_used     Number of cycles used in the sequencing run
+
+    Example usage:
+        get_mask( "ACGTNNN", True,  "I1:", 7 )  -> 'I1:N4Y3'
+        get_mask( "ACGTNNN", False, "I2:", 10 ) -> 'I2:Y4N6'
+    """
+
+    # Input assertions
+    if seq != "":
+        assert re.match(r"^[ACGTN]+$", seq), f"Index '{seq}' has non-ACGTN characters"
+    assert prefix in [
+        "R1:",
+        "R2:",
+        "I1:",
+        "I2:",
+    ], f"Mask prefix {prefix} not recognized"
+
+    # Handle no-input cases
+    if seq == "":
+        mask = f"{prefix}N{cycles_used}"
+        return mask
+
+    # Define dict to convert base to mask classifier
+    base2mask = (
+        {
+            "N": "N",
+            "A": "Y",
+            "C": "Y",
+            "G": "Y",
+            "T": "Y",
+        }
+        if keep_Ns is False
+        else {
+            "N": "Y",
+            "A": "N",
+
"C": "N", + "G": "N", + "T": "N", + } + ) + + # Instantiate the mask string + mask = "" + # Add the prefix + mask += prefix + # Loop through the input sequence and dynamically add mask groups + current_group = "" + current_group_len = 0 + mask_len = 0 + for base in seq: + mask_len += 1 + if base2mask[base] == current_group: + current_group_len += 1 + else: + mask += ( + f"{current_group}{current_group_len}" if current_group_len > 0 else "" + ) + current_group = base2mask[base] + current_group_len = 1 + # For the last mask group, check if we need to pad with Ns to match the number of cycles used + if cycles_used > mask_len: + diff = cycles_used - mask_len + if current_group == "N": + current_group_len += diff + mask += f"{current_group}{current_group_len}" + else: + mask += f"{current_group}{current_group_len}" + mask += f"N{diff}" + else: + mask += f"{current_group}{current_group_len}" + + # Parse mask string to check that it matches the number of cycles used + assert ( + sum( + [ + int(n) + for n in mask[3:] + .replace("N", "-") + .replace("Y", "-") + .strip("-") + .split("-") + ] + ) + == cycles_used + ), f"Length of mask '{mask}' does not match number of cycles used '{cycles_used}'." + + return mask + + +class Run: + """Defines an Element run""" + + def __init__(self, run_dir, configuration): + if not hasattr(self, "sequencer_type"): + # Mostly for testing, since this class is not meant to be instantiated + self.sequencer_type = "GenericElement" + + if not os.path.exists(run_dir): + raise RuntimeError(f"Could not locate run directory {run_dir}") + self.run_parameters_parsed = False + + self.run_dir = os.path.abspath(run_dir) + self.CONFIG = configuration + + self.demux_dir = os.path.join(self.run_dir, "Demultiplexing") + self.final_sequencing_file = os.path.join(self.run_dir, "RunUploaded.json") + self.demux_stats_file = ( + "*RunStats.json" # Assumes demux is finished when this file is created + ) + self.transfer_file = ( + self.CONFIG.get("element_analysis") + .get("Element", {}) + .get(self.sequencer_type, {}) + .get("transfer_log") + ) + self.rsync_exit_file = os.path.join(self.run_dir, ".rsync_exit_status") + + # Instrument generated files + self.run_parameters_file = os.path.join(self.run_dir, "RunParameters.json") + self.run_stats_file = os.path.join(self.run_dir, "AvitiRunStats.json") + self.run_manifest_file_from_instrument = os.path.join( + self.run_dir, "RunManifest.json" + ) + self.run_uploaded_file = os.path.join(self.run_dir, "RunUploaded.json") + + self.db = ElementRunsConnection( + self.CONFIG.get("statusdb", {}), dbname="element_runs" + ) + + # Fields to be set by TACA + self.status = None + self.lims_step_id = None + self.lims_full_manifest = None + self.lims_start_manifest = None + self.lims_demux_manifests = None + + # Fields that will be set when parsing run parameters + self.run_name = None + self.run_id = None + self.side = None + self.side_letter = None + self.run_type = None + self.flowcell_id = None + self.instrument_name = None + self.date = None + self.operator_name = None + + def __str__(self) -> str: + if self.run_parameters_parsed: + return f"ElementRun({self.NGI_run_id})" + else: + return f"ElementRun({self.run_dir})" + + @property + def NGI_run_id(self): + if self.run_parameters_parsed: + return f"{self.date}_{self.instrument_name}_{self.side_letter}{self.flowcell_id}" + else: + raise RuntimeError(f"Run parameters not parsed for run {self.run_dir}") + + def parse_run_parameters(self) -> None: + """Parse run-information from the RunParameters.json file""" 
+ try: + with open(self.run_parameters_file) as json_file: + run_parameters = json.load(json_file) + except FileNotFoundError: + logger.warning( + f"Run parameters file not found for {self}, might not be ready yet" + ) + raise + + # Manually entered, but should be side and flowcell id + self.run_name = run_parameters.get("RunName") + + self.run_id = run_parameters.get( + "RunID" + ) # Unique hash that we don't really use + self.side = run_parameters.get("Side") # SideA or SideB + self.side_letter = self.side[ + -1 + ] # A or B TODO: compare side letter with manually entered letter in run name + self.run_type = run_parameters.get( + "RunType" + ) # Sequencing, wash or prime I believe? + self.flowcell_id = run_parameters.get("FlowcellID") + self.cycles = run_parameters.get("Cycles", {"R1": 0, "R2": 0, "I1": 0, "I2": 0}) + self.instrument_name = run_parameters.get("InstrumentName") + self.date = run_parameters.get("Date")[0:10].replace("-", "") + self.year = self.date[0:4] + self.operator_name = run_parameters.get("OperatorName") + self.run_parameters_parsed = True + + def to_doc_obj(self): + # TODO: are we sure what we should do when the RunParameters.json file is missing? + + # Read in all instrument generated files + instrument_generated_files = {} + for file in [ + self.run_parameters_file, + self.run_stats_file, + self.run_manifest_file_from_instrument, + self.run_uploaded_file, + ]: + if os.path.exists(file): + with open(file) as json_file: + instrument_generated_files[os.path.basename(file)] = json.load( + json_file + ) + else: + instrument_generated_files[os.path.basename(file)] = None + # Aggregated demux stats files + index_assignement_file = os.path.join( + self.run_dir, "Demultiplexing", "IndexAssignment.csv" + ) + if os.path.exists(index_assignement_file): + with open(index_assignement_file) as index_file: + reader = csv.DictReader(index_file) + index_assignments = [row for row in reader] + else: + index_assignments = None + + unassigned_sequences_file = os.path.join( + self.run_dir, "Demultiplexing", "UnassignedSequences.csv" + ) + if os.path.exists(unassigned_sequences_file): + with open(unassigned_sequences_file) as unassigned_file: + reader = csv.DictReader(unassigned_file) + unassigned_sequences = [row for row in reader] + else: + unassigned_sequences = None + + demultiplex_stats = { + "Demultiplex_Stats": { + "Index_Assignment": index_assignments, + "Unassigned_Sequences": unassigned_sequences, + } + } + + demux_command_file = os.path.join(self.run_dir, ".bases2fastq_command") + if os.path.exists(demux_command_file): + with open(demux_command_file) as command_file: + demux_commands = command_file.readlines() + else: + demux_commands = None + demux_version_file = os.path.join( + self.run_dir, "Demultiplexing_0", "RunStats.json" + ) + if os.path.exists(demux_version_file): + with open(demux_version_file) as json_file: + demux_info = json.load(json_file) + demux_version = demux_info.get("AnalysisVersion") + else: + demux_version = None + + software_info = { + "Version": demux_version, + "bin": self.CONFIG.get("element_analysis").get("bases2fastq"), + "options": demux_commands, + } + + doc_obj = { + "name": self.NGI_run_id, + "run_path": self.run_dir, + "run_status": self.status, + "NGI_run_id": self.NGI_run_id, + "instrument_generated_files": instrument_generated_files, + "Element": demultiplex_stats, + "Software": software_info, + } + + return doc_obj + + def check_sequencing_status(self): + if os.path.exists(self.final_sequencing_file): + with 
open(self.final_sequencing_file) as json_file: + sequencing_outcome = json.load(json_file).get("outcome") + if sequencing_outcome != "OutcomeCompleted": + return False + else: + return True + else: + return False + + def get_demultiplexing_status(self): + if not os.path.exists(self.demux_dir): + return "not started" + sub_demux_dirs = glob.glob(os.path.join(self.run_dir, "Demultiplexing_*")) + finished_count = 0 + for demux_dir in sub_demux_dirs: + found_demux_stats_file = glob.glob( + os.path.join(demux_dir, self.demux_stats_file) + ) + if not found_demux_stats_file: + return "ongoing" + elif found_demux_stats_file: + finished_count += ( + 1 # TODO: check exit status of demux in exit status file + ) + if finished_count == len(sub_demux_dirs): + return "finished" + else: + return "unknown" + + def status_changed(self): + if not self.run_parameters_parsed: + raise RuntimeError( + f"Run parameters not parsed for run {self.run_dir}, cannot check status" + ) + db_run_status = self.db.check_db_run_status(self.NGI_run_id) + return db_run_status != self.status + + def update_statusdb(self): + doc_obj = self.to_doc_obj() + self.db.upload_to_statusdb(doc_obj) + + def get_lims_step_id(self) -> str | None: + """If the run was started using a LIMS-generated manifest, + the ID of the LIMS step can be extracted from it. + """ + + with open(self.run_manifest_file_from_instrument) as json_file: + manifest_json = json.load(json_file) + + lims_step_id = manifest_json.get("RunValues").get("lims_step_id") + + return lims_step_id + + def find_lims_zip(self) -> str | None: + # Specify dir in which LIMS drop the manifest zip files + dir_to_search = os.path.join( + self.CONFIG.get("element_analysis") + .get("Element", {}) + .get(self.sequencer_type, {}) + .get("manifest_zip_location"), + str(self.year), + ) + + # Use LIMS step ID if available, else flowcell ID, to make a query pattern + self.lims_step_id = self.get_lims_step_id() + if self.lims_step_id is not None: + logging.info( + f"Using LIMS step ID '{self.lims_step_id}' to find LIMS run manifests." + ) + glob_pattern = f"{dir_to_search}/*{self.lims_step_id}*.zip" + else: + logging.warning( + "LIMS step ID not available, using flowcell ID to find LIMS run manifests." + ) + glob_pattern = f"{dir_to_search}/*{self.flowcell_id}*.zip" + + # Find paths matching the pattern + glob_results = glob.glob(glob_pattern) + if len(glob_results) == 0: + logger.warning( + f"No manifest found for run '{self.run_dir}' with pattern '{glob_pattern}'." + ) + return None + elif len(glob_results) > 1: + logger.warning( + f"Multiple manifests found for run '{self.run_dir}' with pattern '{glob_pattern}', using latest one." 
+ ) + glob_results.sort() # TODO: add CLI option to specify manifest for re-demux + lims_zip_src_path = glob_results[-1] + else: + lims_zip_src_path = glob_results[0] + return lims_zip_src_path + + def copy_manifests(self, zip_src_path): + """Fetch the LIMS-generated run manifests from ngi-nas-ns and unzip them into a run subdir.""" + # Make a run subdir named after the zip file and extract manifests there + + # Extract the contents of the zip file into the destination directory + unzipped_manifests = [] + with zipfile.ZipFile(zip_src_path, "r") as zip_ref: + for member in zip_ref.namelist(): + # Extract each file individually into the destination directory + filename = os.path.basename(member) + if filename: # Skip directories + source = zip_ref.open(member) + target = open(os.path.join(self.run_dir, filename), "wb") + unzipped_manifests.append(target.name) + with source, target: + target.write(source.read()) + + # Pick out the manifest to use + self.lims_manifest = [ + m for m in unzipped_manifests if re.match(r".*_untrimmed\.csv$", m) + ][0] + + def make_demux_manifests( + self, manifest_to_split: os.PathLike, outdir: os.PathLike | None = None + ) -> list[str]: + """Derive composite demultiplexing manifests + from a single information-rich manifest. + """ + + # Read specified manifest + with open(manifest_to_split) as f: + manifest_contents = f.read() + + # Get '[SAMPLES]' section + split_contents = manifest_contents.split("[SAMPLES]") + assert ( + len(split_contents) == 2 + ), f"Could not split sample rows out of manifest {manifest_contents}" + sample_section = split_contents[1].strip().split("\n") + + # Split into header and rows + header = sample_section[0] + sample_rows = sample_section[1:] + + # Convert to list of dicts + sample_dicts = [] + for row in sample_rows: + row_dict = dict(zip(header.split(","), row.split(","))) + sample_dicts.append(row_dict) + + # Convert to dataframe + df = pd.DataFrame.from_dict(sample_dicts) + + # Separate samples from controls + df_samples = df[df["Project"] != "Control"].copy() + df_controls = df[df["Project"] == "Control"].copy() + + # Add masks + df_samples["I1Mask"] = df_samples["Index1"].apply( + lambda seq: get_mask( + seq=seq, + keep_Ns=False, + prefix="I1:", + cycles_used=self.cycles["I1"], + ) + ) + df_samples["I2Mask"] = df_samples["Index2"].apply( + lambda seq: get_mask( + seq=seq, + keep_Ns=False, + prefix="I2:", + cycles_used=self.cycles["I2"], + ) + ) + df_samples["I1UmiMask"] = df_samples["Index1"].apply( + lambda seq: get_mask( + seq=seq, + keep_Ns=True, + prefix="I1:", + cycles_used=self.cycles["I1"], + ) + ) + df_samples["I2UmiMask"] = df_samples["Index2"].apply( + lambda seq: get_mask( + seq=seq, + keep_Ns=True, + prefix="I2:", + cycles_used=self.cycles["I2"], + ) + ) + df_samples["R1Mask"] = df_samples["Recipe"].apply( + lambda recipe: get_mask( + seq="N" * int(recipe.split("-")[0]), + keep_Ns=True, + prefix="R1:", + cycles_used=self.cycles["R1"], + ) + ) + df_samples["R2Mask"] = df_samples["Recipe"].apply( + lambda recipe: get_mask( + seq="N" * int(recipe.split("-")[3]), + keep_Ns=True, + prefix="R2:", + cycles_used=self.cycles["R2"], + ) + ) + + # Re-make Index2 column without any Ns + df_samples["Index2_with_Ns"] = df_samples["Index2"] + df_samples.loc[:, "Index2"] = df_samples["Index2"].apply( + lambda x: x.replace("N", "") + ) + + # Apply default dir path for output + if outdir is None: + outdir = self.run_dir + + # Break down into groups by non-consolable properties + grouped_df = df_samples.groupby( + [ + "I1Mask", + 
"I2Mask", + "I1UmiMask", + "I2UmiMask", + "R1Mask", + "R2Mask", + "settings", + ] + ) + + # Sanity check + if sum([len(group) for _, group in grouped_df]) < len(df_samples): + msg = "Some samples were not included in any submanifest." + logging.error(msg) + raise AssertionError(msg) + elif sum([len(group) for _, group in grouped_df]) > len(df_samples): + logging.warning("Some samples were included in multiple submanifests.") + + # Iterate over groups to build composite manifests + manifest_root_name = f"{self.NGI_run_id}_demux" + manifests = [] + n = 0 + for ( + I1Mask, + I2Mask, + I1UmiMask, + I2UmiMask, + R1Mask, + R2Mask, + settings, + ), group in grouped_df: + file_name = f"{manifest_root_name}_{n}.csv" + + runValues_section = "\n".join( + [ + "[RUNVALUES]", + "KeyName, Value", + f"manifest_file, {file_name}", + f"manifest_group, {n+1}/{len(grouped_df)}", + f"built_from, {manifest_to_split}", + ] + ) + + # Instantiate settings + settings_kvs = { + "R1FastqMask": R1Mask, + "I1Mask": I1Mask, + "I2Mask": I2Mask, + "R2FastqMask": R2Mask, + } + + # Add UMI settings + if "Y" in I1UmiMask and "Y" not in I2UmiMask: + settings_kvs["UmiMask"] = I1UmiMask + settings_kvs["UmiFastQ"] = "TRUE" + elif "Y" in I2UmiMask and "Y" not in I1UmiMask: + settings_kvs["UmiMask"] = I2UmiMask + settings_kvs["UmiFastQ"] = "TRUE" + elif "Y" not in I1UmiMask and "Y" not in I2UmiMask: + pass + else: + raise AssertionError("Both I1 and I2 appear to contain UMIs.") + + # Unpack settings from LIMS manifest + if settings: + for kv in settings.split(" "): + k, v = kv.split(":") + settings_kvs[k] = v + + settings_section = "\n".join( + [ + "[SETTINGS]", + "SettingName, Value", + ] + + [f"{k}, {v}" for k, v in settings_kvs.items()] + ) + + # Add PhiX stratified by index length + group_controls = df_controls[ + df_controls["Lane"].isin(group["Lane"].unique()) + ].copy() + + # Trim PhiX indexes to match group + i1_len = group["Index1"].apply(len).max() + group_controls.loc[:, "Index1"] = group_controls.loc[:, "Index1"].apply( + lambda x: x[:i1_len] + ) + i2_len = group["Index2"].apply(len).max() + group_controls.loc[:, "Index2"] = group_controls.loc[:, "Index2"].apply( + lambda x: x[:i2_len] + ) + + # Add PhiX to group + group = pd.concat([group, group_controls], axis=0, ignore_index=True) + + samples_section = ( + f"[SAMPLES]\n{group.iloc[:, 0:6].to_csv(index=None, header=True)}" + ) + + manifest_contents = "\n\n".join( + [runValues_section, settings_section, samples_section] + ) + + file_path = os.path.join(outdir, file_name) + manifests.append((file_path, manifest_contents)) + n += 1 + + for manifest_path, manifest_contents in manifests: + with open(os.path.join(outdir, manifest_path), "w") as f: + f.write(manifest_contents) + + manifest_paths = [t[0] for t in manifests] + return manifest_paths + + def generate_demux_command(self, run_manifest, demux_dir): + command = ( + f"{self.CONFIG.get('element_analysis').get('bases2fastq')}" + + f" {self.run_dir}" + + f" {demux_dir}" + + " -p 8" + + " --num-unassigned 500" + + f" -r {run_manifest}" + + " --legacy-fastq" + + " --force-index-orientation" + ) + with open( + os.path.join(self.run_dir, ".bases2fastq_command"), "a" + ) as command_file: + command_file.write(command) + return command + + def start_demux(self, run_manifest, demux_dir): + with chdir(self.run_dir): + cmd = self.generate_demux_command(run_manifest, demux_dir) + stderr_abspath = f"{self.run_dir}/bases2fastq_stderr.txt" # TODO: individual files for each sub-demux + try: + with open(stderr_abspath, "w") as stderr: 
+                    process = subprocess.Popen(
+                        cmd,
+                        shell=True,
+                        cwd=self.run_dir,
+                        stderr=stderr,
+                    )
+                logger.info(
+                    "Bases2Fastq conversion and demultiplexing "
+                    f"started for run {self} on {datetime.now()} "
+                    f"with p_handle {process}"
+                )
+            except subprocess.CalledProcessError:
+                logger.warning(
+                    "An error occurred while starting demultiplexing for "
+                    f"{self} on {datetime.now()}."
+                )
+            return
+
+    def get_transfer_status(self):
+        if (
+            not self.in_transfer_log()
+            and not self.transfer_ongoing()
+            and not self.rsync_complete()
+        ):
+            return "not started"
+        elif self.transfer_ongoing() and not self.rsync_complete():
+            return "ongoing"
+        elif self.rsync_complete() and not self.in_transfer_log():
+            return "rsync done"
+        elif self.in_transfer_log():
+            return "unknown"
+
+    def in_transfer_log(self):
+        with open(self.transfer_file) as transfer_file:
+            for row in transfer_file:
+                if self.NGI_run_id in row:
+                    return True
+        return False
+
+    def transfer_ongoing(self):
+        return os.path.isfile(os.path.join(self.run_dir, ".rsync_ongoing"))
+
+    def rsync_complete(self):
+        return os.path.isfile(self.rsync_exit_file)
+
+    def rsync_successful(self):
+        with open(os.path.join(self.run_dir, ".rsync_exit_status")) as rsync_exit_file:
+            rsync_exit_status = rsync_exit_file.readlines()
+        if rsync_exit_status[0].strip() == "0":
+            return True
+        else:
+            return False
+
+    # Clear all content under a dir
+    def clear_dir(self, dir):
+        for filename in os.listdir(dir):
+            file_path = os.path.join(dir, filename)
+            try:
+                if os.path.isfile(file_path) or os.path.islink(file_path):
+                    os.unlink(file_path)
+                elif os.path.isdir(file_path):
+                    shutil.rmtree(file_path)
+            except Exception as e:
+                logger.warning(f"Failed to delete {file_path}. Reason: {e}")
+
+    # Write to csv
+    def write_to_csv(self, data, filename):
+        # Get the fieldnames from the keys of the first dictionary
+        fieldnames = data[0].keys()
+        # Open the file and write the CSV
+        with open(filename, mode="w", newline="") as file:
+            writer = csv.DictWriter(file, fieldnames=fieldnames)
+            # Write the header (fieldnames)
+            writer.writeheader()
+            # Write the data (rows)
+            writer.writerows(data)
+
+    # Collect demux info into a list of dictionaries
+    # Structure: [{'sub_demux_count':XXX, 'SampleName':XXX, 'Index1':XXX, 'Index2':XXX, 'Lane':XXX, 'Project':XXX, 'Recipe':XXX}]
+    def collect_demux_runmanifest(self, demux_results_dirs):
+        demux_runmanifest = []
+        for demux_dir in demux_results_dirs:
+            sub_demux_count = os.path.basename(demux_dir).split("_")[1]
+            with open(os.path.join(self.run_dir, demux_dir, "RunManifest.csv")) as file:
+                lines = file.readlines()
+            sample_section = False
+            headers = []
+            # Loop through each line
+            for line in lines:
+                # Check if we reached the "[SAMPLES]" section
+                if "[SAMPLES]" in line:
+                    sample_section = True
+                    continue
+                # Exit the sample section if another section is encountered
+                if sample_section and line.startswith("["):
+                    break
+                # If in the sample section, process the sample lines
+                if sample_section:
+                    # Clean up the line
+                    line = line.strip()
+                    # Skip empty lines
+                    if not line:
+                        continue
+                    # Get the headers from the first line
+                    if not headers:
+                        headers = line.split(",")
+                    else:
+                        # Parse sample data
+                        values = line.split(",")
+                        sample_dict = dict(zip(headers, values))
+                        sample_dict["sub_demux_count"] = sub_demux_count
+                        demux_runmanifest.append(sample_dict)
+        sorted_demux_runmanifest = sorted(
+            demux_runmanifest,
+            key=lambda x: (x["Lane"], x["SampleName"], x["sub_demux_count"]),
+        )
+        return sorted_demux_runmanifest
+
+    # Aggregate
the output FastQ files of samples from multiple demux + def aggregate_sample_fastq(self, demux_runmanifest): + lanes = sorted(list(set(sample["Lane"] for sample in demux_runmanifest))) + for lane in lanes: + unique_sample_demux = set() + sample_count = 1 + for sample in demux_runmanifest: + lanenr = sample["Lane"] + project = sample["Project"] + sample_name = sample["SampleName"] + sub_demux_count = sample["sub_demux_count"] + # Skip PhiX + if lanenr == lane and sample_name != "PhiX": + sample_tuple = (sample_name, sub_demux_count) + if sample_tuple not in unique_sample_demux: + project_dest = os.path.join( + self.run_dir, self.demux_dir, project + ) + sample_dest = os.path.join( + self.run_dir, + self.demux_dir, + project, + f"Sample_{sample_name}", + ) + if not os.path.exists(project_dest): + os.makedirs(project_dest) + if not os.path.exists(sample_dest): + os.makedirs(sample_dest) + fastqfiles = glob.glob( + os.path.join( + self.run_dir, + f"Demultiplexing_{sub_demux_count}", + "Samples", + project, + sample_name, + f"*L00{lane}*.fastq.gz", + ) + ) + for fastqfile in fastqfiles: + old_name = os.path.basename(fastqfile) + read_label = re.search( + rf"L00{lane}_(.*?)_001", old_name + ).group(1) + new_name = "_".join( + [ + sample_name, + f"S{sample_count}", + f"L00{lane}", + read_label, + "001.fastq.gz", + ] + ) + os.symlink(fastqfile, os.path.join(sample_dest, new_name)) + unique_sample_demux.add(sample_tuple) + sample_count += 1 + + # Symlink the output FastQ files of undet only if a lane does not have multiple demux + def aggregate_undet_fastq(self, demux_runmanifest): + lanes = sorted(list(set(sample["Lane"] for sample in demux_runmanifest))) + for lane in lanes: + sub_demux = list( + set( + sample["sub_demux_count"] + for sample in demux_runmanifest + if sample["Lane"] == lane + ) + ) + if len(sub_demux) == 1: + project_dest = os.path.join( + self.run_dir, self.demux_dir, "Undetermined" + ) + if not os.path.exists(project_dest): + os.makedirs(project_dest) + fastqfiles = glob.glob( + os.path.join( + self.run_dir, + f"Demultiplexing_{sub_demux[0]}", + "Samples", + "Undetermined", + f"*L00{lane}*.fastq.gz", + ) + ) + for fastqfile in fastqfiles: + base_name = os.path.basename( + fastqfile + ) # TODO: Make symlinks relative instead of absolute to maintain them after archiving + os.symlink(fastqfile, os.path.join(project_dest, base_name)) + + # Read in each Project_RunStats.json to fetch PercentMismatch, PercentQ30, PercentQ40 and QualityScoreMean + # Note that Element promised that they would include these stats into IndexAssignment.csv + # But for now we have to do this by ourselves in this hard way + def get_project_runstats(self, sub_demux, demux_runmanifest): + project_runstats = [] + project_list = sorted( + list( + set( + sample["Project"] + for sample in demux_runmanifest + if sample["sub_demux_count"] == sub_demux + ) + ) + ) + for project in project_list: + project_runstats_json_path = os.path.join( + self.run_dir, + f"Demultiplexing_{sub_demux}", + "Samples", + project, + f"{project}_RunStats.json", + ) + if os.path.exists(project_runstats_json_path): + with open(project_runstats_json_path) as stats_json: + project_runstats_json = json.load(stats_json) + for sample in project_runstats_json["SampleStats"]: + sample_name = sample["SampleName"] + for occurrence in sample["Occurrences"]: + lane = occurrence["Lane"] + expected_sequence = occurrence["ExpectedSequence"] + percentage_mismatch = occurrence["PercentMismatch"] + percentage_q30 = occurrence["PercentQ30"] + percentage_q40 
= occurrence["PercentQ40"] + quality_score_mean = occurrence["QualityScoreMean"] + project_runstats.append( + { + "SampleName": sample_name, + "Lane": str(lane), + "ExpectedSequence": expected_sequence, + "PercentMismatch": percentage_mismatch, + "PercentQ30": percentage_q30, + "PercentQ40": percentage_q40, + "QualityScoreMean": quality_score_mean, + } + ) + else: + continue + return project_runstats + + # Aggregate stats in IndexAssignment.csv + def aggregate_stats_assigned(self, demux_runmanifest): + aggregated_assigned_indexes = [] + sub_demux_list = sorted( + list(set(sample["sub_demux_count"] for sample in demux_runmanifest)) + ) + lanes = sorted(list(set(sample["Lane"] for sample in demux_runmanifest))) + for sub_demux in sub_demux_list: + # Read in each Project_RunStats.json to fetch PercentMismatch, PercentQ30, PercentQ40 and QualityScoreMean + # Note that Element promised that they would include these stats into IndexAssignment.csv + # But for now we have to do this by ourselves in this hard way + project_runstats = self.get_project_runstats(sub_demux, demux_runmanifest) + # Read in IndexAssignment.csv + assigned_csv = os.path.join( + self.run_dir, f"Demultiplexing_{sub_demux}", "IndexAssignment.csv" + ) + if os.path.exists(assigned_csv): + with open(assigned_csv) as assigned_file: + reader = csv.DictReader(assigned_file) + index_assignment = [row for row in reader] + for sample in index_assignment: + if sample["Lane"] in lanes: + project_runstats_sample = [ + d + for d in project_runstats + if d["SampleName"] == sample["SampleName"] + and d["Lane"] == sample["Lane"] + and d["ExpectedSequence"] == sample["I1"] + sample["I2"] + ] + sample["sub_demux_count"] = sub_demux + sample["PercentMismatch"] = project_runstats_sample[0][ + "PercentMismatch" + ] + sample["PercentQ30"] = project_runstats_sample[0]["PercentQ30"] + sample["PercentQ40"] = project_runstats_sample[0]["PercentQ40"] + sample["QualityScoreMean"] = project_runstats_sample[0][ + "QualityScoreMean" + ] + aggregated_assigned_indexes.append(sample) + else: + logger.warning( + f"No {os.path.basename(assigned_csv)} file found for sub-demultiplexing {sub_demux}." + ) + # Remove redundant rows for PhiX + aggregated_assigned_indexes_filtered = [] + phix_filtered = [] + for sample in aggregated_assigned_indexes: + # Add project name + sample["Project"] = [ + d for d in demux_runmanifest if d["SampleName"] == sample["SampleName"] + ][0]["Project"] + # Get the PhiX with the longest index combination. 
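+            # PhiX is spiked into every sub-demux, so the same control can
+            # appear several times with indexes truncated to different lengths;
+            # keep only the record with the longest I1+I2 combination per lane,
+            # breaking ties by the higher NumPoloniesAssigned, as done below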
+            if sample["SampleName"] == "PhiX":
+                lane = sample["Lane"]
+                idx1 = sample["I1"]
+                idx2 = sample["I2"]
+                num_polonies_assigned = sample["NumPoloniesAssigned"]
+                if not phix_filtered:
+                    phix_filtered.append(sample)
+                else:
+                    found_flag = False
+                    for phix_record in phix_filtered:
+                        if lane == phix_record["Lane"]:
+                            idx1_shorter_len = min(len(idx1), len(phix_record["I1"]))
+                            idx2_shorter_len = min(len(idx2), len(phix_record["I2"]))
+                            if (
+                                idx1[:idx1_shorter_len]
+                                == phix_record["I1"][:idx1_shorter_len]
+                                and idx2[:idx2_shorter_len]
+                                == phix_record["I2"][:idx2_shorter_len]
+                            ):
+                                found_flag = True
+                                # When the new record has a longer index combination length, take the new record and remove the old one
+                                # When the index combination lengths happen to be the same, keep the one with more polonies assigned
+                                if len(idx1) + len(idx2) > len(phix_record["I1"]) + len(
+                                    phix_record["I2"]
+                                ) or (
+                                    len(idx1) + len(idx2)
+                                    == len(phix_record["I1"]) + len(phix_record["I2"])
+                                    and num_polonies_assigned
+                                    >= phix_record["NumPoloniesAssigned"]
+                                ):
+                                    phix_filtered.remove(phix_record)
+                                    phix_filtered.append(sample)
+                    if not found_flag:
+                        phix_filtered.append(sample)
+            else:
+                aggregated_assigned_indexes_filtered.append(sample)
+        # Combine the list of samples and PhiX
+        aggregated_assigned_indexes_filtered += phix_filtered
+        # Sort the list by Lane, SampleName and sub_demux_count
+        aggregated_assigned_indexes_filtered_sorted = sorted(
+            aggregated_assigned_indexes_filtered,
+            key=lambda x: (x["Lane"], x["SampleName"], x["sub_demux_count"]),
+        )
+        # Fix new sample number based on SampleName and Lane
+        sample_count = 0
+        previous_samplename_lane = ("NA", "NA")
+        for sample in aggregated_assigned_indexes_filtered_sorted:
+            if (sample["SampleName"], sample["Lane"]) != previous_samplename_lane:
+                sample_count += 1
+                previous_samplename_lane = (sample["SampleName"], sample["Lane"])
+            sample["SampleNumber"] = sample_count
+        # Write to a new IndexAssignment.csv file under demux_dir
+        aggregated_assigned_indexes_csv = os.path.join(
+            self.run_dir, self.demux_dir, "IndexAssignment.csv"
+        )
+        self.write_to_csv(
+            aggregated_assigned_indexes_filtered_sorted, aggregated_assigned_indexes_csv
+        )
+
+        return aggregated_assigned_indexes_filtered_sorted
+
+    # Aggregate stats in UnassignedSequences.csv
+    def aggregate_stats_unassigned(
+        self, demux_runmanifest, aggregated_assigned_indexes_filtered_sorted
+    ):
+        aggregated_unassigned_indexes = []
+        lanes = sorted(list(set(sample["Lane"] for sample in demux_runmanifest)))
+        for lane in lanes:
+            sub_demux_index_lens = set()
+            for sample in demux_runmanifest:
+                if sample["Lane"] == lane:
+                    sub_demux_index_lens.add(
+                        (
+                            sample["sub_demux_count"],
+                            (
+                                len(sample.get("Index1", "")),
+                                len(sample.get("Index2", "")),
+                            ),
+                        )
+                    )
+            # List of sub-demuxes in decreasing order of index lengths
+            sub_demux_list = [
+                x[0]
+                for x in sorted(
+                    sub_demux_index_lens, key=lambda x: sum(x[1]), reverse=True
+                )
+            ]
+            sub_demux_with_max_index_lens = sub_demux_list[0]
+            # Start with the unassigned list with the longest index
+            max_unassigned_csv = os.path.join(
+                self.run_dir,
+                f"Demultiplexing_{sub_demux_with_max_index_lens}",
+                "UnassignedSequences.csv",
+            )
+            if os.path.exists(max_unassigned_csv):
+                with open(max_unassigned_csv) as max_unassigned_file:
+                    reader = csv.DictReader(max_unassigned_file)
+                    max_unassigned_indexes = [row for row in reader]
+            else:
+                logger.warning(
+                    f"No {os.path.basename(max_unassigned_csv)} file found for sub-demultiplexing {sub_demux_with_max_index_lens}."
+                )
+                break
+            # Filter by lane
+            max_unassigned_indexes = [
+                idx for idx in max_unassigned_indexes if idx["Lane"] == lane
+            ]
+            # If there is only one sub-demux, take the full list; otherwise
+            # handle the complicated multi-demux case by removing overlaps
+            if len(sub_demux_list) > 1:
+                # Order: from longer to shorter indexes
+                sub_demux_with_shorter_index_lens = sub_demux_list[1:]
+                for sub_demux in sub_demux_with_shorter_index_lens:
+                    sub_demux_assigned_indexes = [
+                        sub_demux_assigned_index
+                        for sub_demux_assigned_index in aggregated_assigned_indexes_filtered_sorted
+                        if sub_demux_assigned_index["sub_demux_count"] == sub_demux
+                        and sub_demux_assigned_index["Lane"] == lane
+                    ]
+                    # Remove overlapped indexes from the list of max_unassigned_indexes
+                    idx1_overlapped_len = min(
+                        [
+                            demux_lens_pair[1]
+                            for demux_lens_pair in sub_demux_index_lens
+                            if demux_lens_pair[0] == sub_demux
+                        ][0][0],
+                        [
+                            demux_lens_pair[1]
+                            for demux_lens_pair in sub_demux_index_lens
+                            if demux_lens_pair[0] == sub_demux_with_max_index_lens
+                        ][0][0],
+                    )
+                    idx2_overlapped_len = min(
+                        [
+                            demux_lens_pair[1]
+                            for demux_lens_pair in sub_demux_index_lens
+                            if demux_lens_pair[0] == sub_demux
+                        ][0][1],
+                        [
+                            demux_lens_pair[1]
+                            for demux_lens_pair in sub_demux_index_lens
+                            if demux_lens_pair[0] == sub_demux_with_max_index_lens
+                        ][0][1],
+                    )
+                    for sub_demux_assigned_index in sub_demux_assigned_indexes:
+                        idx1_overlapped_seq = sub_demux_assigned_index["I1"][
+                            :idx1_overlapped_len
+                        ]
+                        idx2_overlapped_seq = sub_demux_assigned_index["I2"][
+                            :idx2_overlapped_len
+                        ]
+                        # Remove the overlapped record from the max_unassigned_indexes list
+                        max_unassigned_indexes = [
+                            max_unassigned_index
+                            for max_unassigned_index in max_unassigned_indexes
+                            if not (
+                                max_unassigned_index["I1"][:idx1_overlapped_len]
+                                == idx1_overlapped_seq
+                                and max_unassigned_index["I2"][:idx2_overlapped_len]
+                                == idx2_overlapped_seq
+                            )
+                        ]
+            # Append to the aggregated_unassigned_indexes list
+            aggregated_unassigned_indexes += max_unassigned_indexes
+        # Sort aggregated_unassigned_indexes list first by lane and then by Count in decreasing order
+        aggregated_unassigned_indexes = sorted(
+            aggregated_unassigned_indexes, key=lambda x: (x["Lane"], -int(x["Count"]))
+        )
+        # Fetch PFCount for each lane
+        # to calculate % of unassigned index in total lane PF polonies
+        pfcount_lane = {}
+        if os.path.exists(self.run_stats_file):
+            with open(self.run_stats_file) as stats_json:
+                aviti_runstats_json = json.load(stats_json)
+            # Check whether the lane numbers match between the run stat json and run manifests
+            if len(aviti_runstats_json["LaneStats"]) != len(lanes):
+                logger.warning(
+                    f"Inconsistent lane numbers between the {os.path.basename(self.run_stats_file)} file and run manifests!"
+ ) + else: + # When there is no RunManifest uploaded at the sequencer, the lane numbers will all be 0 + # In this case we assume that the lanes are ordered by their numbers + if all( + lane_stats["Lane"] == 0 + for lane_stats in aviti_runstats_json["LaneStats"] + ): + lane_counter = 1 + for lane_stats in aviti_runstats_json["LaneStats"]: + pfcount_lane[str(lane_counter)] = float(lane_stats["PFCount"]) + lane_counter += 1 + # Otherwise we parse the PF counts by matching the lane numbers + else: + for lane_stats in aviti_runstats_json["LaneStats"]: + pfcount_lane[str(lane_stats["Lane"])] = float( + lane_stats["PFCount"] + ) + # Prepare the dict for pf assigned count for each lane + pf_assigned_lane = {} + for sample in aggregated_assigned_indexes_filtered_sorted: + lane = sample["Lane"] + num_polonies_assigned = int(sample["NumPoloniesAssigned"]) + if lane in pf_assigned_lane: + pf_assigned_lane[lane] += num_polonies_assigned + else: + pf_assigned_lane[lane] = num_polonies_assigned + # Modify the % Polonies values based on PFCount for each lane + for unassigned_index in aggregated_unassigned_indexes: + if pfcount_lane.get(unassigned_index["Lane"]): + unassigned_index["% Polonies"] = ( + float(unassigned_index["Count"]) + / pfcount_lane[unassigned_index["Lane"]] + * 100 + ) + # Calculate the % Polonies values in the total unassigned for each lane + if pf_assigned_lane.get(unassigned_index["Lane"]): + unassigned_index["% Unassigned"] = ( + float(unassigned_index["Count"]) + / ( + pfcount_lane[unassigned_index["Lane"]] + - pf_assigned_lane[unassigned_index["Lane"]] + ) + * 100 + ) + else: + unassigned_index["% Unassigned"] = 0 + else: + unassigned_index["% Polonies"] = 0 + else: + logger.warning( + f"No {os.path.basename(self.run_stats_file)} file found for the run." 
+            )
+
+        # Write to a new UnassignedSequences.csv file under demux_dir
+        if aggregated_unassigned_indexes:
+            aggregated_unassigned_csv = os.path.join(
+                self.run_dir, self.demux_dir, "UnassignedSequences.csv"
+            )
+            self.write_to_csv(aggregated_unassigned_indexes, aggregated_unassigned_csv)
+
+    # Aggregate demux results
+    def aggregate_demux_results(self, demux_results_dirs):
+        # Ensure the destination directory exists
+        if not os.path.exists(os.path.join(self.run_dir, self.demux_dir)):
+            os.makedirs(os.path.join(self.run_dir, self.demux_dir))
+        # Clear all content under dest_dir
+        self.clear_dir(os.path.join(self.run_dir, self.demux_dir))
+        demux_runmanifest = self.collect_demux_runmanifest(demux_results_dirs)
+        # Aggregate the output FastQ files of samples from multiple demux
+        self.aggregate_sample_fastq(demux_runmanifest)
+        # Symlink the output FastQ files of undet only if a lane does not have multiple demux
+        self.aggregate_undet_fastq(demux_runmanifest)
+        # Aggregate stats in IndexAssignment.csv
+        aggregated_assigned_indexes_filtered_sorted = self.aggregate_stats_assigned(
+            demux_runmanifest
+        )
+        # Aggregate stats in UnassignedSequences.csv
+        self.aggregate_stats_unassigned(
+            demux_runmanifest, aggregated_assigned_indexes_filtered_sorted
+        )
+
+    def sync_metadata(self):
+        files_to_copy = [
+            self.run_stats_file,
+            os.path.join(self.run_dir, "Demultiplexing", "IndexAssignment.csv"),
+            os.path.join(self.run_dir, "Demultiplexing", "UnassignedSequences.csv"),
+            self.run_parameters_file,
+        ]
+        metadata_archive = self.CONFIG.get("element_analysis").get("metadata_location")
+        dest = os.path.join(metadata_archive, self.NGI_run_id)
+        if not os.path.exists(dest):
+            os.makedirs(dest)
+        for f in files_to_copy:  # UnassignedSequences.csv missing in NoIndex case
+            if os.path.exists(f):
+                shutil.copy(f, dest)
+            else:
+                logger.warning(f"File {f} missing for run {self}")
+
+    def make_transfer_indicator(self):
+        transfer_indicator = os.path.join(self.run_dir, ".rsync_ongoing")
+        Path(transfer_indicator).touch()
+
+    def transfer(self):
+        transfer_details = self.CONFIG.get("element_analysis").get("transfer_details")
+        command = (
+            "rsync"
+            + " -rLav"
+            + f" --chown={transfer_details.get('owner')}"
+            + f" --chmod={transfer_details.get('permissions')}"
+            + " --exclude BaseCalls"
+            + " --exclude Alignment"
+            + f" {self.run_dir}"
+            + f" {transfer_details.get('user')}@{transfer_details.get('host')}:/aviti"
+            + f"; echo $? > {os.path.join(self.run_dir, '.rsync_exit_status')}"
+        )
+        try:
+            p_handle = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
+            logger.info(
+                "Transfer to analysis cluster "
+                f"started for run {self} on {datetime.now()} "
+                f"with p_handle {p_handle}"
+            )
+        except subprocess.CalledProcessError:
+            logger.warning(
+                "An error occurred while starting transfer to analysis cluster "
+                f"for {self} on {datetime.now()}."
+ ) + return + + def remove_transfer_indicator(self): + transfer_indicator = os.path.join(self.run_dir, ".rsync_ongoing") + Path(transfer_indicator).unlink() + + def update_transfer_log(self): + """Update transfer log with run id and date.""" + try: + with open(self.transfer_file, "a") as f: + tsv_writer = csv.writer(f, delimiter="\t") + tsv_writer.writerow([self.NGI_run_id, str(datetime.now())]) + except OSError: + msg = f"{self}: Could not update the transfer logfile {self.transfer_file}" + logger.error(msg) + raise OSError(msg) + + def update_paths_after_archiving(self, new_location): + self.run_dir = os.path.join( + new_location, self.NGI_run_id + ) # Needs to be redirected to new location so that TACA can find files to upload to statusdb + self.run_parameters_file = os.path.join(self.run_dir, "RunParameters.json") + self.run_stats_file = os.path.join(self.run_dir, "AvitiRunStats.json") + self.run_manifest_file_from_instrument = os.path.join( + self.run_dir, "RunManifest.json" + ) + self.run_uploaded_file = os.path.join(self.run_dir, "RunUploaded.json") + + def move_to_nosync(self): + """Move directory to nosync.""" + src = self.run_dir + parent_dir = Path(self.run_dir).parent.absolute() + dst = os.path.join(parent_dir, "nosync") + shutil.move(src, dst) + self.update_paths_after_archiving(dst) diff --git a/taca/element/__init__.py b/taca/element/__init__.py new file mode 100644 index 00000000..75a569ff --- /dev/null +++ b/taca/element/__init__.py @@ -0,0 +1,3 @@ +""" +Classes to parse and work with Element data +""" diff --git a/taca/utils/statusdb.py b/taca/utils/statusdb.py index 939e0606..a2920550 100644 --- a/taca/utils/statusdb.py +++ b/taca/utils/statusdb.py @@ -166,6 +166,33 @@ def finish_ongoing_run(self, ont_run, dict_json: dict): self.db[doc.id] = doc +class ElementRunsConnection(StatusdbSession): + def __init__(self, config, dbname="element_runs"): + super().__init__(config) + self.db = self.connection[dbname] + + def get_db_entry(self, run_id): + view_run_id = self.db.view("info/id") + try: + return view_run_id[run_id].rows[0] + except IndexError: + return None + + def check_if_run_exists(self, run_id) -> bool: + return self.get_db_entry(run_id) is not None + + def check_db_run_status(self, run_name) -> str: + view_status = self.db.view("info/status") + try: + status = view_status[run_name].rows[0].value + except IndexError: # No rows found + return "Unknown" + return status + + def upload_to_statusdb(self, run_obj: dict): + update_doc(self.db, run_obj) + + def update_doc(db, obj, over_write_db_entry=False): view = db.view("info/name") if len(view[obj["name"]].rows) == 1: @@ -183,7 +210,7 @@ def update_doc(db, obj, over_write_db_entry=False): db.save(obj) logger.info("Saving {}".format(obj["name"])) else: - logger.warn("More than one row with name {} found".format(obj["name"])) + logger.warning("More than one row with name {} found".format(obj["name"])) def merge_dicts(d1, d2): diff --git a/tests/analysis/test_analysis_element.py b/tests/analysis/test_analysis_element.py new file mode 100644 index 00000000..356a695a --- /dev/null +++ b/tests/analysis/test_analysis_element.py @@ -0,0 +1,192 @@ +import logging +import sys +from tempfile import TemporaryDirectory +from unittest.mock import patch + +import pytest + +from tests.element.test_Element_Runs import create_element_run_dir, get_config + +# from dirhash import dirhash TODO this might be useful for validating dir tree snapshots + + +@pytest.fixture() +def aviti_fixture(create_dirs, caplog): + # Create tempdir + tmp: 
TemporaryDirectory = create_dirs
+
+    # Capture log
+    caplog.at_level(logging.INFO)
+
+    # Mocks
+    mocks = {
+        "mock_config": patch("taca.utils.config.CONFIG", new=get_config(tmp)).start(),
+        "mock_db": patch(
+            "taca.element.Element_Runs.ElementRunsConnection", autospec=True
+        ).start(),
+        "mock_mail": patch("taca.analysis.analysis_element.send_mail").start(),
+        "mock_popen": patch("subprocess.Popen").start(),
+    }
+
+    # Import module to test
+    from taca.analysis import analysis_element as to_test
+
+    # Yield fixtures
+    yield to_test, tmp, caplog, mocks
+
+    # Stop mocks
+    patch.stopall()
+
+    # Purge module
+    del sys.modules["taca.analysis.analysis_element"]
+
+
+def test_process_on_empty_dir(aviti_fixture):
+    """Should raise FileNotFoundError when no files are present in the run dir and send mail."""
+    to_test, tmp, caplog, mocks = aviti_fixture
+
+    # Create dir
+    run_dir = create_element_run_dir(
+        tmp=tmp,
+        lims_manifest=False,
+        metadata_files=False,
+        run_finished=False,
+        outcome_completed=False,
+        demux_dir=False,
+        demux_done=False,
+        rsync_ongoing=False,
+        rsync_exit_status=None,
+        nosync=False,
+    )
+
+    with pytest.raises(FileNotFoundError):
+        to_test.run_preprocessing(run_dir)
+
+    # Assertions
+    mocks["mock_mail"].assert_called_once()
+    assert "Run parameters file not found" in caplog.text
+
+
+def test_process_on_dir_w_metadata(aviti_fixture):
+    """Should update statusdb."""
+    to_test, tmp, caplog, mocks = aviti_fixture
+
+    # Sub-mock configuration
+    mocks["mock_db"].return_value.check_db_run_status.return_value = "ongoing"
+    mocks["mock_db"].return_value.upload_to_statusdb.return_value = None
+
+    # Add metadata files
+    run_dir = create_element_run_dir(
+        tmp=tmp,
+        lims_manifest=False,
+        metadata_files=True,
+        run_finished=False,
+        outcome_completed=False,
+        demux_dir=False,
+        demux_done=False,
+        rsync_ongoing=False,
+        rsync_exit_status=None,
+        nosync=False,
+    )
+
+    to_test.run_preprocessing(run_dir)
+
+    assert mocks["mock_db"].return_value.upload_to_statusdb.called
+
+
+@pytest.mark.skip("Currently a failed run is treated as an ongoing run.")
+def test_process_on_failed_run(aviti_fixture):
+    to_test, tmp, caplog, mocks = aviti_fixture
+
+    # Sub-mock configuration
+    mocks["mock_db"].return_value.check_db_run_status.return_value = "ongoing"
+    mocks["mock_db"].return_value.upload_to_statusdb.return_value = None
+
+    # Add metadata files
+    run_dir = create_element_run_dir(
+        tmp=tmp,
+        lims_manifest=False,
+        metadata_files=True,
+        run_finished=True,
+        outcome_completed=False,
+        demux_dir=False,
+        demux_done=False,
+        rsync_ongoing=False,
+        rsync_exit_status=None,
+        nosync=False,
+    )
+
+    to_test.run_preprocessing(run_dir)
+
+
+def test_process_on_finished_run_wo_lims_manifest(aviti_fixture):
+    """Should fail to find LIMS run manifest and send mail."""
+    to_test, tmp, caplog, mocks = aviti_fixture
+
+    # Sub-mock configuration
+    mocks["mock_db"].return_value.check_db_run_status.return_value = "ongoing"
+    mocks["mock_db"].return_value.upload_to_statusdb.return_value = None
+
+    # Add metadata files
+    run_dir = create_element_run_dir(
+        tmp=tmp,
+        lims_manifest=False,
+        metadata_files=True,
+        run_finished=True,
+        outcome_completed=True,
+        demux_dir=False,
+        demux_done=False,
+        rsync_ongoing=False,
+        rsync_exit_status=None,
+        nosync=False,
+    )
+
+    to_test.run_preprocessing(run_dir)
+
+    assert "No manifest found for run" in caplog.text
+    mocks["mock_mail"].assert_called_once()
+
+
+def test_process_on_finished_run(aviti_fixture):
+    """Should start demux."""
+    to_test, tmp, caplog, mocks =
aviti_fixture + + # Sub-mock configuration + mocks["mock_db"].return_value.check_db_run_status.return_value = "ongoing" + mocks["mock_db"].return_value.upload_to_statusdb.return_value = None + + # Add metadata files + run_dir = create_element_run_dir( + tmp=tmp, + lims_manifest=True, + metadata_files=True, + run_finished=True, + outcome_completed=True, + demux_dir=False, + demux_done=False, + rsync_ongoing=False, + rsync_exit_status=None, + nosync=False, + ) + + to_test.run_preprocessing(run_dir) + + expected_cmd = " ".join( + [ + "mock_bases2fastq_path", + f"{tmp.name}/ngi_data/sequencing/AV242106/20240926_AV242106_A2349523513", + f"{tmp.name}/ngi_data/sequencing/AV242106/20240926_AV242106_A2349523513/Demultiplexing_0", + "-p 8", + "--num-unassigned 500", + f"-r {tmp.name}/ngi_data/sequencing/AV242106/20240926_AV242106_A2349523513/20240926_AV242106_A2349523513_demux_0.csv", + "--legacy-fastq", + "--force-index-orientation", + ] + ) + + assert any( + expected_cmd in call.args[0] for call in mocks["mock_popen"].call_args_list + ), f"Expected command '{expected_cmd}' not found in any Popen calls." + + assert "Bases2Fastq conversion and demultiplexing started for run " in caplog.text + assert mocks["mock_db"].return_value.upload_to_statusdb.called diff --git a/tests/analysis/test_analysis_nanopore.py b/tests/analysis/test_analysis_nanopore.py index 01fec070..30a7105a 100644 --- a/tests/analysis/test_analysis_nanopore.py +++ b/tests/analysis/test_analysis_nanopore.py @@ -15,70 +15,58 @@ ) -def build_run_properties() -> dict: +def parametrize_testruns(): """In order to parametrize the test in a comprehensive way, the parametrization is tabulated as a string here. """ - col_names = [ - "instrument", - "qc", - "run_finished", - "sync_finished", - "raw_dirs", - "fastq_dirs", - "barcode_dirs", - "anglerfish_samplesheets", - "anglerfish_ongoing", - "anglerfish_exit", - ] - parameter_string_table = """ - promethion False False False False False False False False NA - promethion False True False False False False False False NA - promethion False True True False False False False False NA - promethion False True True True False False False False NA - promethion False True True True True False False False NA - promethion False True True True True True False False NA - minion False False False False False False False False NA - minion False True False False False False False False NA - minion False True True False False False False False NA - minion False True True True False False False False NA - minion False True True True True False False False NA - minion False True True True True True False False NA - minion True False False False False False False False NA - minion True True False False False False False False NA - minion True True True False False False False False NA - minion True True True True False False False False NA - minion True True True True True False False False NA - minion True True True True True True False False NA - minion True True True True True True True False NA - minion True True True True True True True True NA - minion True True True True True True True False 0 + desc instrument qc run_finished sync_finished raw_dirs fastq_dirs barcode_dirs anglerfish_samplesheets anglerfish_ongoing anglerfish_exit + prom_ongoing promethion False False False False False False False False NA + prom_done promethion False True False False False False False False NA + prom_synced promethion False True True False False False False False NA + prom_reads promethion False True True True False 
False False False NA
+    prom_fastq        promethion False True  True  True  True  False False False NA
+    prom_bcs          promethion False True  True  True  True  True  False False NA
+    min_ongoing       minion     False False False False False False False False NA
+    min_done          minion     False True  False False False False False False NA
+    min_synced        minion     False True  True  False False False False False NA
+    min_reads         minion     False True  True  True  False False False False NA
+    min_fastq         minion     False True  True  True  True  False False False NA
+    min_bcs           minion     False True  True  True  True  True  False False NA
+    min_qc_ongoing    minion     True  False False False False False False False NA
+    min_qc_done       minion     True  True  False False False False False False NA
+    min_qc_synced     minion     True  True  True  False False False False False NA
+    min_qc_reads      minion     True  True  True  True  False False False False NA
+    min_qc_fastq      minion     True  True  True  True  True  False False False NA
+    min_qc_bcs        minion     True  True  True  True  True  True  False False NA
+    min_qc_ang_ss     minion     True  True  True  True  True  True  True  False NA
+    min_qc_ang_run    minion     True  True  True  True  True  True  True  True  NA
+    min_qc_ang_done   minion     True  True  True  True  True  True  True  False 0
     """
+
+    # Turn the string table into a data stream
     data = StringIO(parameter_string_table)
 
     # Read data, trimming whitespace
-    df = pd.read_csv(data, header=None, sep=r"\s+")
-    assert len(df.columns) == len(col_names)
-    df.columns = col_names
+    df = pd.read_csv(data, sep=r"\s+")
 
     # Replace nan(s) with None(s)
     df = df.replace(np.nan, None)
 
-    # Convert to dict
-    run_properties = df.to_dict("records")
+    # Drop the "desc" column and retain it as a list
+    testrun_descs = df.pop("desc").tolist()
+
+    # Compile into a list of parameter dicts
+    testrun_kwargs: list[dict] = df.to_dict(orient="records")
 
-    # Convert float exit codes to ints
-    for d in run_properties:
-        if d["anglerfish_exit"] == 0.0:
-            d["anglerfish_exit"] = int(d["anglerfish_exit"])
+    return testrun_kwargs, testrun_descs
 
-    return run_properties
 
+testrun_kwargs, testrun_descs = parametrize_testruns()
 
-@pytest.mark.parametrize("run_properties", build_run_properties())
+
+@pytest.mark.parametrize("run_properties", testrun_kwargs, ids=testrun_descs)
 def test_ont_transfer(create_dirs, run_properties, caplog):
     """Test the "taca analysis ont-transfer" subcommand automation from start
     to finish for a variety of runs.
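Each whitespace-aligned row of the table above becomes one pytest case, and the desc column becomes its test ID, so a failure reads as e.g. test_ont_transfer[min_qc_ang_done] instead of an opaque index. A self-contained sketch of the same pandas mechanism on a two-column excerpt (all names here are local to the example):

from io import StringIO

import numpy as np
import pandas as pd

# Two-row excerpt of the table above, whitespace-aligned like the original.
table = """
desc         instrument qc
prom_ongoing promethion False
min_qc_done  minion     True
"""

# sep=r"\s+" splits on runs of whitespace, so column alignment is cosmetic.
df = pd.read_csv(StringIO(table), sep=r"\s+").replace(np.nan, None)
ids = df.pop("desc").tolist()         # -> ["prom_ongoing", "min_qc_done"]
cases = df.to_dict(orient="records")  # -> one kwargs-style dict per row

assert ids == ["prom_ongoing", "min_qc_done"]
assert cases[0] == {"instrument": "promethion", "qc": False}

Letting pandas infer the header (rather than assigning col_names by hand) is what makes the desc-based ids= argument possible.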
@@ -147,3 +135,6 @@ def side_effect(*args, **kwargs): # Start testing analysis_nanopore.ont_transfer(run_abspath=None, qc=False) + + # Stop mocks + patch.stopall() diff --git a/tests/conftest.py b/tests/conftest.py index 171a5667..8b53b4e3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import logging import os import shutil import tempfile @@ -14,17 +15,19 @@ def create_dirs(): │ ├── Chromium_10X_indexes.txt │ └── Smart-seq3_v1.5.csv ├── log - │ ├── transfer_minion_qc.tsv + │ ├── taca.log + │ ├── transfer.tsv + │ ├── transfer_aviti.tsv │ ├── transfer_minion.tsv + │ ├── transfer_minion_qc.tsv │ └── transfer_promethion.tsv - │ └── transfer.tsv - │ └── taca.log ├── miarka │ ├── minion │ │ └── qc │ └── promethion ├── minknow_reports ├── ngi-nas-ns + │ ├── Aviti_data │ ├── NextSeq_data │ ├── NovaSeqXPlus_data │ ├── NovaSeq_data @@ -32,10 +35,13 @@ def create_dirs(): │ ├── miseq_data │ ├── promethion_data │ └── samplesheets + │ ├── Aviti │ ├── NovaSeqXPlus │ └── anglerfish └── ngi_data └── sequencing + ├── AV242106 + │ └── nosync ├── MiSeq │ └── nosync ├── NextSeq @@ -65,6 +71,8 @@ def create_dirs(): os.makedirs(f"{tmp.name}/ngi_data/sequencing/promethion/nosync") os.makedirs(f"{tmp.name}/ngi_data/sequencing/minion/nosync") os.makedirs(f"{tmp.name}/ngi_data/sequencing/minion/qc/nosync") + ## Element + os.makedirs(f"{tmp.name}/ngi_data/sequencing/AV242106/nosync") # Sequencing metadata ## Illumina @@ -75,10 +83,13 @@ def create_dirs(): ## ONT os.makedirs(f"{tmp.name}/ngi-nas-ns/promethion_data") os.makedirs(f"{tmp.name}/ngi-nas-ns/minion_data") + ## Element + os.makedirs(f"{tmp.name}/ngi-nas-ns/Aviti_data") # Samplesheets os.makedirs(f"{tmp.name}/ngi-nas-ns/samplesheets/anglerfish") os.makedirs(f"{tmp.name}/ngi-nas-ns/samplesheets/NovaSeqXPlus") + os.makedirs(f"{tmp.name}/ngi-nas-ns/samplesheets/Aviti") # Misc. ONT dirs/files os.makedirs(f"{tmp.name}/minknow_reports") @@ -86,6 +97,7 @@ def create_dirs(): open(f"{tmp.name}/log/transfer_promethion.tsv", "w").close() open(f"{tmp.name}/log/transfer_minion.tsv", "w").close() open(f"{tmp.name}/log/transfer_minion_qc.tsv", "w").close() + open(f"{tmp.name}/log/transfer_aviti.tsv", "w").close() open(f"{tmp.name}/log/transfer.tsv", "w").close() open(f"{tmp.name}/log/taca.log", "w").close() @@ -104,3 +116,46 @@ def create_dirs(): yield tmp tmp.cleanup() + + +@pytest.fixture(autouse=True) +def configure_logging(create_dirs): + """Configure logging for the entire test session.""" + + # Use fixture + tmp = create_dirs + + # Specify log file path + log_file = os.path.join(tmp.name, "log", "taca.log") + assert os.path.exists(log_file) + + # Get the root logger + logger = logging.getLogger() + + # Clear any existing handlers to avoid duplicate logs + if logger.hasHandlers(): + logger.handlers.clear() + + # Configure logging + file_handler = logging.FileHandler(log_file) + stream_handler = logging.StreamHandler() + + # Set a common formatter + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + file_handler.setFormatter(formatter) + stream_handler.setFormatter(formatter) + + # Add handlers to the root logger + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + # Set log level + logger.setLevel(logging.INFO) + + # Log to confirm the logger is working + logger.info(f"Logging is set up. 
Logs will be stored in {log_file}.")
+
+    # Return the log file path to use in tests if needed
+    return log_file
diff --git a/tests/element/__init__.py b/tests/element/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/element/test_Aviti_Runs.py b/tests/element/test_Aviti_Runs.py
new file mode 100644
index 00000000..62d142bc
--- /dev/null
+++ b/tests/element/test_Aviti_Runs.py
@@ -0,0 +1,21 @@
+import tempfile
+from unittest.mock import patch
+
+import pytest
+
+from taca.element import Aviti_Runs as to_test
+from tests.element.test_Element_Runs import create_element_run_dir, get_config
+
+
+class TestAviti_Run:
+    def test_init(self, create_dirs: pytest.fixture):
+        tmp: tempfile.TemporaryDirectory = create_dirs
+        run_dir = create_element_run_dir(tmp)
+
+        # Mock db
+        mock_db = patch("taca.element.Element_Runs.ElementRunsConnection")
+        mock_db.start()
+
+        run = to_test.Aviti_Run(run_dir, get_config(tmp))
+        assert run.run_dir == run_dir
+        assert run.sequencer_type == "Aviti"
diff --git a/tests/element/test_Element_Runs.py b/tests/element/test_Element_Runs.py
new file mode 100644
index 00000000..37a5c9a4
--- /dev/null
+++ b/tests/element/test_Element_Runs.py
@@ -0,0 +1,546 @@
+import os
+import shutil
+import tempfile
+import zipfile
+from unittest import mock
+
+import pytest
+
+from taca.element import Element_Runs as to_test
+
+
+def get_config(tmp: tempfile.TemporaryDirectory) -> dict:
+    config = {
+        "element_analysis": {
+            "Element": {
+                "Aviti": {
+                    "manifest_zip_location": f"{tmp.name}/ngi-nas-ns/samplesheets/Aviti",
+                    "transfer_log": f"{tmp.name}/log/transfer_aviti.tsv",
+                },
+            },
+            "bases2fastq": "mock_bases2fastq_path",
+        },
+        "mail": {
+            "recipients": ["mock@mock.com"],
+        },
+        "statusdb": {},
+    }
+    return config
+
+
+def create_element_run_dir(
+    tmp: tempfile.TemporaryDirectory,
+    overwrite: bool = False,
+    run_name: str = "20240926_AV242106_A2349523513",
+    metadata_files: bool = False,
+    lims_manifest: bool = False,
+    run_finished: bool = False,
+    outcome_completed: bool = False,
+    demux_dir: bool = False,
+    n_demux_subdirs: int = 2,
+    demux_done: bool = False,
+    rsync_ongoing: bool = False,
+    rsync_exit_status: int | None = None,
+    nosync: bool = False,
+) -> str:
+    """
+    Build a run dir for an Element run for test purposes.
+
+    Some file contents are replaced with "MOCK" to shorten them.
+
+    20240926_AV242106_A2349523513
+    ├── RunManifest.csv
+    ├── RunManifest.json
+    ├── RunParameters.json
+    ├── RunUploaded.json
+    ├── .sync_finished
+    ├── Demultiplexing
+    ├── Demultiplexing_0
+    │   └── RunStats.json
+    ├── Demultiplexing_1
+    │   └── RunStats.json
+    └── ...
+
+    """
+
+    # Create run dir
+    if nosync:
+        run_path = f"{tmp.name}/ngi_data/sequencing/AV242106/nosync/{run_name}"
+    else:
+        run_path = f"{tmp.name}/ngi_data/sequencing/AV242106/{run_name}"
+    if os.path.exists(run_path):
+        if overwrite:
+            # Clear out the pre-existing run dir, contents included
+            shutil.rmtree(run_path)
+        else:
+            raise FileExistsError(f"Directory {run_path} already exists.")
+    os.mkdir(run_path)
+
+    # Create LIMS manifest
+    if lims_manifest:
+        manifest_root_name = "AVITI_run_manifest_2349523513_24-1061390_240926_171138_ChristianNatanaelsson"
+        manifest_pdir = f"{tmp.name}/ngi-nas-ns/samplesheets/Aviti/2024"
+
+        os.mkdir(manifest_pdir)
+
+        csv_path = f"{manifest_pdir}/{manifest_root_name}_untrimmed.csv"
+        zip_path = f"{manifest_pdir}/{manifest_root_name}.zip"
+
+        with open(csv_path, "w") as stream:
+            # This run manifest was generated after the sequencing run,
+            # and is different from what its filename implies.
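+            # The manifest below follows the three-section AVITI layout:
+            # [RUNVALUES] and [SETTINGS] hold key/value pairs, while [SAMPLES]
+            # is a CSV table with one row per sample/index/lane combination;
+            # the P32105 samples carry 10X-style index sets (SI-NA-*), so each
+            # recurs four times per lane, and the PhiX_Adept rows are spike-in
+            # controls.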
+ stream.write("""[RUNVALUES] +KeyName, Value +lims_step_name, Load to Flowcell (AVITI) v1.0 +lims_step_id, 24-1061411 +manifest_file, AVITI_run_manifest_2349523513_24-1061411_241011_142515_AlfredKedhammar_untrimmed.csv + +[SETTINGS] +SettingName, Value + +[SAMPLES] +SampleName,Index1,Index2,Lane,Project,Recipe,lims_label,settings +P32105_1001,AAAGCATA,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-A3,I1Fastq:True +P32105_1001,CTGCAGCC,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-A3,I1Fastq:True +P32105_1001,GCCTTTAT,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-A3,I1Fastq:True +P32105_1001,TGTAGCGG,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-A3,I1Fastq:True +P32105_1002,ATTGGACG,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-B3,I1Fastq:True +P32105_1002,CAGCTTAC,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-B3,I1Fastq:True +P32105_1002,GGCAAGGA,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-B3,I1Fastq:True +P32105_1002,TCATCCTT,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-B3,I1Fastq:True +P32105_1003,ACGTTACA,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-C3,I1Fastq:True +P32105_1003,CGTAGGTT,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-C3,I1Fastq:True +P32105_1003,GACGACGG,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-C3,I1Fastq:True +P32105_1003,TTACCTAC,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-C3,I1Fastq:True +P32105_1004,ACTTCACT,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-D3,I1Fastq:True +P32105_1004,CGAAGTTG,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-D3,I1Fastq:True +P32105_1004,GAGCACGC,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-D3,I1Fastq:True +P32105_1004,TTCGTGAA,NNNNNNNNNNNNNNNNNNNNNNNN,1,I__Adameyko_24_06,50-8-24-49,SI-NA-D3,I1Fastq:True +PhiX_Adept,ATGTCGCTAG,CTAGCTCGTA,1,Control,0-0,, +PhiX_Adept,CACAGATCGT,ACGAGAGTCT,1,Control,0-0,, +PhiX_Adept,GCACATAGTC,GACTACTAGC,1,Control,0-0,, +PhiX_Adept,TGTGTCGACA,TGTCTGACAG,1,Control,0-0,, +P32105_1001,AAAGCATA,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-A3,I1Fastq:True +P32105_1001,CTGCAGCC,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-A3,I1Fastq:True +P32105_1001,GCCTTTAT,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-A3,I1Fastq:True +P32105_1001,TGTAGCGG,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-A3,I1Fastq:True +P32105_1002,ATTGGACG,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-B3,I1Fastq:True +P32105_1002,CAGCTTAC,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-B3,I1Fastq:True +P32105_1002,GGCAAGGA,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-B3,I1Fastq:True +P32105_1002,TCATCCTT,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-B3,I1Fastq:True +P32105_1003,ACGTTACA,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-C3,I1Fastq:True +P32105_1003,CGTAGGTT,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-C3,I1Fastq:True +P32105_1003,GACGACGG,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-C3,I1Fastq:True +P32105_1003,TTACCTAC,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-C3,I1Fastq:True +P32105_1004,ACTTCACT,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-D3,I1Fastq:True +P32105_1004,CGAAGTTG,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-D3,I1Fastq:True 
+P32105_1004,GAGCACGC,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-D3,I1Fastq:True +P32105_1004,TTCGTGAA,NNNNNNNNNNNNNNNNNNNNNNNN,2,I__Adameyko_24_06,50-8-24-49,SI-NA-D3,I1Fastq:True +PhiX_Adept,ATGTCGCTAG,CTAGCTCGTA,2,Control,0-0,, +PhiX_Adept,CACAGATCGT,ACGAGAGTCT,2,Control,0-0,, +PhiX_Adept,GCACATAGTC,GACTACTAGC,2,Control,0-0,, +PhiX_Adept,TGTGTCGACA,TGTCTGACAG,2,Control,0-0,, +""") + + with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf: + # Add the CSV file to the zip file + zipf.write(csv_path, os.path.basename(csv_path)) + os.remove(csv_path) + + # Populate run dir with files and folders + if metadata_files: + with open(f"{run_path}/RunManifest.json", "w") as stream: + stream.write("""{ + "KitConfiguration": { + "MaxCycles": 334, + "DefaultR1Cycles": 151, + "DefaultR2Cycles": 151, + "DefaultI1Cycles": -1, + "DefaultI2Cycles": -1, + "MinimumR1Cycles": 5, + "MinimumR2Cycles": 0, + "MinimumI1Cycles": 0, + "MinimumI2Cycles": 0, + "DefaultI1FastQ": false, + "DefaultI2FastQ": false, + "DefaultUMIFastQ": false, + "DefaultI1Mask": "I1:Y*", + "DefaultI2Mask": "I2:Y*", + "DefaultUmiMask": "I1:N*", + "DefaultR1FastQMask": "R1:Y*N", + "DefaultR2FastQMask": "R2:Y*N", + "DefaultI1MaskRead": "I1", + "DefaultI2MaskRead": "I2", + "DefaultUmiMaskRead": "I1", + "DefaultR1FastQMaskRead": "R1", + "DefaultR2FastQMaskRead": "R2", + "DefaultR1Adapter": "", + "DefaultR2Adapter": "", + "DefaultR1AdapterTrim": false, + "DefaultR2AdapterTrim": false, + "DefaultR1AdapterNMask": false, + "DefaultR2AdapterNMask": false, + "DefaultR1AdapterMinimumTrimmedLength": 16, + "DefaultR2AdapterMinimumTrimmedLength": 16, + "DefaultR1AdapterMinimumStringency": 0.9, + "DefaultR2AdapterMinimumStringency": 0.9, + "DefaultR1AdapterMinimumOverlap": 3, + "DefaultR2AdapterMinimumOverlap": 3, + "DefaultAdapterTrimType": "Paired-End" + }, + "RunParameters": { + "PreparationWorkflow": "Adept", + "KitConfiguration": "300Cycles", + "ChemistryVersion": "Cloudbreak", + "LowDiversity": false, + "I1Cycles": 8, + "I2Cycles": 24, + "R1Cycles": 50, + "R2Cycles": 49 + }, + "RunValues": { + "lims_step_id": "24-1061390", + "lims_step_name": "Load to Flowcell (AVITI) v1.0", + "manifest_file": "AVITI_run_manifest_2349523513_24-1061390_240926_171138_ChristianNatanaelsson_trimmed.csv" + }, + "Settings": [ + { + "Lane": 1, + "I1MismatchThreshold": 1, + "I2MismatchThreshold": 1, + "R1Adapter": [], + "R2Adapter": [], + "I1MaskManifest": "I1:N*", + "I1Mask": [ + { + "Read": "I1", + "Cycles": [] + } + ], + "I1FastQ": false, + "I2MaskManifest": "I2:N*", + "I2Mask": [ + { + "Read": "I2", + "Cycles": [] + } + ], + "I2FastQ": false, + "UmiMaskManifest": "I1:N*", + "UmiMask": [ + { + "Read": "I1", + "Cycles": [] + } + ], + "UmiFastQ": false, + "R1FastQMaskManifest": "R1:Y*N", + "R1FastQMask": [ + { + "Read": "R1", + "Cycles": "MOCK" + } + ], + "R2FastQMaskManifest": "R2:Y*N", + "R2FastQMask": [ + { + "Read": "R2", + "Cycles": "MOCK" + } + ], + "SpikeInAsUnassigned": true, + "R1AdapterTrim": false, + "R2AdapterTrim": false, + "R1AdapterNMask": false, + "R2AdapterNMask": false, + "R1AdapterMinimumTrimmedLength": 16, + "R2AdapterMinimumTrimmedLength": 16, + "R1AdapterMinimumStringency": 0.9, + "R2AdapterMinimumStringency": 0.9, + "R1AdapterMinimumOverlap": 3, + "R2AdapterMinimumOverlap": 3, + "AdapterTrimType": "Paired-End" + }, + { + "Lane": 2, + "I1MismatchThreshold": 1, + "I2MismatchThreshold": 1, + "R1Adapter": [], + "R2Adapter": [], + "I1MaskManifest": "I1:N*", + "I1Mask": [ + { + "Read": "I1", + "Cycles": [] + } + ], + 
"I1FastQ": false, + "I2MaskManifest": "I2:N*", + "I2Mask": [ + { + "Read": "I2", + "Cycles": [] + } + ], + "I2FastQ": false, + "UmiMaskManifest": "I1:N*", + "UmiMask": [ + { + "Read": "I1", + "Cycles": [] + } + ], + "UmiFastQ": false, + "R1FastQMaskManifest": "R1:Y*N", + "R1FastQMask": [ + { + "Read": "R1", + "Cycles": "MOCK" + } + ], + "R2FastQMaskManifest": "R2:Y*N", + "R2FastQMask": [ + { + "Read": "R2", + "Cycles": "MOCK" + } + ], + "SpikeInAsUnassigned": true, + "R1AdapterTrim": false, + "R2AdapterTrim": false, + "R1AdapterNMask": false, + "R2AdapterNMask": false, + "R1AdapterMinimumTrimmedLength": 16, + "R2AdapterMinimumTrimmedLength": 16, + "R1AdapterMinimumStringency": 0.9, + "R2AdapterMinimumStringency": 0.9, + "R1AdapterMinimumOverlap": 3, + "R2AdapterMinimumOverlap": 3, + "AdapterTrimType": "Paired-End" + } + ], + "Samples": [ + { + "SampleName": "DefaultSample", + "SampleNumber": 1, + "ExternalId": "", + "Indexes": [ + { + "Lane": 1, + "Index1": "", + "Index2": "" + }, + { + "Lane": 2, + "Index1": "", + "Index2": "" + } + ], + "CustomMetadata": {}, + "Project": "DefaultProject" + } + ] +} +""") + with open(f"{run_path}/RunParameters.json", "w") as stream: + stream.write("""{ + "FileVersion": "5.0.0", + "RunName": "A2349523513", + "RecipeExecutionID": "rec.9590c80c95fc4eee8b3eb10c31251915", + "RunID": "seq_66f5837f1ae1a35f10a2e594", + "RunType": "Sequencing", + "RunDescription": "", + "Side": "SideA", + "FlowcellID": "2349523513", + "Date": "2024-09-26T16:34:55.978072698Z", + "InstrumentName": "AV242106", + "OperatorName": "christian.natanael@scilifelab.se ", + "RunFolderName": "20240926_AV242106_A2349523513", + "Tiles": "MOCK", + "Cycles": { + "R1": 50, + "R2": 49, + "I1": 8, + "I2": 24 + }, + "ReadOrder": "I1,I2,R1,R2", + "ThroughputSelection": "High", + "KitConfiguration": "300Cycles", + "PreparationWorkflow": "Adept", + "ChemistryVersion": "Cloudbreak", + "LowDiversity": false, + "PlatformVersion": "2.6.2", + "AnalysisLanes": "1+2", + "StorageConnectionID": "local:66866355d07c3234c01b67b1", + "PMGMask": "P1:Y4N*", + "Consumables": { + "Flowcell": { + "SerialNumber": "2349523513", + "PartNumber": "810-00002", + "LotNumber": "2405300233", + "Expiration": "2025-05-31T00:00:00Z", + "ExpirationStr": "20250531", + "BarcodeStr": "2349523513,810-00002,2405300233,20250531" + }, + "SequencingCartridge": { + "SerialNumber": "24062600390028", + "PartNumber": "820-00013", + "LotNumber": "2406260039", + "Expiration": "2025-05-22T00:00:00Z", + "ExpirationStr": "20250522", + "BarcodeStr": "24062600390028,820-00013,2406260039,20250522" + }, + "Buffer": { + "SerialNumber": "24062400390041", + "PartNumber": "820-00002", + "LotNumber": "2406240039", + "Expiration": "2026-06-25T00:00:00Z", + "ExpirationStr": "20260625", + "BarcodeStr": "24062400390041,820-00002,2406240039,20260625" + } + }, + "LibraryType": "Linear", + "RecipeValues": [ + { + "Name": "filterMask", + "Value": "R1:Y15N*-R2:Y15N*" + } + ], + "AdvancedSettings": { + "PolonyDensity": "HighDensity" + } +} +""") + + if run_finished: + with open(f"{run_path}/RunUploaded.json", "w") as stream: + outcome = "OutcomeCompleted" if outcome_completed else "OutcomeFailed" + stream.write( + "{" + + '"version":"1.0.0",' + + '"instrument":"AV242106",' + + '"instrumentId":"0000024023696901c5621014",' + + '"runType":"Sequencing",' + + '"recipeExecutionId":"rec.9590c80c95fc4eee8b3eb10c31251915",' + + '"runID":"seq_66f5837f1ae1a35f10a2e594",' + + f'"outcome":"{outcome}"' + + "}" + ) + + if rsync_ongoing: + open(f"{run_path}/.rsync_ongoing", 
"w").close() + + if rsync_exit_status is not None: + with open(f"{run_path}/.rsync_exit_status", "w") as stream: + stream.write(str(rsync_exit_status)) + + if demux_dir: + os.mkdir(os.path.join(run_path, "Demultiplexing")) + if demux_done: + open( + os.path.join( + run_path, + "Demultiplexing", + "RunStats.json", + ), + "w", + ).close() + for i in range(n_demux_subdirs): + os.mkdir(os.path.join(run_path, f"Demultiplexing_{i}")) + if demux_done: + open( + os.path.join( + run_path, + f"Demultiplexing_{i}", + "RunStats.json", + ), + "w", + ).close() + + return run_path + + +@mock.patch("taca.element.Element_Runs.ElementRunsConnection") +class TestRun: + def test_init(self, mock_db: mock.Mock, create_dirs: pytest.fixture): + tmp: tempfile.TemporaryDirectory = create_dirs + run_dir = create_element_run_dir(tmp) + + run = to_test.Run(run_dir, get_config(tmp)) + assert run.run_dir == run_dir + + @pytest.mark.parametrize( + "p", + [ + { + "run_finished": True, + "metadata_files": True, + "outcome_completed": True, + "expected": True, + }, + { + "run_finished": True, + "metadata_files": True, + "outcome_completed": False, + "expected": False, + }, + { + "run_finished": False, + "metadata_files": False, + "outcome_completed": False, + "expected": False, + }, + ], + ids=["success", "failure", "ongoing"], + ) + def test_check_sequencing_status( + self, + mock_db: mock.Mock, + p: pytest.fixture, + create_dirs: pytest.fixture, + ): + tmp: tempfile.TemporaryDirectory = create_dirs + + expected_outcome = p.pop("expected") + run = to_test.Run( + create_element_run_dir( + tmp, + **p, + ), + get_config(tmp), + ) + assert run.check_sequencing_status() is expected_outcome + + @pytest.mark.parametrize( + "p", + [ + {"demux_dir": False, "demux_done": False, "expected": "not started"}, + {"demux_dir": True, "demux_done": False, "expected": "ongoing"}, + {"demux_dir": True, "demux_done": True, "expected": "finished"}, + ], + ids=["not started", "ongoing", "finished"], + ) + def test_get_demultiplexing_status( + self, mock_db: mock.Mock, p: pytest.fixture, create_dirs: pytest.fixture + ): + tmp: tempfile.TemporaryDirectory = create_dirs + + run = to_test.Run( + create_element_run_dir( + tmp, + demux_dir=p["demux_dir"], + demux_done=p["demux_done"], + ), + get_config(tmp), + ) + + assert run.get_demultiplexing_status() == p["expected"] + + def test_start_demux(self, mock_db, create_dirs): + tmp: tempfile.TemporaryDirectory = create_dirs + with mock.patch("subprocess.Popen") as mock_Popen, mock.patch( + "taca.element.Element_Runs.Run.generate_demux_command" + ) as mock_command: + mock_command.return_value = "test command" + run = to_test.Run(create_element_run_dir(create_dirs), get_config(tmp)) + run.start_demux("mock_run_manifest", "mock_demux_dir") + mock_command.assert_called_once_with("mock_run_manifest", "mock_demux_dir") + mock_Popen.assert_called_once() diff --git a/tests/nanopore/test_ONT_run_classes.py b/tests/nanopore/test_ONT_run_classes.py index a91a2f97..99cd558f 100644 --- a/tests/nanopore/test_ONT_run_classes.py +++ b/tests/nanopore/test_ONT_run_classes.py @@ -52,8 +52,6 @@ def make_ONT_test_config(tmp: tempfile.TemporaryDirectory) -> dict: minknow_reports_dir: {tmp.name}/minknow_reports/ rsync_options: '-Lav': None - '--chown': ':ngi2016003' - '--chmod': 'Dg+s,g+rw' '-r': None '--exclude': ['work']"""