Skip to content

Commit

Permalink
Add command for starting demux
Browse files Browse the repository at this point in the history
  • Loading branch information
ssjunnebo committed Sep 17, 2024
1 parent 0baa93c commit 1fe015d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 31 deletions.
2 changes: 1 addition & 1 deletion taca/analysis/analysis_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _process(run):
logger.info(f"{run} is being transferred. Skipping.")
return
elif transfer_status == "rsync done":
if run.rsync_success():
if run.rsync_successful():
run.remove_transfer_indicator()
run.update_transfer_log()
run.status = "transferred"
Expand Down
64 changes: 34 additions & 30 deletions taca/element/Element_Runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import logging
import os
import re
import shutil
import zipfile
import subprocess
from datetime import datetime
from pathlib import Path
from glob import glob

import pandas as pd
Expand Down Expand Up @@ -36,6 +37,7 @@ def __init__(self, run_dir, configuration):
self.transfer_file = (
self.CONFIG.get("Element").get(self.sequencer_type).get("transfer_log")
) # TODO: change and add to taca.yaml
self.rsync_exit_file = os.path.join(self.run_dir, '.rsync_exit_status')

# Instrument generated files
self.run_parameters_file = os.path.join(self.run_dir, "RunParameters.json")
Expand Down Expand Up @@ -360,31 +362,32 @@ def make_demux_manifests(
return manifest_paths

def generate_demux_command(self, run_manifest, demux_dir):
command = [
self.CONFIG.get(self.software)[
"bin"
], # TODO add path to bases2fastq executable to config
self.run_dir,
demux_dir,
"-p 8",
f"-r {run_manifest}",
"--legacy-fastq", # TODO: except if Smart-seq3
"--force-index-orientation",
] # TODO: any other options?
# TODO: write exit status of command to file
command = (f"{self.CONFIG.get(self.software)["bin"]}" # TODO: add path to bases2fastq executable to config
+ f" {self.run_dir}"
+ f" {demux_dir}"
+ " -p 8"
+ f" -r {run_manifest}"
+ " --legacy-fastq" # TODO: except if Smart-seq3
+ f" --force-index-orientation; echo $? > {self.rsync_exit_file}"
) # TODO: any other options?
return command

def start_demux(self, run_manifest, demux_dir):
with chdir(self.run_dir):
cmd = self.generate_demux_command(run_manifest, demux_dir)
# TODO handle multiple composite manifests for demux
misc.call_external_command_detached(
cmd, with_log_files=True, prefix="demux_"
)
logger.info(
"Bases2Fastq conversion and demultiplexing "
f"started for run {self} on {datetime.now()}"
)
try:
p_handle = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True, cwd=self.run_dir)
logger.info(
"Bases2Fastq conversion and demultiplexing "
f"started for run {self} on {datetime.now()}"
)
except subprocess.CalledProcessError:
logger.warning("An error occurred while starting demultiplexing for "
f"{self} on {datetime.now()}."
)
return


def get_transfer_status(self):
if not self.in_transfer_log() and not self.transfer_ongoing() and not self.rsync_complete():
Expand All @@ -404,17 +407,18 @@ def in_transfer_log(self):
return False

def transfer_ongoing(self):
# TODO: return true if hidden transfer file marker exists, else false

pass
return os.path.isfile(os.path.join(self.run_dir, '.rsync_ongoing'))

def rsync_complete(self):
# TODO: return true if .rsync_exit_status exists
pass
return os.path.isfile(self.rsync_exit_file)

def get_rsync_exit_status():
# TODO: return status of rsync from .rsync_exit_status
pass
def rsync_successful(self):
with open(os.path.join(self.run_dir, '.rsync_exit_status')) as rsync_exit_file:
rsync_exit_status = rsync_exit_file.readlines()
if rsync_exit_status[0].strip() == 0:
return True
else:
return False

def aggregate_demux_results(self):
# TODO: aggregate demux results
Expand All @@ -425,8 +429,8 @@ def sync_metadata(self):
pass

def make_transfer_indicator(self):
# TODO: touch a hidden file in the run directory
pass
transfer_indicator = os.path.join(self.run_dir, '.rsync_ongoing')
Path(transfer_indicator).touch()

def transfer(self):
# TODO: rsync run to analysis cluster
Expand Down

0 comments on commit 1fe015d

Please sign in to comment.