From c5b9a1df99ad6ae0ce402bc8701c29d73d68347b Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Fri, 8 May 2020 00:50:40 -0400
Subject: [PATCH 01/18] editing to_job, so it doesn't create a deep copy of a task when the task has a state, but instead a shallow copy of the task; the input is saved in pickle files, so it can be loaded later (in specific workers)

---
 pydra/engine/core.py      | 60 +++++++++++++++++++++++++++++++++------
 pydra/engine/submitter.py | 24 ++++++++++++----
 pydra/engine/workers.py   |  9 +++++-
 3 files changed, 78 insertions(+), 15 deletions(-)

diff --git a/pydra/engine/core.py b/pydra/engine/core.py
index e8e1dfc4fa..971f8db899 100644
--- a/pydra/engine/core.py
+++ b/pydra/engine/core.py
@@ -6,7 +6,7 @@
 import os
 from pathlib import Path
 import typing as ty
-from copy import deepcopy
+from copy import deepcopy, copy
 import cloudpickle as cp
 from filelock import SoftFileLock
@@ -359,6 +359,16 @@ def __call__(self, submitter=None, plugin=None, rerun=False, **kwargs):
         res = self._run(rerun=rerun, **kwargs)
         return res
 
+    def _load_and_run(self, ind, task_pkl, input_pkl, rerun=False, **kwargs):
+        """ loading the task and inputs from pickle files,
+        setting proper input for a specific index before running the task
+        """
+        task_orig = cp.loads(input_pkl.read_bytes())
+        task = cp.loads(task_pkl.read_bytes())
+        _, inputs_dict = task_orig.get_input_el(ind)
+        task.inputs = attr.evolve(task.inputs, **inputs_dict)
+        return task._run(rerun=rerun, **kwargs)
+
     def _run(self, rerun=False, **kwargs):
         self.inputs = attr.evolve(self.inputs, **kwargs)
         self.inputs.check_fields_input_spec()
@@ -499,15 +509,35 @@ def get_input_el(self, ind):
             return None, inputs_dict
 
     def to_job(self, ind):
-        """Run interface one element generated from node_state."""
+        """ Pickling tasks and inputs, so the input doesn't have to be loaded into memory """
         # logger.debug("Run interface el, name={}, ind={}".format(self.name, ind))
-        el = deepcopy(self)
-        el.state = None
-        # dj might be needed
-        # el._checksum = None
-        _, inputs_dict = self.get_input_el(ind)
-        el.inputs = attr.evolve(el.inputs, **inputs_dict)
-        return el
+        # copy the task and set input/state to None
+
+        pkl_files = self.cache_dir / "pkl_files"
+        pkl_files.mkdir(exist_ok=True)
+        task_path = pkl_files / f"task_{self.name}.pklz"
+        input_path = pkl_files / f"task_orig_input_{self.name}.pklz"
+
+        # the pickle files should be independent of the index, so they can be saved only once
+        if ind == 0:
+            task_copy = copy(self)
+            task_copy.state = None
+            task_copy.inputs = attr.evolve(
+                self.inputs, **{k: None for k in self.input_names}
+            )
+
+            # saving the task object (no input)
+            with task_path.open("wb") as fp:
+                cp.dump(task_copy, fp)
+
+            # saving the original task with the full input,
+            # so it can later be used to set the input for all of the tasks
+            with input_path.open("wb") as fp:
+                cp.dump(self, fp)
+
+        # index, path to the pickled task, path to the pickled original task with input,
+        # and self (to be able to check properties when needed)
+        return (ind, task_path, input_path, self)
 
     @property
     def done(self):
@@ -823,6 +853,18 @@ def create_connections(self, task):
             combiner=combiner,
         )
 
+    async def _load_and_run(
+        self, ind, task_pkl, input_pkl, submitter=None, rerun=False, **kwargs
+    ):
+        """ loading the workflow and inputs from pickle files,
+        setting proper input for a specific index before running the workflow
+        """
+        task_orig = cp.loads(input_pkl.read_bytes())
+        task = cp.loads(task_pkl.read_bytes())
+        _, inputs_dict = task_orig.get_input_el(ind)
+        task.inputs = attr.evolve(task.inputs, **inputs_dict)
+        await task._run(submitter=submitter, rerun=rerun, **kwargs)
+
     async def _run(self, submitter=None, rerun=False, **kwargs):
         # self.inputs = dc.replace(self.inputs, **kwargs) don't need it?
         checksum = self.checksum
diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py
index a397a86ad1..fc4ab2b9df 100644
--- a/pydra/engine/submitter.py
+++ b/pydra/engine/submitter.py
@@ -64,11 +64,25 @@ def __call__(self, runnable, cache_locations=None, rerun=False):
 
     async def submit_workflow(self, workflow, rerun=False):
         """Distribute or initiate workflow execution."""
-        if workflow.plugin and workflow.plugin != self.plugin:
-            # dj: this is not tested!!!
-            await self.worker.run_el(workflow, rerun=rerun)
-        else:
-            await workflow._run(self, rerun=rerun)
+        if is_workflow(workflow):
+            if workflow.plugin and workflow.plugin != self.plugin:
+                # dj: this is not tested!!! TODO
+                await self.worker.run_el(workflow, rerun=rerun)
+            else:
+                await workflow._run(self, rerun=rerun)
+        else:  # could be a tuple with paths to pickle files with tasks and inputs
+            ind, wf_pkl, input_pkl, wf_orig = workflow
+            if wf_orig.plugin and wf_orig.plugin != self.plugin:
+                # dj: this is not tested!!! TODO
+                await self.worker.run_el(workflow, rerun=rerun)
+            else:
+                await wf_orig._load_and_run(
+                    ind=ind,
+                    task_pkl=wf_pkl,
+                    input_pkl=input_pkl,
+                    submitter=self,
+                    rerun=rerun,
+                )
 
     async def submit(self, runnable, wait=False, rerun=False):
         """
diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py
index 4453f9d0a0..84951d5d71 100644
--- a/pydra/engine/workers.py
+++ b/pydra/engine/workers.py
@@ -7,6 +7,7 @@
 
 import concurrent.futures as cf
 
+from .core import TaskBase, is_workflow
 from .helpers import create_pyscript, get_available_cpus, read_and_display_async, save
 
 import logging
@@ -177,7 +178,13 @@ def run_el(self, runnable, rerun=False, **kwargs):
 
     async def exec_as_coro(self, runnable, rerun=False):
         """Run a task (coroutine wrapper)."""
-        res = await self.loop.run_in_executor(self.pool, runnable._run, rerun)
+        if isinstance(runnable, TaskBase):
+            res = await self.loop.run_in_executor(self.pool, runnable._run, rerun)
+        else:  # it could be a tuple that includes pickle files with tasks and inputs
+            ind, task_pkl, input_pkl, task_orig = runnable
+            res = await self.loop.run_in_executor(
+                self.pool, task_orig._load_and_run, ind, task_pkl, input_pkl, rerun
+            )
         return res
 
     def close(self):

From 8efa963d3cfdfb65f7cdb524a14bb374b7abd6ba Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Thu, 28 May 2020 23:48:32 -0400
Subject: [PATCH 02/18] removing load_and_run from task and Workflow, creating standalone functions

---
 pydra/engine/core.py                 | 22 ----------------------
 pydra/engine/helpers.py              | 24 ++++++++++++++++++++++++
 pydra/engine/submitter.py            |  4 ++--
 pydra/engine/tests/test_node_task.py | 11 +++++++++++
 pydra/engine/workers.py              | 12 +++++++++---
 5 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/pydra/engine/core.py b/pydra/engine/core.py
index 971f8db899..bbccaefb00 100644
--- a/pydra/engine/core.py
+++ b/pydra/engine/core.py
@@ -359,16 +359,6 @@ def __call__(self, submitter=None, plugin=None, rerun=False, **kwargs):
         res = self._run(rerun=rerun, **kwargs)
         return res
 
-    def _load_and_run(self, ind, task_pkl, input_pkl, rerun=False, **kwargs):
-        """ loading the task and inputs from pickle files,
-        setting proper input for a specific index before running the task
-        """
-        task_orig = cp.loads(input_pkl.read_bytes())
-        task = cp.loads(task_pkl.read_bytes())
-        _, inputs_dict = task_orig.get_input_el(ind)
-        task.inputs = attr.evolve(task.inputs, **inputs_dict)
-        return task._run(rerun=rerun, **kwargs)
-
     def _run(self, rerun=False, **kwargs):
         self.inputs = attr.evolve(self.inputs, **kwargs)
         self.inputs.check_fields_input_spec()
@@ -853,18 +843,6 @@ def create_connections(self, task):
             combiner=combiner,
         )
 
-    async def _load_and_run(
-        self, ind, task_pkl, input_pkl, submitter=None, rerun=False, **kwargs
-    ):
-        """ loading the workflow and inputs from pickle files,
-        setting proper input for a specific index before running the workflow
-        """
-        task_orig = cp.loads(input_pkl.read_bytes())
-        task = cp.loads(task_pkl.read_bytes())
-        _, inputs_dict = task_orig.get_input_el(ind)
-        task.inputs = attr.evolve(task.inputs, **inputs_dict)
-        await task._run(submitter=submitter, rerun=rerun, **kwargs)
-
     async def _run(self, submitter=None, rerun=False, **kwargs):
         # self.inputs = dc.replace(self.inputs, **kwargs) don't need it?
         checksum = self.checksum
diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index 860d5fcd04..16c3af7624 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -544,3 +544,27 @@ def get_available_cpus():
 
     # Last resort
     return os.cpu_count()
+
+
+def load_and_run(ind, task_pkl, input_pkl, rerun=False, **kwargs):
+    """ loading the task and inputs from pickle files,
+    setting proper input for a specific index before running the task
+    """
+    task_orig = cp.loads(input_pkl.read_bytes())
+    task = cp.loads(task_pkl.read_bytes())
+    _, inputs_dict = task_orig.get_input_el(ind)
+    task.inputs = attr.evolve(task.inputs, **inputs_dict)
+    return task._run(rerun=rerun, **kwargs)
+
+
+async def load_and_run_async(
+    ind, task_pkl, input_pkl, submitter=None, rerun=False, **kwargs
+):
+    """ loading the workflow and inputs from pickle files,
+    setting proper input for a specific index before running the workflow
+    """
+    task_orig = cp.loads(input_pkl.read_bytes())
+    task = cp.loads(task_pkl.read_bytes())
+    _, inputs_dict = task_orig.get_input_el(ind)
+    task.inputs = attr.evolve(task.inputs, **inputs_dict)
+    await task._run(submitter=submitter, rerun=rerun, **kwargs)
diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py
index fc4ab2b9df..e3868348e4 100644
--- a/pydra/engine/submitter.py
+++ b/pydra/engine/submitter.py
@@ -2,7 +2,7 @@
 import asyncio
 from .workers import SerialWorker, ConcurrentFuturesWorker, SlurmWorker, DaskWorker
 from .core import is_workflow
-from .helpers import get_open_loop
+from .helpers import get_open_loop, load_and_run_async
 
 import logging
@@ -76,7 +76,7 @@ async def submit_workflow(self, workflow, rerun=False):
                 # dj: this is not tested!!! TODO
                 await self.worker.run_el(workflow, rerun=rerun)
             else:
-                await wf_orig._load_and_run(
+                await load_and_run_async(
                     ind=ind,
                     task_pkl=wf_pkl,
                     input_pkl=input_pkl,
                     submitter=self,
                     rerun=rerun,
                 )
diff --git a/pydra/engine/tests/test_node_task.py b/pydra/engine/tests/test_node_task.py
index f8a9479853..602c7a020a 100644
--- a/pydra/engine/tests/test_node_task.py
+++ b/pydra/engine/tests/test_node_task.py
@@ -378,6 +378,17 @@ def test_task_nostate_1(plugin_dask_opt):
     assert nn.output_dir.exists()
 
 
+def test_task_nostate_1_call():
+    """ task without splitter"""
+    nn = fun_addtwo(name="NA", a=3)
+    nn()
+    # checking the results
+    results = nn.result()
+    assert results.output.out == 5
+    # checking the output_dir
+    assert nn.output_dir.exists()
+
+
 @pytest.mark.flaky(reruns=2)  # when dask
 def test_task_nostate_1_call_subm(plugin_dask_opt):
     """ task without splitter"""
diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py
index 84951d5d71..5f966a767c 100644
--- a/pydra/engine/workers.py
+++ b/pydra/engine/workers.py
@@ -7,8 +7,14 @@
 
 import concurrent.futures as cf
 
-from .core import TaskBase, is_workflow
-from .helpers import create_pyscript, get_available_cpus, read_and_display_async, save
+from .core import TaskBase
+from .helpers import (
+    create_pyscript,
+    get_available_cpus,
+    read_and_display_async,
+    save,
+    load_and_run,
+)
 
 import logging
@@ -183,7 +189,7 @@ async def exec_as_coro(self, runnable, rerun=False):
         else:  # it could be a tuple that includes pickle files with tasks and inputs
             ind, task_pkl, input_pkl, task_orig = runnable
             res = await self.loop.run_in_executor(
-                self.pool, task_orig._load_and_run, ind, task_pkl, input_pkl, rerun
+                self.pool, load_and_run, ind, task_pkl, input_pkl, rerun
             )
         return res

From fb54adfb5050253380f3bd0cfa4de57957ba2a88 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Sat, 30 May 2020 22:21:27 -0400
Subject: [PATCH 03/18] simplifying load_and_run, adding a test

---
 pydra/engine/core.py               | 19 ++++---------------
 pydra/engine/helpers.py            | 18 ++++++++----------
 pydra/engine/submitter.py          |  8 ++------
 pydra/engine/tests/test_helpers.py | 17 +++++++++++++++++
 pydra/engine/tests/test_task.py    |  8 +-------
 pydra/engine/workers.py            |  4 ++--
 6 files changed, 34 insertions(+), 40 deletions(-)

diff --git a/pydra/engine/core.py b/pydra/engine/core.py
index f327c882e7..ce4773c3d8 100644
--- a/pydra/engine/core.py
+++ b/pydra/engine/core.py
@@ -510,29 +510,18 @@ def to_job(self, ind):
 
         pkl_files = self.cache_dir / "pkl_files"
         pkl_files.mkdir(exist_ok=True)
-        task_path = pkl_files / f"task_{self.name}.pklz"
-        input_path = pkl_files / f"task_orig_input_{self.name}.pklz"
+        task_main_path = pkl_files / f"task_main_{self.name}.pklz"
 
         # the pickle files should be independent of the index, so they can be saved only once
         if ind == 0:
-            task_copy = copy(self)
-            task_copy.state = None
-            task_copy.inputs = attr.evolve(
-                self.inputs, **{k: None for k in self.input_names}
-            )
-
-            # saving the task object (no input)
-            with task_path.open("wb") as fp:
-                cp.dump(task_copy, fp)
-
             # saving the original task with the full input,
             # so it can later be used to set the input for all of the tasks
-            with input_path.open("wb") as fp:
+            with task_main_path.open("wb") as fp:
                 cp.dump(self, fp)
 
-        # index, path to the pickled task, path to the pickled original task with input,
+        # index, path to the pickled original task with input,
         # and self (to be able to check properties when needed)
-        return (ind, task_path, input_path, self)
+        return (ind, task_main_path, self)
 
     @property
     def done(self):
diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index 16c3af7624..1ff64d2289 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -546,25 +546,23 @@ def get_available_cpus():
     return os.cpu_count()
 
 
-def load_and_run(ind, task_pkl, input_pkl, rerun=False, **kwargs):
+def load_and_run(ind, task_main_pkl, rerun=False, **kwargs):
     """ loading the task and inputs from pickle files,
     setting proper input for a specific index before running the task
     """
-    task_orig = cp.loads(input_pkl.read_bytes())
-    task = cp.loads(task_pkl.read_bytes())
-    _, inputs_dict = task_orig.get_input_el(ind)
+    task = cp.loads(task_main_pkl.read_bytes())
+    _, inputs_dict = task.get_input_el(ind)
     task.inputs = attr.evolve(task.inputs, **inputs_dict)
+    task.state = None
     return task._run(rerun=rerun, **kwargs)
 
 
-async def load_and_run_async(
-    ind, task_pkl, input_pkl, submitter=None, rerun=False, **kwargs
-):
+async def load_and_run_async(ind, task_main_pkl, submitter=None, rerun=False, **kwargs):
     """ loading the workflow and inputs from pickle files,
     setting proper input for a specific index before running the workflow
     """
-    task_orig = cp.loads(input_pkl.read_bytes())
-    task = cp.loads(task_pkl.read_bytes())
-    _, inputs_dict = task_orig.get_input_el(ind)
+    task = cp.loads(task_main_pkl.read_bytes())
+    _, inputs_dict = task.get_input_el(ind)
     task.inputs = attr.evolve(task.inputs, **inputs_dict)
+    task.state = None
     await task._run(submitter=submitter, rerun=rerun, **kwargs)
diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py
index e3868348e4..e35cbbbb78 100644
--- a/pydra/engine/submitter.py
+++ b/pydra/engine/submitter.py
@@ -71,17 +71,13 @@ async def submit_workflow(self, workflow, rerun=False):
             else:
                 await workflow._run(self, rerun=rerun)
         else:  # could be a tuple with paths to pickle files with tasks and inputs
-            ind, wf_pkl, input_pkl, wf_orig = workflow
+            ind, wf_main_pkl, wf_orig = workflow
             if wf_orig.plugin and wf_orig.plugin != self.plugin:
                 # dj: this is not tested!!! TODO
                 await self.worker.run_el(workflow, rerun=rerun)
             else:
                 await load_and_run_async(
-                    ind=ind,
-                    task_pkl=wf_pkl,
-                    input_pkl=input_pkl,
-                    submitter=self,
-                    rerun=rerun,
+                    ind=ind, task_main_pkl=wf_main_pkl, submitter=self, rerun=rerun
                 )
diff --git a/pydra/engine/tests/test_helpers.py b/pydra/engine/tests/test_helpers.py
index 9c4e2f1ead..03e90b5e0d 100644
--- a/pydra/engine/tests/test_helpers.py
+++ b/pydra/engine/tests/test_helpers.py
@@ -14,6 +14,7 @@
     get_available_cpus,
     save,
     create_pyscript,
+    load_and_run,
 )
 from .. import helpers_file
 from ..specs import File, Directory
@@ -212,3 +213,19 @@ def test_get_available_cpus():
 
     if platform.system().lower() == "darwin":
         assert get_available_cpus() == os.cpu_count()
+
+
+def test_load_and_run(tmpdir):
+    """ testing load_and_run for pickled task"""
+    task_pkl = Path(tmpdir.join("task_main.pkl"))
+
+    task = multiply(name="mult", x=[1, 2], y=10).split("x")
+    task.state.prepare_states(inputs=task.inputs)
+    task.state.prepare_inputs()
+    with task_pkl.open("wb") as fp:
+        cp.dump(task, fp)
+
+    res_0 = load_and_run(ind=0, task_main_pkl=task_pkl)
+    assert res_0.output.out == 10
+    res_1 = load_and_run(ind=1, task_main_pkl=task_pkl)
+    assert res_1.output.out == 20
diff --git a/pydra/engine/tests/test_task.py b/pydra/engine/tests/test_task.py
index eaa21f9952..20bfd13850 100644
--- a/pydra/engine/tests/test_task.py
+++ b/pydra/engine/tests/test_task.py
@@ -5,13 +5,7 @@
 import pytest
 
 from ... import mark
-from ..task import (
-    AuditFlag,
-    ShellCommandTask,
-    ContainerTask,
-    DockerTask,
-    SingularityTask,
-)
+from ..task import AuditFlag, ShellCommandTask, DockerTask, SingularityTask
 from ...utils.messenger import FileMessenger, PrintMessenger, collect_messages
 from .utils import gen_basic_wf
diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py
index 58a6321aaa..fa5b41d797 100644
--- a/pydra/engine/workers.py
+++ b/pydra/engine/workers.py
@@ -187,9 +187,9 @@ async def exec_as_coro(self, runnable, rerun=False):
         if isinstance(runnable, TaskBase):
             res = await self.loop.run_in_executor(self.pool, runnable._run, rerun)
         else:  # it could be a tuple that includes pickle files with tasks and inputs
-            ind, task_pkl, input_pkl, task_orig = runnable
+            ind, task_main_pkl, task_orig = runnable
             res = await self.loop.run_in_executor(
-                self.pool, load_and_run, ind, task_pkl, input_pkl, rerun
+                self.pool, load_and_run, ind, task_main_pkl, rerun
             )
         return res

From d8d7353ab0d99d4a439a08ad893eeffb85ab7800 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Sun, 31 May 2020 02:05:28 -0400
Subject: [PATCH 04/18] changing pytest setting - running only slurm if sbatch available (just to decrease the time)

---
 pydra/conftest.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pydra/conftest.py b/pydra/conftest.py
index 940bf9fad8..48125400e3 100644
--- a/pydra/conftest.py
+++ b/pydra/conftest.py
@@ -8,7 +8,7 @@ def pytest_addoption(parser):
 def pytest_generate_tests(metafunc):
     if "plugin_dask_opt" in metafunc.fixturenames:
         if bool(shutil.which("sbatch")):
-            Plugins = ["cf", "slurm"]
+            Plugins = ["slurm"]
         else:
             Plugins = ["cf"]
         if metafunc.config.getoption("dask"):
@@ -19,7 +19,7 @@ def pytest_generate_tests(metafunc):
         if metafunc.config.getoption("dask"):
             Plugins = []
         elif bool(shutil.which("sbatch")):
-            Plugins = ["cf", "slurm"]
+            Plugins = ["slurm"]
         else:
             Plugins = ["cf"]
     metafunc.parametrize("plugin", Plugins)

From c417698a54f7198c16dc23aa924ab75691922ac7 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Sun, 31 May 2020 02:14:17 -0400
Subject: [PATCH 05/18] adjusting slurm worker to the current version of to_job (without deepcopy): modifying create_pyscript to use load_and_run for a task with a state, changes in _prepare_runscripts and run_el to deal with a tuple instead of a runnable

---
 pydra/engine/helpers.py | 46 +++++++++++++++++++++------------------
 pydra/engine/workers.py | 55 +++++++++++++++++++++++++++++------------
 2 files changed, 68 insertions(+), 33 deletions(-)

diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index 1ff64d2289..7f465538a4 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -397,7 +397,7 @@ def get_open_loop():
     return loop
 
 
-def create_pyscript(script_path, checksum, rerun=False):
+def create_pyscript(script_path, checksum, rerun=False, ind=None):
     """
     Create standalone script for task execution in a different environment.
@@ -414,21 +414,28 @@
     Execution script
 
     """
-    task_pkl = script_path / "_task.pklz"
+    if ind is None:
+        task_pkl = script_path / "_task.pklz"
+    else:
+        task_pkl = script_path / "_task_main.pklz"
     if not task_pkl.exists() or not task_pkl.stat().st_size:
         raise Exception("Missing or empty task!")
 
     content = f"""import cloudpickle as cp
 from pathlib import Path
-
-
 cache_path = Path("{str(script_path)}")
-task_pkl = (cache_path / "_task.pklz")
+"""
+    if ind is None:
+        content += f"""task_pkl = (cache_path / "_task.pklz")
 task = cp.loads(task_pkl.read_bytes())
-
-# submit task
+"""
+    else:
+        content += f"""from pydra.engine.helpers import load_task
+task_pkl = (cache_path / "_task_main.pklz")
+task = load_task(ind={ind}, task_main_pkl=task_pkl)
+"""
+    content += f"""# submit task
 task(rerun={rerun})
-
 if not task.result():
     raise Exception("Something went wrong")
 print("Completed", task.checksum, task)
@@ -547,22 +554,27 @@ def get_available_cpus():
 
 
 def load_and_run(ind, task_main_pkl, rerun=False, **kwargs):
-    """ loading the task and inputs from pickle files,
-    setting proper input for a specific index before running the task
     """
-    task = cp.loads(task_main_pkl.read_bytes())
-    _, inputs_dict = task.get_input_el(ind)
-    task.inputs = attr.evolve(task.inputs, **inputs_dict)
-    task.state = None
+    loading a task from a pickle file, setting proper input
+    and running the task
+    """
+    task = load_task(ind=ind, task_main_pkl=task_main_pkl)
     return task._run(rerun=rerun, **kwargs)
 
 
 async def load_and_run_async(ind, task_main_pkl, submitter=None, rerun=False, **kwargs):
-    """ loading the workflow and inputs from pickle files,
-    setting proper input for a specific index before running the workflow
     """
+    loading a task from a pickle file, setting proper input
+    and running the workflow
+    """
+    task = load_task(ind=ind, task_main_pkl=task_main_pkl)
+    await task._run(submitter=submitter, rerun=rerun, **kwargs)
+
+
+def load_task(ind, task_main_pkl):
+    """ loading a task from a pickle file, setting proper input for the specific ind"""
     task = cp.loads(task_main_pkl.read_bytes())
     _, inputs_dict = task.get_input_el(ind)
     task.inputs = attr.evolve(task.inputs, **inputs_dict)
     task.state = None
-    await task._run(submitter=submitter, rerun=rerun, **kwargs)
+    return task
diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py
index fa5b41d797..048c9bb46d 100644
--- a/pydra/engine/workers.py
+++ b/pydra/engine/workers.py
@@ -4,6 +4,7 @@
 import re
 from tempfile import gettempdir
 from pathlib import Path
+from shutil import copyfile
 
 import concurrent.futures as cf
 
@@ -74,14 +75,26 @@ def __init__(self, loop=None, max_jobs=None):
         self._jobs = 0
 
     def _prepare_runscripts(self, task, interpreter="/bin/sh", rerun=False):
-        script_dir = (
-            task.cache_dir / f"{self.__class__.__name__}_scripts" / task.checksum
-        )
+
+        if isinstance(task, TaskBase):
+            checksum = task.checksum
+            cache_dir = task.cache_dir
+            ind = None
+        else:
+            ind = task[0]
+            checksum = task[-1].checksum_states()[ind]
+            cache_dir = task[-1].cache_dir
+
+        script_dir = cache_dir / f"{self.__class__.__name__}_scripts" / checksum
         script_dir.mkdir(parents=True, exist_ok=True)
-        if not (script_dir / "_task.pkl").exists():
-            save(script_dir, task=task)
-        pyscript = create_pyscript(script_dir, task.checksum, rerun=rerun)
-        batchscript = script_dir / f"batchscript_{task.checksum}.sh"
+        if ind is None:
+            if not (script_dir / "_task.pkl").exists():
+                save(script_dir, task=task)
+        else:
+            copyfile(task[1], script_dir / "_task_main.pklz")
+
+        pyscript = create_pyscript(script_dir, checksum, rerun=rerun, ind=ind)
+        batchscript = script_dir / f"batchscript_{checksum}.sh"
         bcmd = "\n".join(
             (
                 f"#!{interpreter}",
@@ -91,7 +104,7 @@
         )
         with batchscript.open("wt") as fp:
             fp.writelines(bcmd)
-        return script_dir, pyscript, batchscript
+        return script_dir, batchscript
 
     async def fetch_finished(self, futures):
         """
@@ -231,20 +244,30 @@ def __init__(
 
     def run_el(self, runnable, rerun=False):
         """Worker submission API."""
-        script_dir, _, batch_script = self._prepare_runscripts(runnable, rerun=rerun)
+        script_dir, batch_script = self._prepare_runscripts(runnable, rerun=rerun)
         if (script_dir / script_dir.parts[1]) == gettempdir():
             logger.warning("Temporary directories may not be shared across computers")
-        return self._submit_job(runnable, batch_script)
 
-    async def _submit_job(self, task, batchscript):
-        """Coroutine that submits task runscript and polls job until completion or error."""
-        script_dir = (
-            task.cache_dir / f"{self.__class__.__name__}_scripts" / task.checksum
+        if isinstance(runnable, TaskBase):
+            checksum = runnable.checksum
+            cache_dir = runnable.cache_dir
+            name = runnable.name
+        else:
+            checksum = runnable[-1].checksum_states()[runnable[0]]
+            cache_dir = runnable[-1].cache_dir
+            name = runnable[-1].name
+
+        return self._submit_job(
+            batch_script, name=name, checksum=checksum, cache_dir=cache_dir
         )
+
+    async def _submit_job(self, batchscript, name, checksum, cache_dir):
+        """Coroutine that submits task runscript and polls job until completion or error."""
+        script_dir = cache_dir / f"{self.__class__.__name__}_scripts" / checksum
         sargs = self.sbatch_args.split()
         jobname = re.search(r"(?<=-J )\S+|(?<=--job-name=)\S+", self.sbatch_args)
         if not jobname:
-            jobname = ".".join((task.name, task.checksum))
+            jobname = ".".join((name, checksum))
         sargs.append(f"--job-name={jobname}")
         output = re.search(r"(?<=-o )\S+|(?<=--output=)\S+", self.sbatch_args)
         if not output:
@@ -274,7 +297,7 @@
             # Exception: Polling / job failure
             done = await self._poll_job(jobid)
             if done:
-                return task
+                return True
             await asyncio.sleep(self.poll_delay)
 
     async def _poll_job(self, jobid):

From cc071703495230a4753fe1dec396d708b8f9bc30 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Sun, 31 May 2020 10:30:38 -0400
Subject: [PATCH 06/18] small changes in load_and_run, to be able to use it in create_pyscript

---
 pydra/engine/helpers.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index 7f465538a4..78f84a9200 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -428,14 +428,16 @@ def create_pyscript(script_path, checksum, rerun=False, ind=None):
     if ind is None:
         content += f"""task_pkl = (cache_path / "_task.pklz")
 task = cp.loads(task_pkl.read_bytes())
+# submit task
+task(rerun={rerun})
 """
     else:
-        content += f"""from pydra.engine.helpers import load_task
+        content += f"""from pydra.engine.helpers import load_and_run
 task_pkl = (cache_path / "_task_main.pklz")
-task = load_task(ind={ind}, task_main_pkl=task_pkl)
+# loading and running the task
+task = load_and_run(ind={ind}, task_main_pkl=task_pkl, rerun={rerun})
 """
-    content += f"""# submit task
-task(rerun={rerun})
+    content += f"""# checking results
 if not task.result():
     raise Exception("Something went wrong")
 print("Completed", task.checksum, task)
@@ -559,7 +561,8 @@ def load_and_run(ind, task_main_pkl, rerun=False, **kwargs):
     and running the task
     """
     task = load_task(ind=ind, task_main_pkl=task_main_pkl)
-    return task._run(rerun=rerun, **kwargs)
+    task._run(rerun=rerun, **kwargs)
+    return task

From 414905546f717082ae0b27ebd433d052144aa4fb Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Sun, 31 May 2020 11:39:56 -0400
Subject: [PATCH 07/18] simplifying create_pyscript

---
 pydra/engine/helpers.py   | 41 ++++++++++++++-------------------------
 pydra/engine/submitter.py |  2 +-
 pydra/engine/workers.py   |  4 ++--
 3 files changed, 18 insertions(+), 29 deletions(-)

diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index 78f84a9200..fecbc99173 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -414,30 +414,18 @@ def create_pyscript(script_path, checksum, rerun=False, ind=None):
     Execution script
 
     """
-    if ind is None:
-        task_pkl = script_path / "_task.pklz"
-    else:
-        task_pkl = script_path / "_task_main.pklz"
+    task_pkl = script_path / "_task.pklz"
     if not task_pkl.exists() or not task_pkl.stat().st_size:
         raise Exception("Missing or empty task!")
 
     content = f"""import cloudpickle as cp
 from pathlib import Path
 cache_path = Path("{str(script_path)}")
-"""
-    if ind is None:
-        content += f"""task_pkl = (cache_path / "_task.pklz")
-task = cp.loads(task_pkl.read_bytes())
-# submit task
-task(rerun={rerun})
-"""
-    else:
-        content += f"""from pydra.engine.helpers import load_and_run
-task_pkl = (cache_path / "_task_main.pklz")
+from pydra.engine.helpers import load_and_run
+task_pkl = (cache_path / "_task.pklz")
 # loading and running the task
-task = load_and_run(ind={ind}, task_main_pkl=task_pkl, rerun={rerun})
-"""
-    content += f"""# checking results
+task = load_and_run(task_pkl=task_pkl, ind={ind}, rerun={rerun})
+# checking results
 if not task.result():
     raise Exception("Something went wrong")
 print("Completed", task.checksum, task)
@@ -555,29 +543,30 @@ def get_available_cpus():
     return os.cpu_count()
 
 
-def load_and_run(ind, task_main_pkl, rerun=False, **kwargs):
+def load_and_run(task_pkl, ind=None, rerun=False, **kwargs):
     """
     loading a task from a pickle file, setting proper input
     and running the task
     """
-    task = load_task(ind=ind, task_main_pkl=task_main_pkl)
+    task = load_task(task_pkl=task_pkl, ind=ind)
     task._run(rerun=rerun, **kwargs)
     return task
 
 
-async def load_and_run_async(ind, task_main_pkl, submitter=None, rerun=False, **kwargs):
+async def load_and_run_async(task_pkl, ind=None, submitter=None, rerun=False, **kwargs):
     """
     loading a task from a pickle file, setting proper input
     and running the workflow
     """
-    task = load_task(ind=ind, task_main_pkl=task_main_pkl)
+    task = load_task(task_pkl=task_pkl, ind=ind)
     await task._run(submitter=submitter, rerun=rerun, **kwargs)
 
 
-def load_task(ind, task_main_pkl):
+def load_task(task_pkl, ind=None):
     """ loading a task from a pickle file, setting proper input for the specific ind"""
-    task = cp.loads(task_main_pkl.read_bytes())
-    _, inputs_dict = task.get_input_el(ind)
-    task.inputs = attr.evolve(task.inputs, **inputs_dict)
-    task.state = None
+    task = cp.loads(task_pkl.read_bytes())
+    if ind is not None:
+        _, inputs_dict = task.get_input_el(ind)
+        task.inputs = attr.evolve(task.inputs, **inputs_dict)
+        task.state = None
     return task
diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py
index e35cbbbb78..877b12a84a 100644
--- a/pydra/engine/submitter.py
+++ b/pydra/engine/submitter.py
@@ -77,7 +77,7 @@ async def submit_workflow(self, workflow, rerun=False):
                 await self.worker.run_el(workflow, rerun=rerun)
             else:
                 await load_and_run_async(
-                    ind=ind, task_main_pkl=wf_main_pkl, submitter=self, rerun=rerun
+                    task_pkl=wf_main_pkl, ind=ind, submitter=self, rerun=rerun
                 )
diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py
index 048c9bb46d..8f52f9c37f 100644
--- a/pydra/engine/workers.py
+++ b/pydra/engine/workers.py
@@ -91,7 +91,7 @@ def _prepare_runscripts(self, task, interpreter="/bin/sh", rerun=False):
             if not (script_dir / "_task.pkl").exists():
                 save(script_dir, task=task)
         else:
-            copyfile(task[1], script_dir / "_task_main.pklz")
+            copyfile(task[1], script_dir / "_task.pklz")
 
         pyscript = create_pyscript(script_dir, checksum, rerun=rerun, ind=ind)
         batchscript = script_dir / f"batchscript_{checksum}.sh"
@@ -202,7 +202,7 @@ async def exec_as_coro(self, runnable, rerun=False):
         else:  # it could be a tuple that includes pickle files with tasks and inputs
             ind, task_main_pkl, task_orig = runnable
             res = await self.loop.run_in_executor(
-                self.pool, load_and_run, ind, task_main_pkl, rerun
+                self.pool, load_and_run, task_main_pkl, ind, rerun
             )
         return res

From ca65aac06054bb72007a1fa127eff2fb845d16fb Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Sun, 31 May 2020 15:08:52 -0400
Subject: [PATCH 08/18] removing to_job and adding pickle_task - uses helpers.save to save the task with full input; submitter passes the same pickle file, but different indices for tasks with states (so load_and_run would be able to set proper inputs)

---
 pydra/engine/core.py               | 26 +++++++-------------------
 pydra/engine/helpers.py            | 11 +++++++++--
 pydra/engine/submitter.py          | 11 +++++------
 pydra/engine/tests/test_helpers.py |  8 ++++----
 4 files changed, 25 insertions(+), 31 deletions(-)

diff --git a/pydra/engine/core.py b/pydra/engine/core.py
index ce4773c3d8..e80f671d83 100644
--- a/pydra/engine/core.py
+++ b/pydra/engine/core.py
@@ -503,25 +503,13 @@ def get_input_el(self, ind):
         inputs_dict = {inp: getattr(self.inputs, inp) for inp in self.input_names}
         return None, inputs_dict
 
-    def to_job(self, ind):
-        """ Pickling tasks and inputs, so the input doesn't have to be loaded into memory """
-        # logger.debug("Run interface el, name={}, ind={}".format(self.name, ind))
-        # copy the task and set input/state to None
-
-        pkl_files = self.cache_dir / "pkl_files"
-        pkl_files.mkdir(exist_ok=True)
-        task_main_path = pkl_files / f"task_main_{self.name}.pklz"
-
-        # the pickle files should be independent of the index, so they can be saved only once
-        if ind == 0:
-            # saving the original task with the full input,
-            # so it can later be used to set the input for all of the tasks
-            with task_main_path.open("wb") as fp:
-                cp.dump(self, fp)
-
-        # index, path to the pickled original task with input,
-        # and self (to be able to check properties when needed)
-        return (ind, task_main_path, self)
+    def pickle_task(self):
+        """ Pickling the task with full inputs"""
+        pkl_files = self.cache_dir / self.checksum / "pkl_files"
+        pkl_files.mkdir(exist_ok=True, parents=True)
+        task_main_path = pkl_files / "_task.pklz"
+        save(task_path=pkl_files, task=self, use_locfile=True)
+        return task_main_path
 
     @property
     def done(self):
diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index fecbc99173..cc35f00446 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -4,6 +4,7 @@
 import attr
 import cloudpickle as cp
 from pathlib import Path
+from filelock import SoftFileLock
 import os
 import sys
 from hashlib import sha256
@@ -94,7 +95,7 @@ def load_result(checksum, cache_locations):
     return None
 
 
-def save(task_path: Path, result=None, task=None):
+def save(task_path: Path, result=None, task=None, use_locfile=False):
     """
     Save a :class:`~pydra.engine.core.TaskBase` object and/or results.
 
@@ -106,8 +107,14 @@
         Result to pickle and write
     task : :class:`~pydra.engine.core.TaskBase`
         Task to pickle and write
-
+    use_locfile : :obj:`bool`
+        if True, SoftFileLock will be used
     """
+    if use_locfile:
+        lockfile = task_path.parent / (task_path.name + ".lock")
+        with SoftFileLock(lockfile):
+            save(task_path=task_path, result=result, task=task)
+
     if task is None and result is None:
         raise ValueError("Nothing to be saved")
     task_path.mkdir(parents=True, exist_ok=True)
diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py
index 877b12a84a..059e83081f 100644
--- a/pydra/engine/submitter.py
+++ b/pydra/engine/submitter.py
@@ -110,17 +110,16 @@ async def submit(self, runnable, wait=False, rerun=False):
             logger.debug(
                 f"Expanding {runnable} into {len(runnable.state.states_val)} states"
             )
+            task_pkl = runnable.pickle_task()
+
             for sidx in range(len(runnable.state.states_val)):
-                job = runnable.to_job(sidx)
-                logger.debug(
-                    f'Submitting runnable {job}{str(sidx) if sidx is not None else ""}'
-                )
+                job_tuple = (sidx, task_pkl, runnable)
                 if is_workflow(runnable):
                     # job has no state anymore
-                    futures.add(self.submit_workflow(job, rerun=rerun))
+                    futures.add(self.submit_workflow(job_tuple, rerun=rerun))
                 else:
                     # tasks are submitted to worker for execution
-                    futures.add(self.worker.run_el(job, rerun=rerun))
+                    futures.add(self.worker.run_el(job_tuple, rerun=rerun))
         else:
             if is_workflow(runnable):
                 await self._run_workflow(runnable, rerun=rerun)
diff --git a/pydra/engine/tests/test_helpers.py b/pydra/engine/tests/test_helpers.py
index 03e90b5e0d..c41c8e8a85 100644
--- a/pydra/engine/tests/test_helpers.py
+++ b/pydra/engine/tests/test_helpers.py
@@ -225,7 +225,7 @@ def test_load_and_run(tmpdir):
     with task_pkl.open("wb") as fp:
         cp.dump(task, fp)
 
-    res_0 = load_and_run(ind=0, task_main_pkl=task_pkl)
-    assert res_0.output.out == 10
-    res_1 = load_and_run(ind=1, task_main_pkl=task_pkl)
-    assert res_1.output.out == 20
+    task_0 = load_and_run(task_pkl=task_pkl, ind=0)
+    assert task_0.result().output.out == 10
+    task_1 = load_and_run(task_pkl=task_pkl, ind=1)
+    assert task_1.result().output.out == 20

From ecfe4810439f20320a74855d9df90fc8862b8ed8 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Sun, 31 May 2020 16:53:44 -0400
Subject: [PATCH 09/18] removing create_pyscript, using load_and_run in the slurm script

---
 pydra/engine/helpers.py            | 45 ++++--------------------------
 pydra/engine/tests/test_helpers.py | 19 +------------
 pydra/engine/workers.py            | 17 ++++++-----
 3 files changed, 14 insertions(+), 67 deletions(-)

diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index cc35f00446..72338be542 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -404,46 +404,6 @@ def get_open_loop():
     return loop
 
 
-def create_pyscript(script_path, checksum, rerun=False, ind=None):
-    """
-    Create standalone script for task execution in a different environment.
-
-    Parameters
-    ----------
-    script_path : :obj:`os.pathlike`
-        Path to the script.
-    checksum : str
-        Task checksum.
-
-    Returns
-    -------
-    pyscript : :obj:`File`
-        Execution script
-
-    """
-    task_pkl = script_path / "_task.pklz"
-    if not task_pkl.exists() or not task_pkl.stat().st_size:
-        raise Exception("Missing or empty task!")
-
-    content = f"""import cloudpickle as cp
-from pathlib import Path
-cache_path = Path("{str(script_path)}")
-from pydra.engine.helpers import load_and_run
-task_pkl = (cache_path / "_task.pklz")
-# loading and running the task
-task = load_and_run(task_pkl=task_pkl, ind={ind}, rerun={rerun})
-# checking results
-if not task.result():
-    raise Exception("Something went wrong")
-print("Completed", task.checksum, task)
-task_pkl.unlink()
-"""
-    pyscript = script_path / f"pyscript_{checksum}.py"
-    with pyscript.open("wt") as fp:
-        fp.writelines(content)
-    return pyscript
-
-
 def hash_function(obj):
     """Generate hash of object."""
     return sha256(str(obj).encode()).hexdigest()
@@ -557,6 +517,9 @@ def load_and_run(task_pkl, ind=None, rerun=False, **kwargs):
     """
     task = load_task(task_pkl=task_pkl, ind=ind)
     task._run(rerun=rerun, **kwargs)
+
+    if not task.result():
+        raise Exception("Something went wrong")
     return task
@@ -571,6 +534,8 @@ async def load_and_run_async(task_pkl, ind=None, submitter=None, rerun=False, **
 
 def load_task(task_pkl, ind=None):
     """ loading a task from a pickle file, setting proper input for the specific ind"""
+    if isinstance(task_pkl, str):
+        task_pkl = Path(task_pkl)
     task = cp.loads(task_pkl.read_bytes())
     if ind is not None:
         _, inputs_dict = task.get_input_el(ind)
diff --git a/pydra/engine/tests/test_helpers.py b/pydra/engine/tests/test_helpers.py
index c41c8e8a85..251009f2da 100644
--- a/pydra/engine/tests/test_helpers.py
+++ b/pydra/engine/tests/test_helpers.py
@@ -8,14 +8,7 @@
 import cloudpickle as cp
 
 from .utils import multiply
-from ..helpers import (
-    hash_value,
-    hash_function,
-    get_available_cpus,
-    save,
-    create_pyscript,
-    load_and_run,
-)
+from ..helpers import hash_value, hash_function, get_available_cpus, save, load_and_run
 from .. import helpers_file
 from ..specs import File, Directory
 
@@ -44,16 +37,6 @@ def test_save(tmpdir):
         assert res.output.out == 2
 
 
-def test_create_pyscript(tmpdir):
-    outdir = Path(tmpdir)
-    with pytest.raises(Exception):
-        create_pyscript(outdir, "checksum")
-    foo = multiply(name="mult", x=1, y=2)
-    save(outdir, task=foo)
-    pyscript = create_pyscript(outdir, foo.checksum)
-    assert pyscript.exists()
-
-
 def test_hash_file(tmpdir):
     outdir = Path(tmpdir)
     with open(outdir / "test.file", "wt") as fp:
diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py
index 8f52f9c37f..e4fa5ac231 100644
--- a/pydra/engine/workers.py
+++ b/pydra/engine/workers.py
@@ -9,13 +9,7 @@
 import concurrent.futures as cf
 
 from .core import TaskBase
-from .helpers import (
-    create_pyscript,
-    get_available_cpus,
-    read_and_display_async,
-    save,
-    load_and_run,
-)
+from .helpers import get_available_cpus, read_and_display_async, save, load_and_run
 
 import logging
@@ -93,13 +87,18 @@ def _prepare_runscripts(self, task, interpreter="/bin/sh", rerun=False):
         else:
             copyfile(task[1], script_dir / "_task.pklz")
 
-        pyscript = create_pyscript(script_dir, checksum, rerun=rerun, ind=ind)
+        task_pkl = script_dir / "_task.pklz"
+        if not task_pkl.exists() or not task_pkl.stat().st_size:
+            raise Exception("Missing or empty task!")
+
         batchscript = script_dir / f"batchscript_{checksum}.sh"
+        python_string = f"""'from pydra.engine.helpers import load_and_run; load_and_run(task_pkl="{str(task_pkl)}", ind={ind}, rerun={rerun}) '
+        """
         bcmd = "\n".join(
             (
                 f"#!{interpreter}",
                 f"#SBATCH --output={str(script_dir / 'slurm-%j.out')}",
-                f"{sys.executable} {str(pyscript)}",
+                f"{sys.executable} -c " + python_string,
             )
         )
         with batchscript.open("wt") as fp:

From d582ec8c610b95cb2af5139be3a4ad332b83a45f Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Mon, 1 Jun 2020 13:37:14 -0400
Subject: [PATCH 10/18] fixing load_and_run, so it also works for workflows

---
 pydra/engine/helpers.py            |  6 ++++--
 pydra/engine/tests/test_helpers.py | 27 +++++++++++++++++++++++++++
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index 72338be542..a9ff31da0a 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -510,13 +510,15 @@ def get_available_cpus():
     return os.cpu_count()
 
 
-def load_and_run(task_pkl, ind=None, rerun=False, **kwargs):
+def load_and_run(
+    task_pkl, ind=None, rerun=False, submitter=None, plugin=None, **kwargs
+):
     """
     loading a task from a pickle file, setting proper input
     and running the task
     """
     task = load_task(task_pkl=task_pkl, ind=ind)
-    task._run(rerun=rerun, **kwargs)
+    task(rerun=rerun, plugin=plugin, submitter=submitter, **kwargs)
 
     if not task.result():
         raise Exception("Something went wrong")
diff --git a/pydra/engine/tests/test_helpers.py b/pydra/engine/tests/test_helpers.py
index 251009f2da..8321b7ca9b 100644
--- a/pydra/engine/tests/test_helpers.py
+++ b/pydra/engine/tests/test_helpers.py
@@ -11,6 +11,7 @@
 from ..helpers import hash_value, hash_function, get_available_cpus, save, load_and_run
 from .. import helpers_file
 from ..specs import File, Directory
+from ..core import Workflow
 
 
 def test_save(tmpdir):
@@ -212,3 +213,29 @@ def test_load_and_run(tmpdir):
     assert task_0.result().output.out == 10
     task_1 = load_and_run(task_pkl=task_pkl, ind=1)
     assert task_1.result().output.out == 20
+
+
+def test_load_and_run_wf(tmpdir):
+    """ testing load_and_run for a pickled workflow"""
+    wf_pkl = Path(tmpdir.join("wf_main.pkl"))
+
+    wf = Workflow(name="wf", input_spec=["x", "y"])
+    wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y))
+    wf.split(("x"))
+    wf.inputs.x = [1, 2]
+    wf.inputs.y = 10
+
+    wf.set_output([("out", wf.mult.lzout.out)])
+
+    # task = multiply(name="mult", x=[1, 2], y=10).split("x")
+    wf.state.prepare_states(inputs=wf.inputs)
+    wf.state.prepare_inputs()
+    wf.plugin = "cf"
+
+    with wf_pkl.open("wb") as fp:
+        cp.dump(wf, fp)
+
+    wf_0 = load_and_run(ind=0, task_pkl=wf_pkl)
+    assert wf_0.result().output.out == 10
+    wf_1 = load_and_run(ind=1, task_pkl=wf_pkl)
+    assert wf_1.result().output.out == 20

From fad92c9a1ba769e111527f8c1e54bcf36ffd9b19 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Mon, 1 Jun 2020 15:28:14 -0400
Subject: [PATCH 11/18] always using a lockfile in save

---
 pydra/engine/core.py    |  2 +-
 pydra/engine/helpers.py | 32 ++++++++++++++++----------------
 2 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/pydra/engine/core.py b/pydra/engine/core.py
index e80f671d83..b30167e5a7 100644
--- a/pydra/engine/core.py
+++ b/pydra/engine/core.py
@@ -508,7 +508,7 @@ def pickle_task(self):
         pkl_files = self.cache_dir / self.checksum / "pkl_files"
         pkl_files.mkdir(exist_ok=True, parents=True)
         task_main_path = pkl_files / "_task.pklz"
-        save(task_path=pkl_files, task=self, use_locfile=True)
+        save(task_path=pkl_files, task=self)
         return task_main_path
 
     @property
diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index a9ff31da0a..0241ce7a7a 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -95,7 +95,7 @@ def load_result(checksum, cache_locations):
     return None
 
 
-def save(task_path: Path, result=None, task=None, use_locfile=False):
+def save(task_path: Path, result=None, task=None):
     """
     Save a :class:`~pydra.engine.core.TaskBase` object and/or results.
@@ -107,26 +107,26 @@ def save(task_path: Path, result=None, task=None, use_locfile=False):
         Result to pickle and write
     task : :class:`~pydra.engine.core.TaskBase`
         Task to pickle and write
-    use_locfile : :obj:`bool`
-        if True, SoftFileLock will be used
     """
-    if use_locfile:
-        lockfile = task_path.parent / (task_path.name + ".lock")
-        with SoftFileLock(lockfile):
-            save(task_path=task_path, result=result, task=task)
 
     if task is None and result is None:
         raise ValueError("Nothing to be saved")
+
+    if not isinstance(task_path, Path):
+        task_path = Path(task_path)
     task_path.mkdir(parents=True, exist_ok=True)
-    if result:
-        if Path(task_path).name.startswith("Workflow"):
-            # copy files to the workflow directory
-            result = copyfile_workflow(wf_path=task_path, result=result)
-        with (task_path / "_result.pklz").open("wb") as fp:
-            cp.dump(result, fp)
-    if task:
-        with (task_path / "_task.pklz").open("wb") as fp:
-            cp.dump(task, fp)
+
+    lockfile = task_path.parent / (task_path.name + "_save.lock")
+    with SoftFileLock(lockfile):
+        if result:
+            if task_path.name.startswith("Workflow"):
+                # copy files to the workflow directory
+                result = copyfile_workflow(wf_path=task_path, result=result)
+            with (task_path / "_result.pklz").open("wb") as fp:
+                cp.dump(result, fp)
+        if task:
+            with (task_path / "_task.pklz").open("wb") as fp:
+                cp.dump(task, fp)

From 485a9f0e8fe72aa3e39ed767ed81d32d8668a8a1 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Mon, 1 Jun 2020 18:13:48 -0400
Subject: [PATCH 12/18] adding crashfile to load_and_run

---
 pydra/engine/helpers.py            | 70 ++++++++++++++++++++++++++++--
 pydra/engine/tests/test_helpers.py | 24 +++++++++-
 pydra/engine/tests/utils.py        |  7 +++
 3 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index 0241ce7a7a..027300f982 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -9,6 +9,11 @@
 import sys
 from hashlib import sha256
 import subprocess as sp
+import getpass
+import uuid
+from time import strftime
+from traceback import format_exception
+
 
 from .specs import Runtime, File, Directory, attr_fields
 from .helpers_file import hash_file, hash_dir, copyfile, is_existing_file
@@ -518,10 +523,22 @@ def load_and_run(
     and running the task
     """
     task = load_task(task_pkl=task_pkl, ind=ind)
-    task(rerun=rerun, plugin=plugin, submitter=submitter, **kwargs)
-
-    if not task.result():
-        raise Exception("Something went wrong")
+    try:
+        task(rerun=rerun, plugin=plugin, submitter=submitter, **kwargs)
+    except Exception as excinfo:
+        etype, eval, etr = sys.exc_info()
+        traceback = format_exception(etype, eval, etr)
+        crashfile = report_crash(task, traceback=traceback)
+        raise type(excinfo)(
+            str(excinfo.with_traceback(None)),
+            f" full crash report is here: {crashfile}",
+        )
+    else:
+        if not task.result():
+            crashfile = report_crash(task, traceback="results are not set properly")
+            raise Exception(
+                f"results are not set properly, full crash report is here: {crashfile}"
+            )
     return task
@@ -544,3 +561,48 @@ def load_task(task_pkl, ind=None):
         task.inputs = attr.evolve(task.inputs, **inputs_dict)
         task.state = None
     return task
+
+
+def report_crash(task, traceback, hostname=None):
+    """Writes crash-related information to a file (based on a nipype function)
+    """
+    name = task.name
+
+    try:
+        result = task.result()
+    except FileNotFoundError:
+        traceback += """
+When creating this crashfile, the results file corresponding
+to the task could not be found.""".splitlines(
+            keepends=True
+        )
+    except Exception as exc:
+        traceback += """
+During the creation of this crashfile triggered by the above exception,
+another exception occurred:\n\n{}.""".format(
+            exc
+        ).splitlines(
+            keepends=True
+        )
+    else:
+        if getattr(result, "runtime", None):
+            if isinstance(result.runtime, list):
+                host = result.runtime[0].hostname
+            else:
+                host = result.runtime.hostname
+
+    # Try everything to fill in the host
+    # TODO: think about using host
+    # host = host or hostname or gethostname()
+    timeofcrash = strftime("%Y%m%d-%H%M%S")
+    try:
+        login_name = getpass.getuser()
+    except KeyError:
+        login_name = "UID{:d}".format(os.getuid())
+    crashfile = f"crash-{timeofcrash}-{login_name}-{name}-{uuid.uuid4()}.pklz"
+    crashfile = task.output_dir / crashfile
+
+    with crashfile.open("wb") as fp:
+        cp.dump(traceback, fp)
+
+    return crashfile
diff --git a/pydra/engine/tests/test_helpers.py b/pydra/engine/tests/test_helpers.py
index 8321b7ca9b..9c8a65083a 100644
--- a/pydra/engine/tests/test_helpers.py
+++ b/pydra/engine/tests/test_helpers.py
@@ -7,7 +7,7 @@
 import pytest
 import cloudpickle as cp
 
-from .utils import multiply
+from .utils import multiply, raise_xeq1
 from ..helpers import hash_value, hash_function, get_available_cpus, save, load_and_run
 from .. import helpers_file
 from ..specs import File, Directory
@@ -215,6 +215,28 @@ def test_load_and_run(tmpdir):
     assert task_1.result().output.out == 20
 
 
+def test_load_and_run_exception(tmpdir):
+    """ testing raising an exception and saving info in a crashfile when running load_and_run"""
+    task_pkl = Path(tmpdir.join("task_main.pkl"))
+
+    task = raise_xeq1(name="raise", x=[1, 2]).split("x")
+    task.state.prepare_states(inputs=task.inputs)
+    task.state.prepare_inputs()
+
+    with task_pkl.open("wb") as fp:
+        cp.dump(task, fp)
+
+    with pytest.raises(Exception) as excinfo:
+        task_0 = load_and_run(task_pkl=task_pkl, ind=0)
+    assert "i'm raising an exception!" in str(excinfo.value)
+    # checking if the crashfile has been created
+    assert "/crash" in str(excinfo.value)
+    assert Path(str(excinfo.value).split("here: ")[1][:-2]).exists()
+    # the second task should be fine
+    task_1 = load_and_run(task_pkl=task_pkl, ind=1)
+    assert task_1.result().output.out == 2
+
+
 def test_load_and_run_wf(tmpdir):
     """ testing load_and_run for a pickled workflow"""
     wf_pkl = Path(tmpdir.join("wf_main.pkl"))
diff --git a/pydra/engine/tests/utils.py b/pydra/engine/tests/utils.py
index 56558711ed..46c043749d 100644
--- a/pydra/engine/tests/utils.py
+++ b/pydra/engine/tests/utils.py
@@ -102,6 +102,13 @@ def add2(x):
     return x + 2
 
 
+@mark.task
+def raise_xeq1(x):
+    if x == 1:
+        raise Exception("x is 1, so i'm raising an exception!")
+    return x
+
+
 @mark.task
 @mark.annotate({"return": {"out_add": float, "out_sub": float}})
 def add2_sub2_res(res):

From 341293160fded32080b6ac9fe8118f6a06ec8820 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Mon, 1 Jun 2020 18:31:47 -0400
Subject: [PATCH 13/18] adding name_prefix to save

---
 pydra/engine/core.py    | 6 +++---
 pydra/engine/helpers.py | 8 +++++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/pydra/engine/core.py b/pydra/engine/core.py
index b30167e5a7..c4a4940cd8 100644
--- a/pydra/engine/core.py
+++ b/pydra/engine/core.py
@@ -505,10 +505,10 @@ def get_input_el(self, ind):
 
     def pickle_task(self):
         """ Pickling the task with full inputs"""
-        pkl_files = self.cache_dir / self.checksum / "pkl_files"
+        pkl_files = self.cache_dir / "pkl_files"
         pkl_files.mkdir(exist_ok=True, parents=True)
-        task_main_path = pkl_files / "_task.pklz"
-        save(task_path=pkl_files, task=self)
+        task_main_path = pkl_files / f"{self.name}_{self.checksum}_task.pklz"
+        save(task_path=pkl_files, task=self, name_prefix=f"{self.name}_{self.checksum}")
         return task_main_path
 
     @property
diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index 027300f982..986fbb1dca 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -100,7 +100,7 @@ def load_result(checksum, cache_locations):
     return None
 
 
-def save(task_path: Path, result=None, task=None):
+def save(task_path: Path, result=None, task=None, name_prefix=None):
     """
     Save a :class:`~pydra.engine.core.TaskBase` object and/or results.
@@ -120,6 +120,8 @@ def save(task_path: Path, result=None, task=None): if not isinstance(task_path, Path): task_path = Path(task_path) task_path.mkdir(parents=True, exist_ok=True) + if name_prefix is None: + name_prefix = "" lockfile = task_path.parent / (task_path.name + "_save.lock") with SoftFileLock(lockfile): @@ -127,10 +129,10 @@ def save(task_path: Path, result=None, task=None): if task_path.name.startswith("Workflow"): # copy files to the workflow directory result = copyfile_workflow(wf_path=task_path, result=result) - with (task_path / "_result.pklz").open("wb") as fp: + with (task_path / f"{name_prefix}_result.pklz").open("wb") as fp: cp.dump(result, fp) if task: - with (task_path / "_task.pklz").open("wb") as fp: + with (task_path / f"{name_prefix}_task.pklz").open("wb") as fp: cp.dump(task, fp) From 68996ac36c2ae727901deed6b1557067748e38fd Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Mon, 1 Jun 2020 19:26:21 -0400 Subject: [PATCH 14/18] small edit to the test to fix windows --- pydra/engine/tests/test_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pydra/engine/tests/test_helpers.py b/pydra/engine/tests/test_helpers.py index 9c8a65083a..c85e456a34 100644 --- a/pydra/engine/tests/test_helpers.py +++ b/pydra/engine/tests/test_helpers.py @@ -230,7 +230,7 @@ def test_load_and_run_exception(tmpdir): task_0 = load_and_run(task_pkl=task_pkl, ind=0) assert "i'm raising an exception!" in str(excinfo.value) # checking if the crashfile has been created - assert "/crash" in str(excinfo.value) + assert "crash-" in str(excinfo.value) assert Path(str(excinfo.value).split("here: ")[1][:-2]).exists() # the second task should be fine task_1 = load_and_run(task_pkl=task_pkl, ind=1) From 50b409a110b0e25a90aaed4141f04655614ad2d8 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Mon, 1 Jun 2020 19:54:14 -0400 Subject: [PATCH 15/18] allowing failure for task, i'm getting Timed out trying to connecttoo often --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index f21066212c..a51146d204 100644 --- a/.travis.yml +++ b/.travis.yml @@ -61,6 +61,11 @@ matrix: allow_failures: - python: 3.7 env: INSTALL_DEPENDS="pip==10.0.1 setuptools==30.3.0" + - python: 3.7 + env: + - INSTALL_TYPE="develop" + - CHECK_TYPE="test_dask" + before_install: From 3ede7870ee2b5dcf0fb3103f6fc3143b01929702 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Mon, 1 Jun 2020 21:51:04 -0400 Subject: [PATCH 16/18] removing report_crash and modyfing record_error to be used in load_and_run --- pydra/engine/helpers.py | 94 ++++++++++++------------------ pydra/engine/tests/test_helpers.py | 12 +++- 2 files changed, 47 insertions(+), 59 deletions(-) diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py index 986fbb1dca..b6bef3dcd5 100644 --- a/pydra/engine/helpers.py +++ b/pydra/engine/helpers.py @@ -15,7 +15,7 @@ from traceback import format_exception -from .specs import Runtime, File, Directory, attr_fields +from .specs import Runtime, File, Directory, attr_fields, Result from .helpers_file import hash_file, hash_dir, copyfile, is_existing_file @@ -383,8 +383,33 @@ def create_checksum(name, inputs): def record_error(error_path, error): """Write an error file.""" + + error_message = str(error) + + resultfile = error_path / "_result.pklz" + if not resultfile.exists(): + error_message += """\n + When creating this error file, the results file corresponding + to the task could not be found.""" + + name_checksum = str(error_path.name) + timeofcrash = 
strftime("%Y%m%d-%H%M%S") + try: + login_name = getpass.getuser() + except KeyError: + login_name = "UID{:d}".format(os.getuid()) + + full_error = { + "time of crash": timeofcrash, + "login name": login_name, + "name with checksum": name_checksum, + "error message": error, + } + with (error_path / "_error.pklz").open("wb") as fp: - cp.dump(error, fp) + cp.dump(full_error, fp) + + return error_path / "_error.pklz" def get_open_loop(): @@ -528,19 +553,19 @@ def load_and_run( try: task(rerun=rerun, plugin=plugin, submitter=submitter, **kwargs) except Exception as excinfo: - etype, eval, etr = sys.exc_info() - traceback = format_exception(etype, eval, etr) - crashfile = report_crash(task, traceback=traceback) + # creating result and error files if missing + resultfile = task.output_dir / "_result.pklz" + errorfile = task.output_dir / "_error.pklz" + if not resultfile.exists(): + etype, eval, etr = sys.exc_info() + traceback = format_exception(etype, eval, etr) + errorfile = record_error(task.output_dir, error=traceback) + result = Result(output=None, runtime=None, errored=True) + save(task.output_dir, result=result) raise type(excinfo)( str(excinfo.with_traceback(None)), - f" full crash report is here: {crashfile}", + f" full crash report is here: {errorfile}", ) - else: - if not task.result(): - crashfile = report_crash(task, traceback="results are not set properly") - raise Exception( - f"results are not set properly, full crash report is here: {crashfile}" - ) return task @@ -563,48 +588,3 @@ def load_task(task_pkl, ind=None): task.inputs = attr.evolve(task.inputs, **inputs_dict) task.state = None return task - - -def report_crash(task, traceback, hostname=None): - """Writes crash related information to a file (based on nipype function) - """ - name = task.name - - try: - result = task.result() - except FileNotFoundError: - traceback += """ -When creating this crashfile, the results file corresponding -to the task could not be found.""".splitlines( - keepends=True - ) - except Exception as exc: - traceback += """ -During the creation of this crashfile triggered by the above exception, -another exception occurred:\n\n{}.""".format( - exc - ).splitlines( - keepends=True - ) - else: - if getattr(result, "runtime", None): - if isinstance(result.runtime, list): - host = result.runtime[0].hostname - else: - host = result.runtime.hostname - - # Try everything to fill in the host - # TODO: think about using host - # host = host or hostname or gethostname() - timeofcrash = strftime("%Y%m%d-%H%M%S") - try: - login_name = getpass.getuser() - except KeyError: - login_name = "UID{:d}".format(os.getuid()) - crashfile = f"crash-{timeofcrash}-{login_name}-{name}-{uuid.uuid4()}.pklz" - crashfile = task.output_dir / crashfile - - with crashfile.open("wb") as fp: - cp.dump(traceback, fp) - - return crashfile diff --git a/pydra/engine/tests/test_helpers.py b/pydra/engine/tests/test_helpers.py index c85e456a34..64bdb24b88 100644 --- a/pydra/engine/tests/test_helpers.py +++ b/pydra/engine/tests/test_helpers.py @@ -230,8 +230,16 @@ def test_load_and_run_exception(tmpdir): task_0 = load_and_run(task_pkl=task_pkl, ind=0) assert "i'm raising an exception!" 
From e0b3552cc7d441ea25724fdd84ebf694ecf58d41 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Mon, 1 Jun 2020 22:11:41 -0400
Subject: [PATCH 17/18] catching error in load_task; returning resultfile
 instead of task in load_and_run

---
 pydra/engine/helpers.py            | 16 ++++++++++---
 pydra/engine/tests/test_helpers.py | 37 +++++++++++++++++++++---------
 2 files changed, 39 insertions(+), 14 deletions(-)

diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index b6bef3dcd5..401c2f8371 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -549,12 +549,22 @@ def load_and_run(
     loading a task from a pickle file,
     setting proper input and running the task
     """
-    task = load_task(task_pkl=task_pkl, ind=ind)
+    try:
+        task = load_task(task_pkl=task_pkl, ind=ind)
+    except Exception as excinfo:
+        if task_pkl.parent.exists():
+            etype, eval, etr = sys.exc_info()
+            traceback = format_exception(etype, eval, etr)
+            errorfile = record_error(task_pkl.parent, error=traceback)
+            result = Result(output=None, runtime=None, errored=True)
+            save(task_pkl.parent, result=result)
+        raise
+
+    resultfile = task.output_dir / "_result.pklz"
     try:
         task(rerun=rerun, plugin=plugin, submitter=submitter, **kwargs)
     except Exception as excinfo:
         # creating result and error files if missing
-        resultfile = task.output_dir / "_result.pklz"
         errorfile = task.output_dir / "_error.pklz"
         if not resultfile.exists():
             etype, eval, etr = sys.exc_info()
@@ -566,7 +576,7 @@ def load_and_run(
             str(excinfo.with_traceback(None)),
             f" full crash report is here: {errorfile}",
         )
-    return task
+    return resultfile


 async def load_and_run_async(task_pkl, ind=None, submitter=None, rerun=False, **kwargs):
diff --git a/pydra/engine/tests/test_helpers.py b/pydra/engine/tests/test_helpers.py
index 64bdb24b88..45b3c08e87 100644
--- a/pydra/engine/tests/test_helpers.py
+++ b/pydra/engine/tests/test_helpers.py
@@ -209,13 +209,24 @@ def test_load_and_run(tmpdir):
     with task_pkl.open("wb") as fp:
         cp.dump(task, fp)

-    task_0 = load_and_run(task_pkl=task_pkl, ind=0)
-    assert task_0.result().output.out == 10
-    task_1 = load_and_run(task_pkl=task_pkl, ind=1)
-    assert task_1.result().output.out == 20
+    resultfile_0 = load_and_run(task_pkl=task_pkl, ind=0)
+    resultfile_1 = load_and_run(task_pkl=task_pkl, ind=1)
+    # checking the result files
+    result_0 = cp.loads(resultfile_0.read_bytes())
+    result_1 = cp.loads(resultfile_1.read_bytes())
+    assert result_0.output.out == 10
+    assert result_1.output.out == 20


-def test_load_and_run_exception(tmpdir):
+def test_load_and_run_exception_load(tmpdir):
+    """ testing raising an exception and saving info in the crashfile
+    when load_and_run fails to load the task"""
+    task_pkl = Path(tmpdir.join("task_main.pkl"))
+    task = raise_xeq1(name="raise", x=[1, 2]).split("x")
+    with pytest.raises(FileNotFoundError) as excinfo:
+        task_0 = load_and_run(task_pkl=task_pkl, ind=0)
+
+
+def test_load_and_run_exception_run(tmpdir):
     """ testing raising an exception and saving info in the crashfile
     when load_and_run fails to run the task"""
     task_pkl = Path(tmpdir.join("task_main.pkl"))
@@ -241,8 +252,9 @@ def test_load_and_run_exception_run(tmpdir):
     assert result_exception.errored is True

     # the second task should be fine
-    task_1 = load_and_run(task_pkl=task_pkl, ind=1)
-    assert task_1.result().output.out == 2
+    resultfile = load_and_run(task_pkl=task_pkl, ind=1)
+    result_1 = cp.loads(resultfile.read_bytes())
+    assert result_1.output.out == 2


 def test_load_and_run_wf(tmpdir):
@@ -265,7 +277,10 @@ def test_load_and_run_wf(tmpdir):
     with wf_pkl.open("wb") as fp:
         cp.dump(wf, fp)

-    wf_0 = load_and_run(ind=0, task_pkl=wf_pkl)
-    assert wf_0.result().output.out == 10
-    wf_1 = load_and_run(ind=1, task_pkl=wf_pkl)
-    assert wf_1.result().output.out == 20
+    resultfile_0 = load_and_run(ind=0, task_pkl=wf_pkl)
+    resultfile_1 = load_and_run(ind=1, task_pkl=wf_pkl)
+    # checking the result files
+    result_0 = cp.loads(resultfile_0.read_bytes())
+    result_1 = cp.loads(resultfile_1.read_bytes())
+    assert result_0.output.out == 10
+    assert result_1.output.out == 20
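The contract change in this patch is easy to miss in the diff: load_and_run
now returns the Path of the element's result pickle rather than the executed
task, so callers deserialize the Result themselves, exactly as the updated
tests do. A minimal sketch of the new calling convention (the pickle path and
its contents are hypothetical):

    import cloudpickle as cp
    from pathlib import Path
    from pydra.engine.helpers import load_and_run

    task_pkl = Path("/tmp/task_main.pkl")  # hypothetical pickled task with a splitter

    # one call per state index; each returns the path of that element's _result.pklz
    resultfile = load_and_run(task_pkl=task_pkl, ind=0)
    result = cp.loads(resultfile.read_bytes())
    print(result.output.out)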
From d17b593d56b45d52cc86853bcd17055dc452a93a Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Mon, 1 Jun 2020 23:34:14 -0400
Subject: [PATCH 18/18] small changes to make_klass, so the result is properly
 formatted, closes #267

---
 pydra/engine/helpers.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py
index 401c2f8371..0e8e97c50d 100644
--- a/pydra/engine/helpers.py
+++ b/pydra/engine/helpers.py
@@ -235,7 +235,7 @@ def make_klass(spec):
             if isinstance(item[1], attr._make._CountingAttr):
                 newfields[item[0]] = item[1]
             else:
-                newfields[item[0]] = attr.ib(repr=False, type=item[1])
+                newfields[item[0]] = attr.ib(type=item[1])
         else:
             if (
                 any([isinstance(ii, attr._make._CountingAttr) for ii in item])
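The one-line change matters because repr=False excluded every generated field
from the output class's __repr__, so printed results looked empty, which is
what #267 reports. A standalone sketch of the underlying attrs behaviour,
independent of pydra's make_klass (class names are illustrative only):

    import attr

    @attr.s
    class Hidden:
        out = attr.ib(type=int, repr=False)  # old behaviour: field hidden from repr

    @attr.s
    class Shown:
        out = attr.ib(type=int)              # new behaviour: field shown

    print(Hidden(5))  # -> Hidden()
    print(Shown(5))   # -> Shown(out=5)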