0.14.0 Release #200

Merged
merged 23 commits on Dec 22, 2023
Commits
0c9b16f
try getting just stage name, but fall back to str representation of s…
vreuter Sep 5, 2023
0b430c7
version 0.13.3a1 for pipestat 0.6.0a1
donaldcampbelljr Oct 4, 2023
d9c818d
updated to pipestat 0.6.0
khoroshevskyi Nov 7, 2023
e30fe6e
Merge branch 'dev' into new_pipestat
khoroshevskyi Nov 7, 2023
e00e8d8
updated requirements
khoroshevskyi Nov 7, 2023
39b6e99
testing, drop python 3.7
khoroshevskyi Nov 7, 2023
6914fd5
Merge pull request #198 from vreuter/vr/stage-log-message-197
khoroshevskyi Nov 7, 2023
1c2c844
fix f-string quote issue for python 3.10
donaldcampbelljr Nov 8, 2023
5b46682
minor refactor to use pipestat properties instead of cfg dict
donaldcampbelljr Nov 10, 2023
d360ba1
update changelog and version number
donaldcampbelljr Nov 10, 2023
4db1637
Merge pull request #199 from databio/new_pipestat
donaldcampbelljr Nov 10, 2023
4ea21c2
update v0.13.3 and changelog
donaldcampbelljr Nov 14, 2023
ed6b7fb
fix _refresh_stats bug and change version to 0.14.0
donaldcampbelljr Nov 14, 2023
b0c5f8d
potential fix for #201
donaldcampbelljr Nov 15, 2023
d7b9c8c
changelog
donaldcampbelljr Nov 15, 2023
f92d034
v0.14.0a1 prerelease
donaldcampbelljr Nov 15, 2023
1a677da
report_object -> change message_raw to be a values dict to conform wi…
donaldcampbelljr Nov 20, 2023
ed95993
self.pipestat_results_file should take priority over self.pipeline_st…
donaldcampbelljr Nov 20, 2023
cc84070
make pipestat_results_file = pipeline_stats_file if it is not provided
donaldcampbelljr Nov 27, 2023
2de4e84
set pipeline_stats_file if pipestat_results_file IS provided, remove …
donaldcampbelljr Nov 27, 2023
d544644
add pipestat_pipeline_type, defaulting to sample
donaldcampbelljr Dec 8, 2023
649c985
pipestat req version bump, v0.14.0a2 bump for pre-release
donaldcampbelljr Dec 14, 2023
9349aa8
v0.14.0 release prep
donaldcampbelljr Dec 22, 2023
2 changes: 1 addition & 1 deletion .github/workflows/run-pytest.yml
@@ -11,7 +11,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10"]
python-version: ["3.8", "3.10"]
os: [ubuntu-latest]

steps:
12 changes: 12 additions & 0 deletions docs/changelog.md
@@ -1,5 +1,17 @@
# Changelog

## [0.14.0] -- 2023-12-22
### Changed
- refactor for pipestat v0.6.0 release
- drop python 3.7 support
- updated requirements
- changed message_raw to be a value_dict when reporting to conform to pipestat
### Fixed
- fixed #196 and #197
### Added
- added `force_overwrite` to `report_result` and `report_object`
- added pipestat_pipeline_type, defaulting to sample-level

## [0.13.2] -- 2023-08-02
### Fixed
- fixed self.new_start overriding checkpoints.
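The changelog entries above map to new `PipelineManager` constructor arguments and reporting flags. A minimal usage sketch, assuming the usual `pypiper` entry point; the pipeline name, output folder, sample name, and result key here are hypothetical:

```python
import pypiper

# Hypothetical pipeline setup; only the pipestat_* arguments are new in 0.14.0.
pm = pypiper.PipelineManager(
    name="example_pipeline",
    outfolder="results/example_sample/",
    pipestat_sample_name="example_sample",   # used as the pipestat record identifier
    pipestat_pipeline_type="sample",         # new argument; defaults to "sample"
    pipestat_results_file="results/example_sample/pipestat_results.yaml",
)

# force_overwrite (new in 0.14.0) lets a re-run replace an already-reported result.
pm.report_result("reads_aligned", 1000000, force_overwrite=True)

pm.stop_pipeline()
```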
2 changes: 1 addition & 1 deletion pypiper/_version.py
@@ -1 +1 @@
__version__ = "0.13.2"
__version__ = "0.14.0"
138 changes: 74 additions & 64 deletions pypiper/manager.py
@@ -141,6 +141,7 @@ def __init__(
pipestat_schema=None,
pipestat_results_file=None,
pipestat_config=None,
pipestat_pipeline_type=None,
pipestat_result_formatter=None,
**kwargs,
):
@@ -329,32 +330,35 @@ def __init__(
signal.signal(signal.SIGTERM, self._signal_term_handler)

# pipestat setup
self.pipestat_sample_name = pipestat_sample_name or DEFAULT_SAMPLE_NAME
# getattr(self, "sample_name", DEFAULT_SAMPLE_NAME)
self.pipestat_record_identifier = pipestat_sample_name or DEFAULT_SAMPLE_NAME
self.pipestat_pipeline_type = pipestat_pipeline_type or "sample"

# don't force default pipestat_results_file value unless
# pipestat config not provided
if pipestat_config is None and pipestat_results_file is None:
pipestat_results_file = pipeline_filepath(
self, filename="pipestat_results.yaml"
)
self.pipestat_results_file = self.pipeline_stats_file
elif pipestat_results_file:
self.pipestat_results_file = pipestat_results_file
self.pipeline_stats_file = self.pipestat_results_file

def _get_arg(args_dict, arg_name):
"""safely get argument from arg dict -- return None if doesn't exist"""
return None if arg_name not in args_dict else args_dict[arg_name]

self._pipestat_manager = PipestatManager(
sample_name=self.pipestat_sample_name
record_identifier=self.pipestat_record_identifier
or _get_arg(args_dict, "pipestat_sample_name")
or DEFAULT_SAMPLE_NAME,
pipeline_name=self.name,
schema_path=pipestat_schema
or _get_arg(args_dict, "pipestat_schema")
or default_pipestat_output_schema(sys.argv[0]),
results_file_path=self.pipeline_stats_file
or _get_arg(args_dict, "pipestat_results_file"),
results_file_path=self.pipestat_results_file
or _get_arg(args_dict, "pipestat_results_file")
or self.pipeline_stats_file,
config_file=pipestat_config or _get_arg(args_dict, "pipestat_config"),
multi_pipelines=multi,
pipeline_type=self.pipestat_pipeline_type,
)

self.start_pipeline(args, multi)
@@ -437,7 +441,7 @@ def _completed(self):
:return bool: Whether the managed pipeline is in a completed state.
"""
return (
self.pipestat.get_status(self._pipestat_manager.sample_name)
self.pipestat.get_status(self._pipestat_manager.record_identifier)
== COMPLETE_FLAG
)

@@ -448,7 +452,10 @@ def _failed(self):

:return bool: Whether the managed pipeline is in a failed state.
"""
return self.pipestat.get_status(self._pipestat_manager.sample_name) == FAIL_FLAG
return (
self.pipestat.get_status(self._pipestat_manager.record_identifier)
== FAIL_FLAG
)

@property
def halted(self):
@@ -457,7 +464,8 @@ def halted(self):
:return bool: Whether the managed pipeline is in a paused/halted state.
"""
return (
self.pipestat.get_status(self._pipestat_manager.sample_name) == PAUSE_FLAG
self.pipestat.get_status(self._pipestat_manager.record_identifier)
== PAUSE_FLAG
)

@property
@@ -720,11 +728,12 @@ def start_pipeline(self, args=None, multi=False):
results = self._pipestat_manager.__str__().split("\n")
for i in results:
self.info("* " + i)
self.info("* Sample name: " + self.pipestat_sample_name + "\n")
self.info("* Sample name: " + self.pipestat_record_identifier + "\n")
self.info("\n----------------------------------------\n")
self.status = "running"
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name, status_identifier="running"
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="running",
)

# Record the start in PIPE_profile and PIPE_commands output files so we
@@ -770,7 +779,8 @@ def _set_status_flag(self, status):
prev_status = self.status
self.status = status
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name, status_identifier=status
record_identifier=self._pipestat_manager.record_identifier,
status_identifier=status,
)
self.debug("\nChanged status from {} to {}.".format(prev_status, self.status))

@@ -786,8 +796,8 @@ def _flag_file_path(self, status=None):
"""

flag_file_name = "{}_{}_{}".format(
self._pipestat_manager["_pipeline_name"],
self.pipestat_sample_name,
self._pipestat_manager.pipeline_name,
self.pipestat_record_identifier,
flag_name(status or self.status),
)
return pipeline_filepath(self, filename=flag_file_name)
@@ -1419,7 +1429,7 @@ def _wait_for_lock(self, lock_file):
)
# self._set_status_flag(WAIT_FLAG)
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="waiting",
)
first_message_flag = True
@@ -1443,7 +1453,7 @@ def _wait_for_lock(self, lock_file):
self.timestamp("File unlocked.")
# self._set_status_flag(RUN_FLAG)
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="running",
)

@@ -1582,7 +1592,9 @@ def _report_profile(
with open(self.pipeline_profile_file, "a") as myfile:
myfile.write(message_raw + "\n")

def report_result(self, key, value, nolog=False, result_formatter=None):
def report_result(
self, key, value, nolog=False, result_formatter=None, force_overwrite=False
):
"""
Writes a key:value pair to self.pipeline_stats_file.

@@ -1592,6 +1604,7 @@ def report_result(self, key, value, nolog=False, result_formatter=None):
logfile. Use sparingly in case you will be printing the result in a
different format.
:param str result_formatter: function for formatting via pipestat backend
:param bool force_overwrite: overwrite results if they already exist?
:return str reported_result: the reported result is returned as a list of formatted strings.

"""
@@ -1602,13 +1615,19 @@ def report_result(self, key, value, nolog=False, result_formatter=None):

reported_result = self.pipestat.report(
values={key: value},
sample_name=self.pipestat_sample_name,
record_identifier=self.pipestat_record_identifier,
result_formatter=rf,
force_overwrite=force_overwrite,
)

if not nolog:
for r in reported_result:
self.info(r)
if isinstance(
reported_result, bool
): # Pipestat can return False if results are NOT reported.
self.info("Result successfully reported? " + str(reported_result))
else:
for r in reported_result:
self.info(r)

return reported_result

@@ -1621,6 +1640,7 @@ def report_object(
annotation=None,
nolog=False,
result_formatter=None,
force_overwrite=False,
):
"""
Writes a key:value pair to self.pipeline_stats_file. Note: this function
@@ -1641,6 +1661,7 @@ def report_object(
logfile. Use sparingly in case you will be printing the result in a
different format.
:param str result_formatter: function for formatting via pipestat backend
:param bool force_overwrite: overwrite results if they already exist?
:return str reported_result: the reported result is returned as a list of formatted strings.
"""
warnings.warn(
@@ -1659,37 +1680,30 @@ def report_object(
anchor_text = str(key).strip()
# better to use a relative path in this file
# convert any absolute paths into relative paths
relative_filename = (
os.path.relpath(filename, self.outfolder)
if os.path.isabs(filename)
else filename
)

if anchor_image:
relative_anchor_image = (
os.path.relpath(anchor_image, self.outfolder)
if os.path.isabs(anchor_image)
else anchor_image
)
else:
relative_anchor_image = "None"

message_raw = "{filename}\t{anchor_text}\t{anchor_image}\t{annotation}".format(
filename=relative_filename,
anchor_text=anchor_text,
anchor_image=relative_anchor_image,
annotation=annotation,
)

val = {key: message_raw.replace("\t", " ")}
values = {
"path": filename,
"thumbnail_path": anchor_image,
"title": anchor_text,
"annotation": annotation,
}
val = {key: values}

reported_result = self.pipestat.report(
values=val, sample_name=self.pipestat_sample_name, result_formatter=rf
values=val,
record_identifier=self.pipestat_record_identifier,
result_formatter=rf,
force_overwrite=force_overwrite,
)

if not nolog:
for r in reported_result:
self.info(r)
return reported_result
if isinstance(
reported_result, bool
): # Pipestat can return False if results are NOT reported.
self.info("Result successfully reported? " + str(reported_result))
else:
for r in reported_result:
self.info(r)

def _safe_write_to_file(self, file, message):
"""
@@ -1849,15 +1863,11 @@ def _refresh_stats(self):

if os.path.isfile(self.pipeline_stats_file):
_, data = read_yaml_data(path=self.pipeline_stats_file, what="stats_file")
print(data)
pipeline_key = list(
data[self.pipestat["_pipeline_name"]][self.pipestat["_pipeline_type"]]
)[0]
if self.name == pipeline_key:
for key, value in data[self.pipestat["_pipeline_name"]][
self.pipestat["_pipeline_type"]
][pipeline_key].items():
self.stats_dict[key] = value.strip()

for key, value in data[self._pipestat_manager.pipeline_name][
self._pipestat_manager.pipeline_type
][self._pipestat_manager.record_identifier].items():
self.stats_dict[key] = value

def get_stat(self, key):
"""
@@ -1989,12 +1999,12 @@ def complete(self):
"""Stop a completely finished pipeline."""
self.stop_pipeline(status=COMPLETE_FLAG)

def fail_pipeline(self, exc, dynamic_recover=False):
def fail_pipeline(self, exc: Exception, dynamic_recover: bool = False):
"""
If the pipeline does not complete, this function will stop the pipeline gracefully.
It sets the status flag to failed and skips the normal success completion procedure.

:param Exception e: Exception to raise.
:param Exception exc: Exception to raise.
:param bool dynamic_recover: Whether to recover e.g. for job termination.
"""
# Take care of any active running subprocess
@@ -2024,9 +2034,8 @@ def fail_pipeline(self, exc, dynamic_recover=False):
total_time = datetime.timedelta(seconds=self.time_elapsed(self.starttime))
self.info("Total time: " + str(total_time))
self.info("Failure reason: " + str(exc))
# self._set_status_flag(FAIL_FLAG)
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="failed",
)

@@ -2087,7 +2096,8 @@ def stop_pipeline(self, status=COMPLETE_FLAG):
"""
# self._set_status_flag(status)
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name, status_identifier=status
record_identifier=self._pipestat_manager.record_identifier,
status_identifier=status,
)
self._cleanup()
elapsed_time_this_run = str(
@@ -2457,8 +2467,8 @@ def _cleanup(self, dry_run=False):
for fn in glob.glob(self.outfolder + flag_name("*"))
if COMPLETE_FLAG not in os.path.basename(fn)
and not "{}_{}_{}".format(
self._pipestat_manager["_pipeline_name"],
self.pipestat_sample_name,
self._pipestat_manager.pipeline_name,
self.pipestat_record_identifier,
run_flag,
)
== os.path.basename(fn)
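As the `report_object` hunk above shows, objects are now reported to pipestat as a structured values dict rather than a tab-separated string. A rough sketch of the call and the shape pipestat receives, reusing the hypothetical `pm` from the earlier sketch; paths and the result key are illustrative only:

```python
# Hypothetical image result; paths are illustrative only.
pm.report_object(
    key="alignment_plot",
    filename="results/example_sample/alignment_plot.pdf",
    anchor_image="results/example_sample/alignment_plot.png",
    annotation="Alignment summary",
    force_overwrite=True,  # new in 0.14.0
)

# What pipestat now receives, roughly:
# {"alignment_plot": {"path": "results/example_sample/alignment_plot.pdf",
#                     "thumbnail_path": "results/example_sample/alignment_plot.png",
#                     "title": "alignment_plot",
#                     "annotation": "Alignment summary"}}
# instead of the old "path<TAB>title<TAB>thumbnail<TAB>annotation" string.
```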
2 changes: 1 addition & 1 deletion pypiper/pipeline.py
@@ -330,7 +330,7 @@ def run(self, start_point=None, stop_before=None, stop_after=None):
# between results from different stages.
skip_mode = False

print("Running stage: {}".format(stage))
print(f"Running stage: {getattr(stage, 'name', str(stage))}")

stage.run()
self.executed.append(stage)
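The one-line change above (from PR #198, for #197) stops assuming every stage exposes a `name` attribute when logging. A small illustration of the `getattr` fallback, using a hypothetical stage class and a bare function as stages:

```python
class Stage:
    def __init__(self, name):
        self.name = name

def trim_reads():
    pass  # a bare callable used as a stage has no .name attribute

for stage in (Stage("align_reads"), trim_reads):
    # getattr falls back to str(stage) when .name is missing,
    # so the log line never raises AttributeError.
    print(f"Running stage: {getattr(stage, 'name', str(stage))}")
```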
14 changes: 8 additions & 6 deletions pypiper/utils.py
@@ -387,20 +387,20 @@ def split_by_pipes(cmd):
cmdlist = []
newcmd = str()
for char in cmd:
if char is "{":
if char == "{":
stack_brace.append("{")
elif char is "}":
elif char == "}":
stack_brace.pop()
elif char is "(":
elif char == "(":
stack_paren.append("(")
elif char is ")":
elif char == ")":
stack_paren.pop()

if len(stack_brace) > 0 or len(stack_paren) > 0:
# We are inside a parenthetic of some kind; emit character
# no matter what it is
newcmd += char
elif char is "|":
elif char == "|":
# if it's a pipe, finish the command and start a new one
cmdlist.append(newcmd)
newcmd = str()
@@ -1110,9 +1110,11 @@ def _add_args(parser, args, required):
return parser


def result_formatter_markdown(pipeline_name, sample_name, res_id, value) -> str:
def result_formatter_markdown(pipeline_name, record_identifier, res_id, value) -> str:
"""
Returns Markdown formatted value as string

# Pipeline_name and record_identifier should be kept because pipestat needs it
"""

message_markdown = "\n> `{key}`\t{value}\t_RES_".format(key=res_id, value=value)
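The `split_by_pipes` hunk replaces identity checks (`is`) with equality checks (`==`) for character comparisons. `is` compares object identity, not value; it only appeared to work here because CPython caches single-character strings, and newer CPython versions flag the pattern with a `SyntaxWarning`. A small illustration of the difference, independent of pypiper:

```python
s = "".join(["|", "|"])  # built at runtime, not the interned literal "||"

print(s == "||")  # True: the values are equal
print(s is "||")  # False in CPython: different objects (and CPython 3.8+ warns:
                  # "SyntaxWarning: 'is' with a literal. Did you mean '=='?")
```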