From eda9f3ff7c5e28d6af1f1fb0d479785ce74bf581 Mon Sep 17 00:00:00 2001 From: Sara Sjunnebo Date: Tue, 17 Sep 2024 08:45:53 +0200 Subject: [PATCH] Restructure transfer status --- taca/analysis/analysis_element.py | 9 +++++---- taca/element/Element_Runs.py | 12 +++++++++++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/taca/analysis/analysis_element.py b/taca/analysis/analysis_element.py index 42fd3e00..0f20ba8a 100755 --- a/taca/analysis/analysis_element.py +++ b/taca/analysis/analysis_element.py @@ -72,7 +72,8 @@ def _process(run): run.update_statusdb() return elif sequencing_done and demultiplexing_status == "finished": - if not run.in_transfer_log() and not run.transfer_ongoing() and not run.rsync_complete(): + transfer_status = run.get_transfer_status() + if transfer_status == "not started": run.aggregate_demux_results() # TODO: if multiple demux dirs, aggregate the results into Demultiplexing? run.sync_metadata() run.make_transfer_indicator() @@ -81,13 +82,13 @@ def _process(run): run.update_statusdb() # TODO: Also update statusdb with a timestamp of when the transfer started run.transfer() # I think this should be a detached command as well - elif run.transfer_ongoing() and not run.rsync_complete(): + elif transfer_status == "ongoing": run.status = "transferring" if run.status_changed: run.update_statusdb() logger.info(f"{run} is being transferred. Skipping.") return - elif run.rsync_complete() and not run.in_transfer_log(): + elif transfer_status == "finished": if run.rsync_success(): run.remove_transfer_indicator() run.update_transfer_log() @@ -102,7 +103,7 @@ def _process(run): run.status = "transfer failed" logger.warning(f"An issue occurred while transfering {run} to the analysis cluster." ) # TODO: email warning to operator - elif run.in_transfer_log(): + elif transfer_status == "unknown": logger.warning( f"The run {run} has already been transferred but has not been archived. Please investigate" ) diff --git a/taca/element/Element_Runs.py b/taca/element/Element_Runs.py index 6b8f03e5..2010a933 100644 --- a/taca/element/Element_Runs.py +++ b/taca/element/Element_Runs.py @@ -179,7 +179,7 @@ def generate_demux_command(self, run_manifest, demux_dir): ], # TODO add path to bases2fastq executable to config self.run_dir, demux_dir, - "-p 12", # TODO: how many? Considering that we may start several demux runs at once + "-p 8", f"-r {run_manifest}", "--legacy-fastq", # TODO: except if Smart-seq3 "--force-index-orientation", @@ -198,6 +198,16 @@ def start_demux(self, run_manifest, demux_dir): f"started for run {self} on {datetime.now()}" ) + def get_transfer_status(self): + if not self.in_transfer_log() and not self.transfer_ongoing() and not self.rsync_complete(): + return "not started" + elif self.transfer_ongoing() and not self.rsync_complete(): + return "ongoing" + elif self.rsync_complete() and not self.in_transfer_log(): + return "finished" + elif self.in_transfer_log(): + return "unknown" + def in_transfer_log(self): with open(self.transfer_file, 'r') as transfer_file: for row in transfer_file.read():