Skip to content

Commit

Permalink
Merge pull request #410 from kedhammar/dev
Browse files Browse the repository at this point in the history
More improvements on TACA running Anglerfish
  • Loading branch information
kedhammar authored Feb 5, 2024
2 parents 4617a61 + 331c79e commit d22c0f6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 17 deletions.
4 changes: 4 additions & 0 deletions VERSIONLOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# TACA Version Log

## 20240202.1

Use abspath for Anglerfish stderr path, make it possible to instantiate ONT run w/o specifying the type, add more info to the ONT db update subcommand.

## 20240201.1

Fix bugs that changs in PR #404 were reverted in PR #411
Expand Down
50 changes: 33 additions & 17 deletions taca/nanopore/ONT_run_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ def __init__(self, run_abspath: str):
if v == "None":
self.rsync_options[k] = None

# Get transfer details, depending on run type and instrument
self.transfer_details = CONFIG["nanopore_analysis"]["run_types"][self.run_type][
"instruments"
][self.instrument]

# Get DB
self.db = NanoporeRunsConnection(CONFIG["statusdb"], dbname="nanopore_runs")

Expand Down Expand Up @@ -123,13 +118,6 @@ def assert_contents(self):
assert self.has_file("/final_summary*.txt")
assert self.has_file("/pore_activity*.csv")

def is_transferred(self) -> bool:
"""Return True if run ID in transfer.tsv, else False."""
with open(self.transfer_details["transfer_log"]) as f:
return self.run_name in f.read()

# DB update

def touch_db_entry(self):
"""Check run vs statusdb. Create entry if there is none."""

Expand Down Expand Up @@ -172,6 +160,18 @@ def update_db_entry(self, force_update=False):
# Instantiate json (dict) to update the db with
db_update = {}

# Parse run path
db_update["run_path"] = (
open(f"{self.run_abspath}/run_path.txt").read().strip()
)

# Parse pore counts
pore_counts = []
with open(f"{self.run_abspath}/pore_count_history.csv") as stream:
for line in csv.DictReader(stream):
pore_counts.append(line)
db_update["pore_count_history"] = pore_counts

# Parse report_*.json
self.parse_minknow_json(db_update)

Expand Down Expand Up @@ -230,7 +230,7 @@ def parse_pore_activity(self, db_update):
def parse_minknow_json(self, db_update):
"""Parse useful stuff from the MinKNOW .json report to add to CouchDB"""

logger.info(f"{self.run_name}:Parsing report JSON...")
logger.info(f"{self.run_name}: Parsing report JSON...")

dict_json_report = json.load(open(self.get_file("/report*.json")))

Expand Down Expand Up @@ -377,16 +377,27 @@ class ONT_user_run(ONT_run):
"""ONT user run, has class methods and attributes specific to user runs."""

def __init__(self, run_abspath: str):
self.run_type = "user_run"
super().__init__(run_abspath)
self.run_type = "user_run"
self.transfer_details = CONFIG["nanopore_analysis"]["run_types"][self.run_type][
"instruments"
][self.instrument]

def is_transferred(self) -> bool:
"""Return True if run ID in transfer.tsv, else False."""
with open(self.transfer_details["transfer_log"]) as f:
return self.run_name in f.read()


class ONT_qc_run(ONT_run):
"""ONT QC run, has class methods and attributes specific to QC runs"""

def __init__(self, run_abspath: str):
self.run_type = "qc_run"
super().__init__(run_abspath)
self.run_type = "qc_run"
self.transfer_details = CONFIG["nanopore_analysis"]["run_types"][self.run_type][
"instruments"
][self.instrument]

# Get Anglerfish attributes from run
self.anglerfish_done_abspath = f"{self.run_abspath}/.anglerfish_done"
Expand All @@ -402,6 +413,11 @@ def __init__(self, run_abspath: str):
]
self.anglerfish_path = self.anglerfish_config["anglerfish_path"]

def is_transferred(self) -> bool:
"""Return True if run ID in transfer.tsv, else False."""
with open(self.transfer_details["transfer_log"]) as f:
return self.run_name in f.read()

# QC methods

def get_anglerfish_exit_code(self) -> Union[int, None]:
Expand Down Expand Up @@ -510,7 +526,7 @@ def run_anglerfish(self):
# Copy samplesheet used for traceability
shutil.copy(self.anglerfish_samplesheet, f"{taca_anglerfish_run_dir}/")
# Create files to dump subprocess std
stderr_relpath = f"{taca_anglerfish_run_dir}/stderr.txt"
stderr_abspath = f"{self.run_abspath}/{taca_anglerfish_run_dir}/stderr.txt"

full_command = [
# Dump subprocess PID into 'run-ongoing'-indicator file.
Expand All @@ -535,7 +551,7 @@ def run_anglerfish(self):
stream.write("\n".join(full_command))

# Start Anglerfish subprocess
with open(stderr_relpath, "w") as stderr:
with open(stderr_abspath, "w") as stderr:
process = subprocess.Popen(
f"bash {taca_anglerfish_run_dir}/command.sh",
shell=True,
Expand Down

0 comments on commit d22c0f6

Please sign in to comment.