From db3083f664e302b8e23c441c2b40d33c1fffab1e Mon Sep 17 00:00:00 2001
From: Dorota Jarecka <djarecka@gmail.com>
Date: Fri, 6 Nov 2020 22:11:51 -0500
Subject: [PATCH 1/2] removing state_inputs from Task class (was not used
 anymore), and fixing a memory issue for wf with a big splitter; adding a
 test for it together with an extra library for testing: pympler

---
 pydra/engine/core.py                |  2 --
 pydra/engine/tests/test_profiles.py | 39 +++++++++++++++++++++++++++++
 setup.cfg                           |  2 ++
 3 files changed, 41 insertions(+), 2 deletions(-)
 create mode 100644 pydra/engine/tests/test_profiles.py

diff --git a/pydra/engine/core.py b/pydra/engine/core.py
index 266788908c..f5490cef83 100644
--- a/pydra/engine/core.py
+++ b/pydra/engine/core.py
@@ -163,7 +163,6 @@ def __init__(
 
         # checking if metadata is set properly
         self.inputs.check_metadata()
-        self.state_inputs = inputs
         # dictionary to save the connections with lazy fields
         self.inp_lf = {}
         self.state = None
@@ -481,7 +480,6 @@ def split(self, splitter, overwrite=False, **kwargs):
             )
         if kwargs:
             self.inputs = attr.evolve(self.inputs, **kwargs)
-            self.state_inputs = kwargs
         if not self.state or splitter != self.state.splitter:
             self.set_state(splitter)
         return self
diff --git a/pydra/engine/tests/test_profiles.py b/pydra/engine/tests/test_profiles.py
new file mode 100644
index 0000000000..dd758b8d0b
--- /dev/null
+++ b/pydra/engine/tests/test_profiles.py
@@ -0,0 +1,39 @@
+from ..core import Workflow
+from ..helpers import load_task
+from ... import mark
+
+import numpy as np
+from pympler import asizeof
+
+
+def test_load_task_memory():
+    """creating two workflow with relatively big splitter: 1000 and 4000 elements
+    testings if load_task for a single element returns tasks of a similar size
+    """
+
+    def generate_list(l):
+        return np.arange(l).tolist()
+
+    @mark.task
+    def show_var(a):
+        return a
+
+    def create_wf_pkl(size):
+        wf = Workflow(name="wf", input_spec=["x"])
+        wf.split("x", x=generate_list(size))
+        wf.add(show_var(name="show", a=wf.lzin.x))
+        wf.set_output([("out", wf.show.lzout.out)])
+        wf.state.prepare_states(wf.inputs)
+        wf.state.prepare_inputs()
+        wf_pkl = wf.pickle_task()
+        return wf_pkl
+
+    wf_1000_pkl = create_wf_pkl(size=1000)
+    wf_1000_loaded = load_task(task_pkl=wf_1000_pkl, ind=1)
+    wf_1000_single_mem = asizeof.asizeof(wf_1000_loaded)
+
+    wf_4000_pkl = create_wf_pkl(size=4000)
+    wf_4000_loaded = load_task(task_pkl=wf_4000_pkl, ind=1)
+    wf_4000_single_mem = asizeof.asizeof(wf_4000_loaded)
+
+    assert abs(wf_1000_single_mem - wf_4000_single_mem) / wf_1000_single_mem < 0.1
diff --git a/setup.cfg b/setup.cfg
index 029433c26a..a560fa5fe7 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -41,6 +41,7 @@ test_requires =
     python-dateutil
     tornado
     boutiques
+    pympler
 packages = find:
 include_package_data = True
 
@@ -74,6 +75,7 @@ test =
     python-dateutil
     tornado
     boutiques
+    pympler
 tests =
     %(test)s
 dev =

From 37c005950df9a84368f0ac61b9e6efb572442d30 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka <djarecka@gmail.com>
Date: Sat, 7 Nov 2020 09:57:33 -0500
Subject: [PATCH 2/2] adding additional test that checks the wf size

