Skip to content

Commit

Permalink
LSF: Accept use_stdin in the constructor (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuarteberg authored and lesteve committed Oct 30, 2019
1 parent b4f94c0 commit be60856
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 17 deletions.
17 changes: 17 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import pytest

import dask_jobqueue.lsf


def pytest_addoption(parser):
parser.addoption(
Expand All @@ -27,3 +29,18 @@ def pytest_runtest_setup(item):
if envnames:
if item.config.getoption("-E") not in envnames:
pytest.skip("test requires env in %r" % envnames)


@pytest.fixture(autouse=True)
def mock_lsf_version(monkeypatch, request):
# Monkey-patch lsf_version() UNLESS the 'lsf' environment is selected.
# In that case, the real lsf_version() function should work.
markers = list(request.node.iter_markers())
if any("lsf" in marker.args for marker in markers):
return

try:
dask_jobqueue.lsf.lsf_version()
except OSError:
# Provide a fake implementation of lsf_version()
monkeypatch.setattr(dask_jobqueue.lsf, "lsf_version", lambda: "10")
2 changes: 1 addition & 1 deletion dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ jobqueue:
job-extra: []
log-directory: null
lsf-units: null
use-stdin: null
use-stdin: null # (bool) How jobs are launched, i.e. 'bsub jobscript.sh' or 'bsub < jobscript.sh'

htcondor:
name: dask-worker
Expand Down
35 changes: 26 additions & 9 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
job_extra=None,
lsf_units=None,
config_name="lsf",
use_stdin=None,
**kwargs
):
if queue is None:
Expand All @@ -46,6 +47,12 @@ def __init__(
if lsf_units is None:
lsf_units = dask.config.get("jobqueue.%s.lsf-units" % config_name)

if use_stdin is None:
use_stdin = dask.config.get("jobqueue.%s.use-stdin" % config_name)
if use_stdin is None:
use_stdin = lsf_version() < "10"
self.use_stdin = use_stdin

# Instantiate args and parameters from parent abstract class
super().__init__(*args, config_name=config_name, **kwargs)

Expand Down Expand Up @@ -97,7 +104,7 @@ def __init__(
logger.debug("Job script: \n %s" % self.job_script())

async def _submit_job(self, script_filename):
if use_stdin():
if self.use_stdin:
piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"]
return self._call(piped_cmd, shell=True)
else:
Expand Down Expand Up @@ -189,12 +196,29 @@ class LSFCluster(JobQueueCluster):
lsf_units : str
Unit system for large units in resource usage set by the
LSF_UNIT_FOR_LIMITS in the lsf.conf file of a cluster.
use_stdin : bool
LSF's ``bsub`` command allows us to launch a job by passing it as an
argument (``bsub /tmp/jobscript.sh``) or feeding it to stdin
(``bsub < /tmp/jobscript.sh``). Depending on your cluster's configuration
and/or shared filesystem setup, one of those methods may not work,
forcing you to use the other one. This option controls which method
``dask-jobqueue`` will use to submit jobs via ``bsub``.
In particular, if your cluster fails to launch and the LSF log contains
an error message similar to the following:
.. code-block::
/home/someuser/.lsbatch/1571869562.66512066: line 8: /tmp/tmpva_yau8m.sh: No such file or directory
...then try passing ``use_stdin=True`` here or setting ``use-stdin: true``
in your ``jobqueue.lsf`` config section.
Examples
--------
>>> from dask_jobqueue import LSFCluster
>>> cluster = LSFCluster(queue='general', project='DaskonLSF',
... cores=15, memory='25GB')
... cores=15, memory='25GB', use_stdin=True)
>>> cluster.scale(jobs=10) # ask for 10 jobs
>>> from dask.distributed import Client
Expand All @@ -211,13 +235,6 @@ class LSFCluster(JobQueueCluster):
config_name = "lsf"


def use_stdin():
if dask.config.get("jobqueue.lsf.use-stdin") is not None:
return dask.config.get("jobqueue.lsf.use-stdin")

return lsf_version() < "10"


@toolz.memoize
def lsf_version():
out, _ = subprocess.Popen("lsid", stdout=subprocess.PIPE).communicate()
Expand Down
39 changes: 32 additions & 7 deletions dask_jobqueue/tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,41 @@ def test_adaptive_grouped(loop):

def test_config(loop):
with dask.config.set(
{"jobqueue.lsf.walltime": "00:02", "jobqueue.lsf.local-directory": "/foo"}
{
"jobqueue.lsf.walltime": "00:02",
"jobqueue.lsf.local-directory": "/foo",
"jobqueue.lsf.use-stdin": True,
}
):
with LSFCluster(loop=loop, cores=1, memory="2GB") as cluster:
assert "00:02" in cluster.job_script()
assert "--local-directory /foo" in cluster.job_script()
assert cluster._dummy_job.use_stdin


@pytest.mark.parametrize(
"config_value,constructor_value",
[
(None, False),
(None, True),
(True, None),
(False, None),
(True, False), # Constuctor overrides config
],
)
def test_use_stdin(loop, config_value, constructor_value):
"""
Verify that use-stdin is respected when passed via the
config OR the LSFCluster() constructor
"""
with dask.config.set({"jobqueue.lsf.use-stdin": config_value}):
with LSFCluster(
loop=loop, cores=1, memory="2GB", use_stdin=constructor_value
) as cluster:
if constructor_value is not None:
assert cluster._dummy_job.use_stdin == constructor_value
else:
assert cluster._dummy_job.use_stdin == config_value


def test_config_name_lsf_takes_custom_config():
Expand All @@ -244,6 +274,7 @@ def test_config_name_lsf_takes_custom_config():
"env-extra": [],
"log-directory": None,
"shebang": "#!/usr/bin/env bash",
"use-stdin": None,
}

with dask.config.set({"jobqueue.lsf-config-name": conf}):
Expand Down Expand Up @@ -306,9 +337,3 @@ def test_lsf_unit_detection(lsf_units_string, expected_unit):

def test_lsf_unit_detection_without_file():
lsf_unit_detection_helper("kb", conf_text=None)


@pytest.mark.parametrize("stdin", [True, False])
def test_stdin(stdin):
with dask.config.set({"jobqueue.lsf.use-stdin": stdin}):
assert lsf.use_stdin() is stdin

0 comments on commit be60856

Please sign in to comment.