diff --git a/.github/workflows/run-pytest.yml b/.github/workflows/run-pytest.yml index 6bf573b..cc3d35d 100644 --- a/.github/workflows/run-pytest.yml +++ b/.github/workflows/run-pytest.yml @@ -11,7 +11,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: ["3.7", "3.8", "3.9", "3.10"] + python-version: ["3.8", "3.10"] os: [ubuntu-latest] steps: diff --git a/docs/changelog.md b/docs/changelog.md index d7281c6..edbdfef 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -1,5 +1,17 @@ # Changelog +## [0.14.0] -- 2023-12-22 +### Changed +- refactor for pipestat v0.6.0 release +- drop python 2.7 +- updated requirements +- changed message_raw to be a value_dict when reporting to conform to pipestat +- ### Fixed +- fixed #196 and #197 +- ### Added +- added `force_overwrite` to `report_result` and `report_object` +- added pipestat_pipeline_type, defaulting to sample-level + ## [0.13.2] -- 2023-08-02 ### Fixed - fixed self.new_start overriding checkpoints. diff --git a/pypiper/_version.py b/pypiper/_version.py index 83ce76f..9e78220 100644 --- a/pypiper/_version.py +++ b/pypiper/_version.py @@ -1 +1 @@ -__version__ = "0.13.2" +__version__ = "0.14.0" diff --git a/pypiper/manager.py b/pypiper/manager.py index c3f4cf6..67441d6 100644 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -141,6 +141,7 @@ def __init__( pipestat_schema=None, pipestat_results_file=None, pipestat_config=None, + pipestat_pipeline_type=None, pipestat_result_formatter=None, **kwargs, ): @@ -329,32 +330,35 @@ def __init__( signal.signal(signal.SIGTERM, self._signal_term_handler) # pipestat setup - self.pipestat_sample_name = pipestat_sample_name or DEFAULT_SAMPLE_NAME - # getattr(self, "sample_name", DEFAULT_SAMPLE_NAME) + self.pipestat_record_identifier = pipestat_sample_name or DEFAULT_SAMPLE_NAME + self.pipestat_pipeline_type = pipestat_pipeline_type or "sample" # don't force default pipestat_results_file value unless # pipestat config not provided if pipestat_config is None and pipestat_results_file is None: - pipestat_results_file = pipeline_filepath( - self, filename="pipestat_results.yaml" - ) + self.pipestat_results_file = self.pipeline_stats_file + elif pipestat_results_file: + self.pipestat_results_file = pipestat_results_file + self.pipeline_stats_file = self.pipestat_results_file def _get_arg(args_dict, arg_name): """safely get argument from arg dict -- return None if doesn't exist""" return None if arg_name not in args_dict else args_dict[arg_name] self._pipestat_manager = PipestatManager( - sample_name=self.pipestat_sample_name + record_identifier=self.pipestat_record_identifier or _get_arg(args_dict, "pipestat_sample_name") or DEFAULT_SAMPLE_NAME, pipeline_name=self.name, schema_path=pipestat_schema or _get_arg(args_dict, "pipestat_schema") or default_pipestat_output_schema(sys.argv[0]), - results_file_path=self.pipeline_stats_file - or _get_arg(args_dict, "pipestat_results_file"), + results_file_path=self.pipestat_results_file + or _get_arg(args_dict, "pipestat_results_file") + or self.pipeline_stats_file, config_file=pipestat_config or _get_arg(args_dict, "pipestat_config"), multi_pipelines=multi, + pipeline_type=self.pipestat_pipeline_type, ) self.start_pipeline(args, multi) @@ -437,7 +441,7 @@ def _completed(self): :return bool: Whether the managed pipeline is in a completed state. """ return ( - self.pipestat.get_status(self._pipestat_manager.sample_name) + self.pipestat.get_status(self._pipestat_manager.record_identifier) == COMPLETE_FLAG ) @@ -448,7 +452,10 @@ def _failed(self): :return bool: Whether the managed pipeline is in a failed state. """ - return self.pipestat.get_status(self._pipestat_manager.sample_name) == FAIL_FLAG + return ( + self.pipestat.get_status(self._pipestat_manager.record_identifier) + == FAIL_FLAG + ) @property def halted(self): @@ -457,7 +464,8 @@ def halted(self): :return bool: Whether the managed pipeline is in a paused/halted state. """ return ( - self.pipestat.get_status(self._pipestat_manager.sample_name) == PAUSE_FLAG + self.pipestat.get_status(self._pipestat_manager.record_identifier) + == PAUSE_FLAG ) @property @@ -720,11 +728,12 @@ def start_pipeline(self, args=None, multi=False): results = self._pipestat_manager.__str__().split("\n") for i in results: self.info("* " + i) - self.info("* Sample name: " + self.pipestat_sample_name + "\n") + self.info("* Sample name: " + self.pipestat_record_identifier + "\n") self.info("\n----------------------------------------\n") self.status = "running" self.pipestat.set_status( - sample_name=self._pipestat_manager.sample_name, status_identifier="running" + record_identifier=self._pipestat_manager.record_identifier, + status_identifier="running", ) # Record the start in PIPE_profile and PIPE_commands output files so we @@ -770,7 +779,8 @@ def _set_status_flag(self, status): prev_status = self.status self.status = status self.pipestat.set_status( - sample_name=self._pipestat_manager.sample_name, status_identifier=status + record_identifier=self._pipestat_manager.record_identifier, + status_identifier=status, ) self.debug("\nChanged status from {} to {}.".format(prev_status, self.status)) @@ -786,8 +796,8 @@ def _flag_file_path(self, status=None): """ flag_file_name = "{}_{}_{}".format( - self._pipestat_manager["_pipeline_name"], - self.pipestat_sample_name, + self._pipestat_manager.pipeline_name, + self.pipestat_record_identifier, flag_name(status or self.status), ) return pipeline_filepath(self, filename=flag_file_name) @@ -1419,7 +1429,7 @@ def _wait_for_lock(self, lock_file): ) # self._set_status_flag(WAIT_FLAG) self.pipestat.set_status( - sample_name=self._pipestat_manager.sample_name, + record_identifier=self._pipestat_manager.record_identifier, status_identifier="waiting", ) first_message_flag = True @@ -1443,7 +1453,7 @@ def _wait_for_lock(self, lock_file): self.timestamp("File unlocked.") # self._set_status_flag(RUN_FLAG) self.pipestat.set_status( - sample_name=self._pipestat_manager.sample_name, + record_identifier=self._pipestat_manager.record_identifier, status_identifier="running", ) @@ -1582,7 +1592,9 @@ def _report_profile( with open(self.pipeline_profile_file, "a") as myfile: myfile.write(message_raw + "\n") - def report_result(self, key, value, nolog=False, result_formatter=None): + def report_result( + self, key, value, nolog=False, result_formatter=None, force_overwrite=False + ): """ Writes a key:value pair to self.pipeline_stats_file. @@ -1592,6 +1604,7 @@ def report_result(self, key, value, nolog=False, result_formatter=None): logfile. Use sparingly in case you will be printing the result in a different format. :param str result_formatter: function for formatting via pipestat backend + :param bool force_overwrite: overwrite results if they already exist? :return str reported_result: the reported result is returned as a list of formatted strings. """ @@ -1602,13 +1615,19 @@ def report_result(self, key, value, nolog=False, result_formatter=None): reported_result = self.pipestat.report( values={key: value}, - sample_name=self.pipestat_sample_name, + record_identifier=self.pipestat_record_identifier, result_formatter=rf, + force_overwrite=force_overwrite, ) if not nolog: - for r in reported_result: - self.info(r) + if isinstance( + reported_result, bool + ): # Pipestat can return False if results are NOT reported. + self.info("Result successfully reported? " + str(reported_result)) + else: + for r in reported_result: + self.info(r) return reported_result @@ -1621,6 +1640,7 @@ def report_object( annotation=None, nolog=False, result_formatter=None, + force_overwrite=False, ): """ Writes a key:value pair to self.pipeline_stats_file. Note: this function @@ -1641,6 +1661,7 @@ def report_object( logfile. Use sparingly in case you will be printing the result in a different format. :param str result_formatter: function for formatting via pipestat backend + :param bool force_overwrite: overwrite results if they already exist? :return str reported_result: the reported result is returned as a list of formatted strings. """ warnings.warn( @@ -1659,37 +1680,30 @@ def report_object( anchor_text = str(key).strip() # better to use a relative path in this file # convert any absolute paths into relative paths - relative_filename = ( - os.path.relpath(filename, self.outfolder) - if os.path.isabs(filename) - else filename - ) - - if anchor_image: - relative_anchor_image = ( - os.path.relpath(anchor_image, self.outfolder) - if os.path.isabs(anchor_image) - else anchor_image - ) - else: - relative_anchor_image = "None" - message_raw = "{filename}\t{anchor_text}\t{anchor_image}\t{annotation}".format( - filename=relative_filename, - anchor_text=anchor_text, - anchor_image=relative_anchor_image, - annotation=annotation, - ) - - val = {key: message_raw.replace("\t", " ")} + values = { + "path": filename, + "thumbnail_path": anchor_image, + "title": anchor_text, + "annotation": annotation, + } + val = {key: values} reported_result = self.pipestat.report( - values=val, sample_name=self.pipestat_sample_name, result_formatter=rf + values=val, + record_identifier=self.pipestat_record_identifier, + result_formatter=rf, + force_overwrite=force_overwrite, ) + if not nolog: - for r in reported_result: - self.info(r) - return reported_result + if isinstance( + reported_result, bool + ): # Pipestat can return False if results are NOT reported. + self.info("Result successfully reported? " + str(reported_result)) + else: + for r in reported_result: + self.info(r) def _safe_write_to_file(self, file, message): """ @@ -1849,15 +1863,11 @@ def _refresh_stats(self): if os.path.isfile(self.pipeline_stats_file): _, data = read_yaml_data(path=self.pipeline_stats_file, what="stats_file") - print(data) - pipeline_key = list( - data[self.pipestat["_pipeline_name"]][self.pipestat["_pipeline_type"]] - )[0] - if self.name == pipeline_key: - for key, value in data[self.pipestat["_pipeline_name"]][ - self.pipestat["_pipeline_type"] - ][pipeline_key].items(): - self.stats_dict[key] = value.strip() + + for key, value in data[self._pipestat_manager.pipeline_name][ + self._pipestat_manager.pipeline_type + ][self._pipestat_manager.record_identifier].items(): + self.stats_dict[key] = value def get_stat(self, key): """ @@ -1989,12 +1999,12 @@ def complete(self): """Stop a completely finished pipeline.""" self.stop_pipeline(status=COMPLETE_FLAG) - def fail_pipeline(self, exc, dynamic_recover=False): + def fail_pipeline(self, exc: Exception, dynamic_recover: bool = False): """ If the pipeline does not complete, this function will stop the pipeline gracefully. It sets the status flag to failed and skips the normal success completion procedure. - :param Exception e: Exception to raise. + :param Exception exc: Exception to raise. :param bool dynamic_recover: Whether to recover e.g. for job termination. """ # Take care of any active running subprocess @@ -2024,9 +2034,8 @@ def fail_pipeline(self, exc, dynamic_recover=False): total_time = datetime.timedelta(seconds=self.time_elapsed(self.starttime)) self.info("Total time: " + str(total_time)) self.info("Failure reason: " + str(exc)) - # self._set_status_flag(FAIL_FLAG) self.pipestat.set_status( - sample_name=self._pipestat_manager.sample_name, + record_identifier=self._pipestat_manager.record_identifier, status_identifier="failed", ) @@ -2087,7 +2096,8 @@ def stop_pipeline(self, status=COMPLETE_FLAG): """ # self._set_status_flag(status) self.pipestat.set_status( - sample_name=self._pipestat_manager.sample_name, status_identifier=status + record_identifier=self._pipestat_manager.record_identifier, + status_identifier=status, ) self._cleanup() elapsed_time_this_run = str( @@ -2457,8 +2467,8 @@ def _cleanup(self, dry_run=False): for fn in glob.glob(self.outfolder + flag_name("*")) if COMPLETE_FLAG not in os.path.basename(fn) and not "{}_{}_{}".format( - self._pipestat_manager["_pipeline_name"], - self.pipestat_sample_name, + self._pipestat_manager.pipeline_name, + self.pipestat_record_identifier, run_flag, ) == os.path.basename(fn) diff --git a/pypiper/pipeline.py b/pypiper/pipeline.py index 88c6173..95d31f2 100644 --- a/pypiper/pipeline.py +++ b/pypiper/pipeline.py @@ -330,7 +330,7 @@ def run(self, start_point=None, stop_before=None, stop_after=None): # between results from different stages. skip_mode = False - print("Running stage: {}".format(stage)) + print(f"Running stage: {getattr(stage, 'name', str(stage))}") stage.run() self.executed.append(stage) diff --git a/pypiper/utils.py b/pypiper/utils.py index 2c5ac75..8973466 100644 --- a/pypiper/utils.py +++ b/pypiper/utils.py @@ -387,20 +387,20 @@ def split_by_pipes(cmd): cmdlist = [] newcmd = str() for char in cmd: - if char is "{": + if char == "{": stack_brace.append("{") - elif char is "}": + elif char == "}": stack_brace.pop() - elif char is "(": + elif char == "(": stack_paren.append("(") - elif char is ")": + elif char == ")": stack_paren.pop() if len(stack_brace) > 0 or len(stack_paren) > 0: # We are inside a parenthetic of some kind; emit character # no matter what it is newcmd += char - elif char is "|": + elif char == "|": # if it's a pipe, finish the command and start a new one cmdlist.append(newcmd) newcmd = str() @@ -1110,9 +1110,11 @@ def _add_args(parser, args, required): return parser -def result_formatter_markdown(pipeline_name, sample_name, res_id, value) -> str: +def result_formatter_markdown(pipeline_name, record_identifier, res_id, value) -> str: """ Returns Markdown formatted value as string + + # Pipeline_name and record_identifier should be kept because pipestat needs it """ message_markdown = "\n> `{key}`\t{value}\t_RES_".format(key=res_id, value=value) diff --git a/requirements/requirements-docs.txt b/requirements/requirements-docs.txt index 4471914..ef5b4e6 100644 --- a/requirements/requirements-docs.txt +++ b/requirements/requirements-docs.txt @@ -2,5 +2,5 @@ mkdocs>=1.0 markdown-include pydoc-markdown piper -pipestat>=0.4.0 +pipestat>=0.6.0 https://github.com/databio/mkdocs-databio/archive/master.zip \ No newline at end of file diff --git a/requirements/requirements-pypiper.txt b/requirements/requirements-pypiper.txt index 886be3e..9a35f34 100644 --- a/requirements/requirements-pypiper.txt +++ b/requirements/requirements-pypiper.txt @@ -1,7 +1,6 @@ -attmap>=0.12.5 logmuse>=0.2.4 psutil pandas ubiquerg>=0.4.5 yacman -pipestat>=0.4.0 +pipestat>=0.6.0 diff --git a/setup.py b/setup.py index d485071..06a9d08 100644 --- a/setup.py +++ b/setup.py @@ -56,7 +56,10 @@ def read_reqs_file(reqs_name): classifiers=[ "Development Status :: 4 - Beta", "License :: OSI Approved :: BSD License", - "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", "Topic :: Scientific/Engineering :: Bio-Informatics", ], author="Nathan Sheffield, Johanna Klughammer, Andre Rendeiro",