From 463aeb27a41895e812f23a56123ae562747a3d20 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 10 Jun 2024 15:48:58 -0400 Subject: [PATCH 1/7] work for #37 --- looper/conductor.py | 93 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 92 insertions(+), 1 deletion(-) diff --git a/looper/conductor.py b/looper/conductor.py index ffbb1b54..21b4d015 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -4,6 +4,9 @@ import logging import os import subprocess +import signal +import psutil +import sys import time import yaml from math import ceil @@ -189,6 +192,7 @@ def __init__( the project level, rather that on the sample level) """ super(SubmissionConductor, self).__init__() + self.collate = collate self.section_key = PROJECT_PL_KEY if self.collate else SAMPLE_PL_KEY self.pl_iface = pipeline_interface @@ -210,6 +214,7 @@ def __init__( self._curr_size = 0 self._failed_sample_names = [] self._curr_skip_pool = [] + self.process_id = None # this is used for a submitted subprocess if self.extra_pipe_args: _LOGGER.debug( @@ -392,6 +397,10 @@ def submit(self, force=False): not for dry run) """ submitted = False + + # Override signal handler so that Ctrl+C can be used to gracefully terminate child process + signal.signal(signal.SIGINT, self._signal_int_handler) + if not self._pool: _LOGGER.debug("No submission (no pooled samples): %s", self.pl_name) # submitted = False @@ -421,7 +430,9 @@ def submit(self, force=False): # Capture submission command return value so that we can # intercept and report basic submission failures; #167 try: - subprocess.check_call(submission_command, shell=True) + process = subprocess.Popen(submission_command, shell=True) + self.process_id = process.pid + return_code = process.wait() except subprocess.CalledProcessError: fails = ( "" if self.collate else [s.sample_name for s in self._samples] @@ -489,6 +500,86 @@ def _sample_lump_name(self, pool): # name concordant with 1-based, not 0-based indexing. return "lump{}".format(self._num_total_job_submissions + 1) + def _signal_int_handler(self, signal, frame): + """ + For catching interrupt (Ctrl +C) signals. Fails gracefully. + """ + signal_type = "SIGINT" + self._generic_signal_handler(signal_type) + + def _generic_signal_handler(self, signal_type): + """ + Function for handling both SIGTERM and SIGINT + """ + message = "Received " + signal_type + ". Failing gracefully..." 
+ _LOGGER.warning(msg=message) + + self._terminate_current_subprocess() + + sys.exit(1) + + def _terminate_current_subprocess(self): + + def pskill(proc_pid, sig=signal.SIGINT): + parent_process = psutil.Process(proc_pid) + for child_proc in parent_process.children(recursive=True): + child_proc.send_signal(sig) + parent_process.send_signal(sig) + + if self.process_id is None: + return + + # Gently wait for the subprocess before attempting to kill it + sys.stdout.flush() + still_running = self._attend_process(psutil.Process(self.process_id), 0) + sleeptime = 0.25 + time_waiting = 0 + + while still_running and time_waiting < 3: + try: + if time_waiting > 2: + pskill(self.process_id, signal.SIGKILL) + elif time_waiting > 1: + pskill(self.process_id, signal.SIGTERM) + else: + pskill(self.process_id, signal.SIGINT) + + except OSError: + # This would happen if the child process ended between the check + # and the next kill step + still_running = False + time_waiting = time_waiting + sleeptime + + # Now see if it's still running + time_waiting = time_waiting + sleeptime + if not self._attend_process(psutil.Process(self.process_id), sleeptime): + still_running = False + + if still_running: + _LOGGER.warning(f"Unable to halt child process: {self.process_id}") + else: + if time_waiting > 0: + note = f"terminated after {time_waiting} sec" + else: + note = "was already terminated" + _LOGGER.warning(msg=f"Child process {self.process_id} {note}.") + + def _attend_process(self, proc, sleeptime): + """ + Waits on a process for a given time to see if it finishes, returns True + if it's still running after the given time or False as soon as it + returns. + + :param psutil.Process proc: Process object opened by psutil.Popen() + :param float sleeptime: Time to wait + :return bool: True if process is still running; otherwise false + """ + try: + proc.wait(timeout=int(sleeptime)) + except psutil.TimeoutExpired: + return True + return False + def _jobname(self, pool): """Create the name for a job submission.""" return "{}_{}".format(self.pl_iface.pipeline_name, self._sample_lump_name(pool)) From ae5d14d442e615139968c8828991049d70ce70cd Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 10 Jun 2024 15:54:29 -0400 Subject: [PATCH 2/7] add psutil to requirements --- requirements/requirements-all.txt | 3 ++- requirements/requirements-test.txt | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index a78b632d..bd28baa8 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -11,4 +11,5 @@ pyyaml>=3.12 rich>=9.10.0 ubiquerg>=0.5.2 yacman==0.9.3 -pydantic2-argparse>=0.9.2 \ No newline at end of file +pydantic2-argparse>=0.9.2 +psutil \ No newline at end of file diff --git a/requirements/requirements-test.txt b/requirements/requirements-test.txt index 87d10086..f5579eba 100644 --- a/requirements/requirements-test.txt +++ b/requirements/requirements-test.txt @@ -4,4 +4,5 @@ pytest pytest-cov pytest-remotedata veracitools -GitPython \ No newline at end of file +GitPython +psutil \ No newline at end of file From a675ab1c15bf18c351e16916785a0d5a080fb127 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 10 Jun 2024 16:10:57 -0400 Subject: [PATCH 3/7] handle subprocess errors using PIPE instead of subprocess.CalledProcessError exception --- looper/conductor.py | 12 ++++++------ 1 file 
changed, 6 insertions(+), 6 deletions(-) diff --git a/looper/conductor.py b/looper/conductor.py index 21b4d015..48be8555 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -12,7 +12,7 @@ from math import ceil from copy import copy, deepcopy from json import loads -from subprocess import check_output +from subprocess import check_output, PIPE from typing import * from eido import read_schema, get_input_files_size @@ -429,11 +429,11 @@ def submit(self, force=False): submission_command = "{} {}".format(sub_cmd, script) # Capture submission command return value so that we can # intercept and report basic submission failures; #167 - try: - process = subprocess.Popen(submission_command, shell=True) - self.process_id = process.pid - return_code = process.wait() - except subprocess.CalledProcessError: + + process = subprocess.Popen(submission_command, stderr=PIPE, shell=True) + self.process_id = process.pid + output, errors = process.communicate() + if errors: fails = ( "" if self.collate else [s.sample_name for s in self._samples] ) From cbf9b24c9f142d2aa34e5cee296106f5908958a5 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Tue, 11 Jun 2024 15:28:07 -0400 Subject: [PATCH 4/7] update advanced and pipestat examples, fix corresponding tests, add output_schema to pipestat namespace --- looper/conductor.py | 5 ++-- .../advanced/pipeline/col_pipeline1.py | 0 .../advanced/pipeline/col_pipeline2.py | 0 .../advanced/pipeline/other_pipeline2.py | 0 .../advanced/pipeline/pipeline1.py | 0 .../pipeline/pipeline_interface1_project.yaml | 8 +++---- .../pipeline/pipeline_interface1_sample.yaml | 8 +++---- .../pipeline/pipeline_interface2_project.yaml | 8 +++---- .../pipeline/pipeline_interface2_sample.yaml | 7 ++---- .../pipestat_pipeline_interface1_sample.yaml | 8 +++---- .../pipestat_pipeline_interface2_sample.yaml | 8 +++---- .../pipestat/pipeline_pipestat/count_lines.py | 7 +++++- .../pipeline_pipestat/pipeline_interface.yaml | 2 +- tests/smoketests/test_other.py | 24 +++++++++++++++++++ tests/test_comprehensive.py | 1 + 15 files changed, 52 insertions(+), 34 deletions(-) create mode 100644 tests/data/hello_looper-dev/advanced/pipeline/col_pipeline1.py create mode 100644 tests/data/hello_looper-dev/advanced/pipeline/col_pipeline2.py create mode 100644 tests/data/hello_looper-dev/advanced/pipeline/other_pipeline2.py create mode 100644 tests/data/hello_looper-dev/advanced/pipeline/pipeline1.py diff --git a/looper/conductor.py b/looper/conductor.py index 48be8555..bcdd57df 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -429,11 +429,11 @@ def submit(self, force=False): submission_command = "{} {}".format(sub_cmd, script) # Capture submission command return value so that we can # intercept and report basic submission failures; #167 - process = subprocess.Popen(submission_command, stderr=PIPE, shell=True) self.process_id = process.pid output, errors = process.communicate() - if errors: + _LOGGER.debug(msg=errors) + if process.returncode != 0: fails = ( "" if self.collate else [s.sample_name for s in self._samples] ) @@ -654,6 +654,7 @@ def _set_pipestat_namespace( "results_file": psm.file, "record_identifier": psm.record_identifier, "config_file": psm.config_path, + "output_schema": psm.cfg["_schema_path"], } filtered_namespace = {k: v for k, v in full_namespace.items() if v} return YAMLConfigManager(filtered_namespace) diff --git a/tests/data/hello_looper-dev/advanced/pipeline/col_pipeline1.py 
b/tests/data/hello_looper-dev/advanced/pipeline/col_pipeline1.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/data/hello_looper-dev/advanced/pipeline/col_pipeline2.py b/tests/data/hello_looper-dev/advanced/pipeline/col_pipeline2.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/data/hello_looper-dev/advanced/pipeline/other_pipeline2.py b/tests/data/hello_looper-dev/advanced/pipeline/other_pipeline2.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipeline1.py b/tests/data/hello_looper-dev/advanced/pipeline/pipeline1.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_project.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_project.yaml index cddc14b7..2a23d321 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_project.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_project.yaml @@ -2,10 +2,8 @@ pipeline_name: PIPELINE1 pipeline_type: project output_schema: output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/col_pipeline1.py" + path: "{looper.piface_dir}/col_pipeline1.py" command_template: > - {pipeline.var_templates.path} --project-name {project.name} + python3 {pipeline.var_templates.path} --project-name {project.name} + -bioconductor: - readFunName: readData - readFunPath: readData.R diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_sample.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_sample.yaml index 43638d92..8e79b7ae 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_sample.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_sample.yaml @@ -3,13 +3,11 @@ pipeline_type: sample input_schema: https://schema.databio.org/pep/2.0.0.yaml output_schema: output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/pipeline1.py" + path: "{looper.piface_dir}/pipeline1.py" pre_submit: python_functions: - looper.write_sample_yaml command_template: > - {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + -bioconductor: - readFunName: readData - readFunPath: readData.R diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_project.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_project.yaml index 7c4a4223..824b7e09 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_project.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_project.yaml @@ -2,12 +2,10 @@ pipeline_name: OTHER_PIPELINE2 pipeline_type: project output_schema: output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/col_pipeline2.py" + path: "{looper.piface_dir}/col_pipeline2.py" command_template: > - {pipeline.var_templates.path} --project-name {project.name} + python3 {pipeline.var_templates.path} --project-name {project.name} compute: size_dependent_variables: resources-project.tsv -bioconductor: - readFunName: readData - readFunPath: readData.R + diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_sample.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_sample.yaml index 987f7873..589aef6d 100644 --- 
a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_sample.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_sample.yaml @@ -2,15 +2,12 @@ pipeline_name: OTHER_PIPELINE2 pipeline_type: sample output_schema: output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/other_pipeline2.py" + path: "{looper.piface_dir}/other_pipeline2.py" pre_submit: python_functions: - looper.write_sample_yaml command_template: > - {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} compute: size_dependent_variables: resources-sample.tsv -bioconductor: - readFunName: readData - readFunPath: readData.R diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface1_sample.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface1_sample.yaml index ff40c411..e687ea0d 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface1_sample.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface1_sample.yaml @@ -3,13 +3,11 @@ pipeline_type: sample input_schema: https://schema.databio.org/pep/2.0.0.yaml output_schema: pipestat_output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/pipeline1.py" + path: "{looper.piface_dir}/pipeline1.py" pre_submit: python_functions: - looper.write_sample_yaml command_template: > - {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + -bioconductor: - readFunName: readData - readFunPath: readData.R diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface2_sample.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface2_sample.yaml index 79dcf50f..bac3ea3d 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface2_sample.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface2_sample.yaml @@ -3,15 +3,13 @@ pipeline_type: sample input_schema: https://schema.databio.org/pep/2.0.0.yaml output_schema: pipestat_output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/other_pipeline2.py" + path: "{looper.piface_dir}/other_pipeline2.py" pre_submit: python_functions: - looper.write_sample_yaml command_template: > - {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} compute: size_dependent_variables: resources-sample.tsv -bioconductor: - readFunName: readData - readFunPath: readData.R + diff --git a/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/count_lines.py b/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/count_lines.py index 97e866ee..6f6a4ab8 100755 --- a/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/count_lines.py +++ b/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/count_lines.py @@ -1,3 +1,5 @@ +import os.path + import pipestat import sys @@ -8,14 +10,17 @@ ] # this is the sample we wish to process by reading the number of lines sample_name = sys.argv[2] results_file = sys.argv[3] +schema_path = sys.argv[4] # Create pipestat manager and then report values psm = pipestat.PipestatManager( - schema_path="pipeline_pipestat/pipestat_output_schema.yaml", + 
schema_path=schema_path, results_file_path=results_file, record_identifier=sample_name, ) + +text_file = os.path.abspath(text_file) # Read text file and count lines with open(text_file, "r") as f: result = {"number_of_lines": len(f.readlines())} diff --git a/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml b/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml index 1d26ac43..ec6cf255 100644 --- a/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml +++ b/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml @@ -2,4 +2,4 @@ pipeline_name: example_pipestat_pipeline pipeline_type: sample output_schema: pipestat_output_schema.yaml command_template: > - python {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file} \ No newline at end of file + python {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file} {pipestat.output_schema} \ No newline at end of file diff --git a/tests/smoketests/test_other.py b/tests/smoketests/test_other.py index 2527f4f2..b90e9b61 100644 --- a/tests/smoketests/test_other.py +++ b/tests/smoketests/test_other.py @@ -95,6 +95,30 @@ def test_pipestat_rerun(self, prep_temp_pep_pipestat, pipeline_name, flags): """Verify that rerun works with either failed or waiting flags""" tp = prep_temp_pep_pipestat _make_flags_pipestat(tp, flags, pipeline_name) + path_to_looper_config = prep_temp_pep_pipestat + pipestat_dir = os.path.dirname(path_to_looper_config) + + # open up the project config and replace the derived attributes with the path to the data. In a way, this simulates using the environment variables. + pipestat_project_file = get_project_config_path(path_to_looper_config) + + pipestat_pipeline_interface_file = os.path.join( + pipestat_dir, "pipeline_pipestat/pipeline_interface.yaml" + ) + + with open(pipestat_project_file, "r") as f: + pipestat_project_data = safe_load(f) + + pipestat_project_data["sample_modifiers"]["derive"]["sources"]["source1"] = ( + os.path.join(pipestat_dir, "data/{sample_name}.txt") + ) + + with open(pipestat_pipeline_interface_file, "r") as f: + pipestat_piface_data = safe_load(f) + + pipeline_name = pipestat_piface_data["pipeline_name"] + + with open(pipestat_project_file, "w") as f: + dump(pipestat_project_data, f) x = ["rerun", "--looper-config", tp] try: diff --git a/tests/test_comprehensive.py b/tests/test_comprehensive.py index cce74ca5..9b857f8f 100644 --- a/tests/test_comprehensive.py +++ b/tests/test_comprehensive.py @@ -167,6 +167,7 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat): tsv_list = [os.path.join(sd, f) for f in os.listdir(sd) if f.endswith(".tsv")] assert len(tsv_list) == 0 with pytest.raises(RecordNotFoundError): + psm = PipestatManager(config_file=path_to_pipestat_config) retrieved_result = psm.retrieve_one(record_identifier="frog_2") From 7df88bfdb4ea7cae13a2fbaa69786796b767c60c Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Tue, 11 Jun 2024 15:39:35 -0400 Subject: [PATCH 5/7] add more doc strings --- looper/conductor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/looper/conductor.py b/looper/conductor.py index bcdd57df..ec074d52 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -519,6 +519,7 @@ def _generic_signal_handler(self, signal_type): sys.exit(1) def _terminate_current_subprocess(self): + """This terminates the current sub process associated 
with self.process_id""" def pskill(proc_pid, sig=signal.SIGINT): parent_process = psutil.Process(proc_pid) From 296b0cf096742b7decd46ce8695a7a3b1f51987b Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Tue, 11 Jun 2024 15:41:21 -0400 Subject: [PATCH 6/7] edit comments --- looper/conductor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/looper/conductor.py b/looper/conductor.py index ec074d52..1b08961d 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -214,7 +214,7 @@ def __init__( self._curr_size = 0 self._failed_sample_names = [] self._curr_skip_pool = [] - self.process_id = None # this is used for a submitted subprocess + self.process_id = None # this is used for currently submitted subprocess if self.extra_pipe_args: _LOGGER.debug( From b9b53cc20804a582eaa7261299c558b399008133 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Tue, 11 Jun 2024 15:58:14 -0400 Subject: [PATCH 7/7] align command templates to be the same --- .../pipestat/pipeline_pipestat/pipeline_interface.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml b/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml index ec6cf255..e5a14402 100644 --- a/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml +++ b/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml @@ -2,4 +2,4 @@ pipeline_name: example_pipestat_pipeline pipeline_type: sample output_schema: pipestat_output_schema.yaml command_template: > - python {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file} {pipestat.output_schema} \ No newline at end of file + python3 {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file} {pipestat.output_schema} \ No newline at end of file