---
 pydra/engine/tests/test_profiles.py | 66 ++++++++++++++++++++---------
 1 file changed, 46 insertions(+), 20 deletions(-)

diff --git a/pydra/engine/tests/test_profiles.py b/pydra/engine/tests/test_profiles.py
index dd758b8d0b..11c09f6c10 100644
--- a/pydra/engine/tests/test_profiles.py
+++ b/pydra/engine/tests/test_profiles.py
@@ -4,36 +4,62 @@
 
 import numpy as np
 from pympler import asizeof
+from pytest import approx
 
 
-def test_load_task_memory():
-    """creating two workflow with relatively big splitter: 1000 and 4000 elements
-    testings if load_task for a single element returns tasks of a similar size
+def generate_list(l):
+    return np.arange(l).tolist()
+
+
+@mark.task
+def show_var(a):
+    return a
+
+
+def create_wf(size):
+    wf = Workflow(name="wf", input_spec=["x"])
+    wf.split("x", x=generate_list(size))
+    wf.add(show_var(name="show", a=wf.lzin.x))
+    wf.set_output([("out", wf.show.lzout.out)])
+    wf.state.prepare_states(wf.inputs)
+    wf.state.prepare_inputs()
+    return wf
+
+
+def test_wf_memory():
+    """creating two workflow with relatively big splitter: 1000, 2000 and 4000 elements
+    testings if the size of workflow grows linearly
     """
 
-    def generate_list(l):
-        return np.arange(l).tolist()
+    wf_1000 = create_wf(size=1000)
+    wf_1000_mem = asizeof.asizeof(wf_1000)
+
+    wf_2000 = create_wf(size=2000)
+    wf_2000_mem = asizeof.asizeof(wf_2000)
 
-    @mark.task
-    def show_var(a):
-        return a
+    wf_4000 = create_wf(size=4000)
+    wf_4000_mem = asizeof.asizeof(wf_4000)
+    # checking if it's linear with the size of the splitter
+    # check print(asizeof.asized(wf_4000, detail=2).format()) in case of problems
+    assert wf_4000_mem / wf_2000_mem == approx(2, 0.05)
+    assert wf_2000_mem / wf_1000_mem == approx(2, 0.05)
 
-    def create_wf_pkl(size):
-        wf = Workflow(name="wf", input_spec=["x"])
-        wf.split("x", x=generate_list(size))
-        wf.add(show_var(name="show", a=wf.lzin.x))
-        wf.set_output([("out", wf.show.lzout.out)])
-        wf.state.prepare_states(wf.inputs)
-        wf.state.prepare_inputs()
-        wf_pkl = wf.pickle_task()
-        return wf_pkl
 
-    wf_1000_pkl = create_wf_pkl(size=1000)
+def test_load_task_memory():
+    """creating two workflow with relatively big splitter: 1000 and 4000 elements
+    testings if load_task for a single element returns tasks of a similar size
+    """
+
+    wf_1000 = create_wf(size=1000)
+    wf_1000_pkl = wf_1000.pickle_task()
     wf_1000_loaded = load_task(task_pkl=wf_1000_pkl, ind=1)
     wf_1000_single_mem = asizeof.asizeof(wf_1000_loaded)
 
-    wf_4000_pkl = create_wf_pkl(size=4000)
+    wf_4000 = create_wf(size=4000)
+    wf_4000_pkl = wf_4000.pickle_task()
     wf_4000_loaded = load_task(task_pkl=wf_4000_pkl, ind=1)
     wf_4000_single_mem = asizeof.asizeof(wf_4000_loaded)
 
-    assert abs(wf_1000_single_mem - wf_4000_single_mem) / wf_1000_single_mem < 0.1
+    # checking if it doesn't change with size of the splitter
+    # check print(asizeof.asized(wf_4000_loaded, detail=2).format()) in case of problems
+    assert wf_1000_single_mem / wf_4000_single_mem == approx(1, 0.05)