Skip to content

Commit

Permalink
Merge pull request #375 from djarecka/fix/task_memory
Browse files Browse the repository at this point in the history
fixing the memory issue for workflows with big splitters (closes #371)
  • Loading branch information
djarecka authored Nov 7, 2020
2 parents 7cb76c2 + 37c0059 commit 581545c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 2 deletions.
2 changes: 0 additions & 2 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ def __init__(

# checking if metadata is set properly
self.inputs.check_metadata()
self.state_inputs = inputs
# dictionary to save the connections with lazy fields
self.inp_lf = {}
self.state = None
Expand Down Expand Up @@ -481,7 +480,6 @@ def split(self, splitter, overwrite=False, **kwargs):
)
if kwargs:
self.inputs = attr.evolve(self.inputs, **kwargs)
self.state_inputs = kwargs
if not self.state or splitter != self.state.splitter:
self.set_state(splitter)
return self
Expand Down
65 changes: 65 additions & 0 deletions pydra/engine/tests/test_profiles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from ..core import Workflow
from ..helpers import load_task
from ... import mark

import numpy as np
from pympler import asizeof
from pytest import approx


def generate_list(l):
    """Return the list ``[0, 1, ..., l - 1]``.

    Plain ``range`` replaces the original ``np.arange(l).tolist()`` — the
    result is identical for a non-negative integer ``l`` and avoids a
    needless numpy round-trip just to build a Python list.
    """
    # NOTE(review): parameter name ``l`` is ambiguous (PEP 8 / E741) but is
    # kept unchanged for backward compatibility with keyword callers.
    return list(range(l))


@mark.task
def show_var(a):
    """Identity pydra task: returns its single input ``a`` unchanged.

    Used only as a minimal task body so the profiling tests can build a
    workflow with a splitter without doing any real work.
    """
    return a


def create_wf(size):
    """Build a one-node workflow whose input ``x`` is split over ``size`` elements.

    The workflow wraps the identity task ``show_var`` and is returned with its
    state already prepared, so its memory footprint can be measured directly.
    """
    wf = Workflow(name="wf", input_spec=["x"])
    # splitter over a list of ``size`` integers -> ``size`` state elements
    wf.split("x", x=generate_list(size))
    wf.add(show_var(name="show", a=wf.lzin.x))
    wf.set_output([("out", wf.show.lzout.out)])
    # materialize the state up front so asizeof sees the full structure
    # NOTE(review): prepare_states must run before prepare_inputs — order
    # follows the pydra State API; confirm against pydra.engine.state
    wf.state.prepare_states(wf.inputs)
    wf.state.prepare_inputs()
    return wf


def test_wf_memory():
    """Create workflows with relatively big splitters (1000, 2000 and 4000
    elements) and check that the workflow's memory footprint grows
    linearly with the splitter size.
    """
    sizes = (1000, 2000, 4000)
    # deep memory footprint of a freshly created workflow for each size
    footprints = [asizeof.asizeof(create_wf(size=sz)) for sz in sizes]

    # doubling the splitter should roughly double the workflow size
    # check print(asizeof.asized(wf, detail=2).format()) in case of problems
    assert footprints[2] / footprints[1] == approx(2, 0.05)
    assert footprints[1] / footprints[0] == approx(2, 0.05)


def test_load_task_memory():
    """Create workflows with relatively big splitters (1000 and 4000
    elements) and check that ``load_task`` for a single element returns
    tasks of a similar size regardless of the splitter size.
    """
    single_sizes = []
    for sz in (1000, 4000):
        wf = create_wf(size=sz)
        # pickle the whole workflow, then load back only element ind=1
        loaded = load_task(task_pkl=wf.pickle_task(), ind=1)
        single_sizes.append(asizeof.asizeof(loaded))

    # a single loaded element should not scale with the splitter size
    # check print(asizeof.asized(loaded, detail=2).format()) in case of problems
    assert single_sizes[0] / single_sizes[1] == approx(1, 0.05)
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ test_requires =
python-dateutil
tornado
boutiques
pympler
packages = find:
include_package_data = True

Expand Down Expand Up @@ -74,6 +75,7 @@ test =
python-dateutil
tornado
boutiques
pympler
tests =
%(test)s
dev =
Expand Down

0 comments on commit 581545c

Please sign in to comment.