diff --git a/niworkflows/engine/tests/test_plugin.py b/niworkflows/engine/tests/test_plugin.py new file mode 100644 index 00000000000..995cc79c7e0 --- /dev/null +++ b/niworkflows/engine/tests/test_plugin.py @@ -0,0 +1,92 @@ +import logging +from types import SimpleNamespace + +import pytest +from nipype.pipeline import engine as pe +from nipype.interfaces import utility as niu + +from ..plugin import MultiProcPlugin + + +def add(x, y): + return x + y + + +def addall(inlist): + import time + + time.sleep(0.2) # Simulate some work + return sum(inlist) + + +@pytest.fixture +def workflow(tmp_path): + workflow = pe.Workflow(name="test_wf", base_dir=tmp_path) + + inputnode = pe.Node(niu.IdentityInterface(fields=["x", "y"]), name="inputnode") + outputnode = pe.Node(niu.IdentityInterface(fields=["z"]), name="outputnode") + + # Generate many nodes and claim a lot of memory + add_nd = pe.MapNode( + niu.Function(function=add, input_names=["x", "y"], output_names=["z"]), + name="add", + iterfield=["x"], + mem_gb=0.8, + ) + + # Regular node + sum_nd = pe.Node(niu.Function(function=addall, input_names=["inlist"]), name="sum") + + # Run without submitting is another code path + add_more_nd = pe.Node( + niu.Function(function=add, input_names=["x", "y"], output_names=["z"]), + name="add_more", + run_without_submitting=True, + ) + + workflow.connect( + [ + (inputnode, add_nd, [("x", "x"), ("y", "y")]), + (add_nd, sum_nd, [("z", "inlist")]), + (sum_nd, add_more_nd, [("out", "x")]), + (inputnode, add_more_nd, [("y", "y")]), + (add_more_nd, outputnode, [("z", "z")]), + ] + ) + + inputnode.inputs.x = list(range(30)) + inputnode.inputs.y = 4 + + # Avoid unnecessary sleeps + workflow.config["execution"]["poll_sleep_duration"] = 0 + + return workflow + + +def test_plugin_defaults(workflow, caplog): + """Test the plugin works without any arguments.""" + caplog.set_level(logging.CRITICAL, logger="nipype.workflow") + workflow.run(plugin=MultiProcPlugin()) + + +def test_plugin_args_noconfig(workflow, caplog): + """Test the plugin works with typical nipype arguments.""" + caplog.set_level(logging.CRITICAL, logger="nipype.workflow") + workflow.run( + plugin=MultiProcPlugin(), + plugin_args={"n_procs": 2, "memory_gb": 0.1}, + ) + + +def test_plugin_app_config(workflow, caplog): + """Test the plugin works with a nipreps-style configuration.""" + app_config = SimpleNamespace( + environment=SimpleNamespace(total_memory_gb=1), + _process_initializer=lambda x: None, + file_path='/does/not/need/to/exist/for/testing', + ) + caplog.set_level(logging.CRITICAL, logger="nipype.workflow") + workflow.run( + plugin=MultiProcPlugin(), + plugin_args={"n_procs": 2, "app_config": app_config}, + )