
Adaptive scaling and dask-jobqueue goes into endless loop when a job launches several worker processes (was: Different configs result in worker death) #498

Open
AlecThomson opened this issue May 18, 2021 · 14 comments
Labels
bug (Something isn't working) · usage question (Question about using jobqueue)
Milestone

Comments

@AlecThomson

What happened:
(Reposting from SO)

I'm using Dask-Jobqueue on a Slurm supercomputer (I'll note that this is also a Cray machine). My workload includes a mix of threaded (i.e. numpy) and pure-Python workloads, so I think a balance of threads and processes would be best for my deployment (which is the default behaviour). However, in order for my jobs to run I need to use this basic configuration:

cluster = SLURMCluster(cores=20,
                    processes=1,
                    memory="60GB",
                    walltime='12:00:00',
                    ...
                    )
cluster.adapt(minimum=0, maximum=20)
client = Client(cluster)

which is entirely threaded. The tasks also seem to take longer than I would naively expect (a large part of this is a lot of file reading/writing). Switching to purely processes, i.e.

cluster = SLURMCluster(cores=20,
                    processes=20,
                    memory="60GB",
                    walltime='12:00:00',
                    ...
                    )

results in Slurm jobs that are killed immediately after they are launched, with the only output being something like:

slurmstepd: error: *** JOB 11116133 ON nid00201 CANCELLED AT 2021-04-29T17:23:25 ***

Choosing a balanced configuration (i.e. default)

cluster = SLURMCluster(cores=20,
                    memory="60GB",
                    walltime='12:00:00',
                    ...
                    )

results in a strange intermediate behaviour. The work will run nearly to completion (e.g. 900/1000 tasks), then a number of the workers will be killed, and the progress will drop back down to, say, 400/1000 tasks.

Further, I've found that using cluster.scale, rather than cluster.adapt, results in a successful run of the work. Perhaps the issue here is how adapt is trying to scale the number of jobs?

What you expected to happen:
I would expect that changing the balance of processes / threads shouldn't change the lifetime of a worker.

Anything else we need to know?:
Possibly related to #20 and #363

As an aside, the current configuration of processes / threads is confusing, and seems to conflict with how e.g. a LocalCluster is specified. Is there any progress on #231?

Environment:

  • Dask version: 2021.4.1
  • Python version: 3.8.8
  • Operating System: SUSE Linux Enterprise Server 12 SP3
  • Install method (conda, pip, source): conda
@jacobtomlinson
Member

This seems to have an answer on SO so I'm going to close this out here.

@AlecThomson
Author

Hi @jacobtomlinson - sorry for not being more transparent; I recreated this question here even with that answer in place. I don't think the current answer addresses the problem, which led me to repost here. The suggestion on SO is that it is a Slurm issue. However, through my testing I've found that the jobs are not cancelled due to Slurm, and that the resource requests remain the same. Rather, it's the balance between threads and processes that changes (or using scale rather than adapt), and that change results in the workers being stopped early.

@guillaumeeb
Member

Hi @AlecThomson,

Sorry for the delay. This seems like a complicated issue; the problem is probably coming from a mix of how Slurm is set up on your system, adaptive scaling, dask-jobqueue, and your workflow.

First, a simple question: do you really need adaptive scaling? It is a great feature, but it introduces a more complex way of managing resources: your workers (and jobs) are probably stopped and restarted several times, which Slurm may not like. This might be the cause of the error you indicated on SO:

slurm_load_jobs error: Invalid job id specified

Changing the balance of processes / threads might very well change the lifetime of a worker, for several reasons:

  • Memory management: if you launch several processes, each will take only a fraction of the available memory: 20 processes with 60GB of memory means only about 3GB of memory per process, so if your workflow is a bit memory-bound, workers might get killed early (see the short calculation after this list).
  • The way Slurm handles job resources: sometimes job schedulers are set up to authorise only one process to run inside a reservation, depending on how resources are requested. It is true that dask-jobqueue doesn't use the srun command, which can be useful for passing more information to Slurm.
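
For reference, here is a quick way to see what each worker process will get, as a rough sketch using dask's own byte utilities (the result matches the --memory-limit value that shows up in the job scripts below):

from dask.utils import format_bytes, parse_bytes

processes = 20
memory = "60GB"

# dask-jobqueue splits the job's memory evenly across worker processes,
# so each worker's memory limit is memory / processes.
per_worker = parse_bytes(memory) / processes
print(format_bytes(per_worker))  # ~2.79 GiB, i.e. only about 3 GB per process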

To investigate further, we would need more information, like the stderr/stdout of one worker job, the job_script() output, and ideally a reproducible example. See https://jobqueue.dask.org/en/latest/debug.html.

@guillaumeeb guillaumeeb reopened this Jun 5, 2021
@AlecThomson
Author

AlecThomson commented Jun 9, 2021

Hi @guillaumeeb,

Thanks very much for following up!

First, simple question: do you really need adaptive scaling?

I would very much like to make use of these features, if possible. First, I'm on a highly-subscribed system, so it's possible jobs could be sitting in the queue for some time. My understanding is that using the lifetime and lifetime-stagger, in combination with adaptive scaling, is a neat workaround for preventing timeouts in the queue. Second, related to the first, I want to minimise my footprint on the system, so scaling up and down would seem to be an efficient use of resources.

Regarding memory management, this does not seem to be the case. I've tested (using the example below) purposely exceeding the memory limit, which results in a clear output:

distributed.worker - WARNING - Worker is at 95% memory usage. Pausing worker.  Process memory: 2.66 GiB -- Worker memory limit: 2.79 GiB

followed by

slurmstepd: error: Detected 1 oom-kill event(s) in StepId=11331832.0 cgroup. Some of your processes may have been killed by the cgroup out-of-memory handler.
srun: Job step aborted: Waiting up to 32 seconds for job step to finish.

As requested, I've put together an example that reproduces my issue:

import time
from dask import delayed
from dask.distributed import Client, progress, LocalCluster
from dask_jobqueue import SLURMCluster
import numpy as np

def inner_job(i):
    return i+1


@delayed
def job(x):
    time.sleep(1)
    y = inner_job(x)
    # Enable for OOM error
    #large_arr = np.zeros([100000,10000]) # 8GB
    #y = y + large_arr
    return y


def main(client):
    njobs = int(1000)
    outputs = []
    for i in range(njobs):
        output = job(i)
        outputs.append(output)

    results = client.persist(outputs)
    print("Running test...")
    progress(results)


def cli():
    cluster = SLURMCluster(
        # Set up for Galaxy
        cores=20,
        # processes=1, 
        processes=20,
        name='spice-worker',
        memory="60GB",
        project='askap',
        queue='workq',
        walltime='12:00:00',
        job_extra=['-M galaxy'],
        # interface for the workers
        interface="ipogif0",
        log_directory='logs',
        python='srun -n 1 -c 20 python',
        extra=[
                "--lifetime", "11h",
                "--lifetime-stagger", "5m",
               ],
        death_timeout=300,
        local_directory='/dev/shm'
        )
    print('Submitted scripts will look like: \n', cluster.job_script())
    # cluster.adapt(maximum_jobs=2)
    cluster.scale(jobs=2)
    client = Client(cluster)

    main(client)


if __name__ == "__main__":
    cli()

I've tested this using several configurations:

  • processes=1 vs processes=20 vs processes=10
  • cluster.adapt(maximum_jobs=2) vs cluster.scale(jobs=2)

Using cluster.scale results in successful completion, and the following outputs:

  • Using threads:
Submitted scripts will look like: 
 #!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e logs/dask-worker-%J.err
#SBATCH -o logs/dask-worker-%J.out
#SBATCH -p workq
#SBATCH -A askap
#SBATCH -n 1
#SBATCH --cpus-per-task=20
#SBATCH --mem=56G
#SBATCH -t 12:00:00
#SBATCH -M galaxy
module unload askapsoft
module load askapsoft/1.1.0
unset PYTHONPATH
source /home/$(whoami)/.bashrc
conda activate spice
srun -n 1 -c 20 python -m distributed.cli.dask_worker tcp://10.128.0.11:39640 --nthreads 20 --memory-limit 55.88GiB --name dummy-name --nanny --death-timeout 300 --local-directory /dev/shm --lifetime 11h --lifetime-stagger 5m --interface ipogif0 --protocol tcp://

Running test...
[########################################] | 100% Completed | 42.2s
  • Using processes:
Submitted scripts will look like: 
 #!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e logs/dask-worker-%J.err
#SBATCH -o logs/dask-worker-%J.out
#SBATCH -p workq
#SBATCH -A askap
#SBATCH -n 1
#SBATCH --cpus-per-task=20
#SBATCH --mem=56G
#SBATCH -t 12:00:00
#SBATCH -M galaxy
module unload askapsoft
module load askapsoft/1.1.0
unset PYTHONPATH
source /home/$(whoami)/.bashrc
conda activate spice
srun -n 1 -c 20 python -m distributed.cli.dask_worker tcp://10.128.0.11:42596 --nthreads 1 --nprocs 20 --memory-limit 2.79GiB --name dummy-name --nanny --death-timeout 300 --local-directory /dev/shm --lifetime 11h --lifetime-stagger 5m --interface ipogif0 --protocol tcp://

Running test...
[########################################] | 100% Completed | 37.9s

Using cluster.adapt, however, only using threads works:

Submitted scripts will look like: 
 #!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e logs/dask-worker-%J.err
#SBATCH -o logs/dask-worker-%J.out
#SBATCH -p workq
#SBATCH -A askap
#SBATCH -n 1
#SBATCH --cpus-per-task=20
#SBATCH --mem=56G
#SBATCH -t 12:00:00
#SBATCH -M galaxy
module unload askapsoft
module load askapsoft/1.1.0
unset PYTHONPATH
source /home/$(whoami)/.bashrc
conda activate spice
srun -n 1 -c 20 python -m distributed.cli.dask_worker tcp://10.128.0.11:40178 --nthreads 20 --memory-limit 55.88GiB --name dummy-name --nanny --death-timeout 300 --local-directory /dev/shm --lifetime 11h --lifetime-stagger 5m --interface ipogif0 --protocol tcp://

Running test...
[########################################] | 100% Completed | 43.5s

Using anything other than processes=1 results in jobs that are immediately cancelled as they are submitted. The system appears to be stuck in a loop of creating jobs and then killing them. During these runs no output .err or .out logs are created by the workers. Interestingly, using this example, I was able to capture information using scontrol show jobid {jobid}. For a run using processes=20 the output was:

JobId=11331803 JobName=dask-worker
   UserId=athomson(24456) GroupId=athomson(24456) MCS_label=N/A
   Priority=5317 Nice=0 Account=askap QOS=normal
   JobState=CANCELLED Reason=None Dependency=(null)
   Requeue=1 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0
   RunTime=00:00:01 TimeLimit=12:00:00 TimeMin=N/A
   SubmitTime=11:22:30 EligibleTime=11:22:30
   AccrueTime=11:22:30
   StartTime=11:22:32 EndTime=11:22:33 Deadline=N/A
   SuspendTime=None SecsPreSuspend=0 LastSchedEval=11:22:32
   Partition=workq AllocNode:Sid=nid00010:30166
   ReqNodeList=(null) ExcNodeList=(null)
   NodeList=nid00218
   BatchHost=nid00218
   NumNodes=1 NumCPUs=40 NumTasks=1 CPUs/Task=20 ReqB:S:C:T=0:0:*:1
   TRES=cpu=40,mem=56G,node=1,billing=20
   Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=*
   MinCPUsNode=20 MinMemoryNode=56G MinTmpDiskNode=0
   Features=(null) DelayBoot=00:00:00
   OverSubscribe=NO Contiguous=0 Licenses=(null) Network=(null)
   Command=/tmp/tmpiymckuhh.sh
   WorkDir=/group/askap/athomson/repos/spiceracs/dask_tests/adapt_procs
   StdErr=/group/askap/athomson/repos/spiceracs/dask_tests/adapt_procs/logs/dask-worker-%J.err
   StdIn=/dev/null
   StdOut=/group/askap/athomson/repos/spiceracs/dask_tests/adapt_procs/logs/dask-worker-%J.out
   Power=
   NtasksPerTRES:0

The accompanying output from the control script was:

Submitted scripts will look like: 
 #!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e logs/dask-worker-%J.err
#SBATCH -o logs/dask-worker-%J.out
#SBATCH -p workq
#SBATCH -A askap
#SBATCH -n 1
#SBATCH --cpus-per-task=20
#SBATCH --mem=56G
#SBATCH -t 12:00:00
#SBATCH -M galaxy
module unload askapsoft
module load askapsoft/1.1.0
unset PYTHONPATH
source /home/$(whoami)/.bashrc
conda activate spice
srun -n 1 -c 20 python -m distributed.cli.dask_worker tcp://10.128.0.11:34206 --nthreads 1 --nprocs 20 --memory-limit 2.79GiB --name dummy-name --nanny --death-timeout 300 --local-directory /dev/shm --lifetime 11h --lifetime-stagger 5m --interface ipogif0 --protocol tcp://

Running test...
[                                        ] | 0% Completed |  1min  0.9s

At which point I stopped the job. The output looks exactly the same when using e.g. processes=10, save for the arguments passed to dask_worker.

Let me know if you'd like any more info, or if there are any other diagnostic tests I can run.

EDIT: I accidentally left some extra arguments in env_extra. These are needed for my full-scale job, but not for the example. Removing them has no impact on the behaviour I've described.

@guillaumeeb
Member

My understanding is that using the lifetime and lifetime-stagger, in combination with adaptive scaling, is a neat workaround for preventing timeouts in the queue

I'm not sure what you mean by timeout in the queue. lifetime is for preventing a worker from hitting the walltime while a job is running. A use case of Adaptive in combination with this is to help with long-running computations, especially when you're not sure how many resources you'll have. But be careful: this is also really tricky to get right depending on your workflow.

Second, related to the first, I want to minimise my footprint on the system, so scaling up and down would seem to be an efficient use of resources

This, I think, is the principal benefit of adaptive. I also often advise using it only in interactive mode (not for batch).

Using anything other than processes=1 results in jobs that are immediately cancelled as they are submitted. The system appears to be stuck in a loop of creating jobs and then killing them.

OK, so in the end there is clearly a bug when using adaptive scaling with dask-jobqueue when a job launches several worker processes. I also saw this just yesterday. Adaptive scales by number of processes, not number of jobs, and I think this leads to calls to scale that trigger the job submission and deletion loop.

Unfortunately, I have no time to dig into this currently...

@andersy005 andersy005 added the usage question Question about using jobqueue label Oct 15, 2021
@guillaumeeb guillaumeeb added the bug Something isn't working label Aug 14, 2022
@jeiche

jeiche commented Aug 24, 2022

We're currently experiencing the exact same issue with our Grid Engine cluster. Losing a job during the use of adaptive scaling with multiple processes (workers) per job results in permanent loss of workers; it seems that no additional job is submitted to replace the lost workers.

@guillaumeeb
Member

Hi @jeiche, I'm not sure it is the same issue, since here we're talking about a race condition and an endless loop when using adaptive mode. We do see new jobs being launched but almost immediately deleted; is that what you see too?

Anyway, it's a complicated issue to debug; we should look at SpecCluster and the Adaptive code in distributed to fix this. I'm not sure I'll have the time to try to understand the problem soon.

@guillaumeeb guillaumeeb added this to the 0.8.1 milestone Aug 30, 2022
@guillaumeeb guillaumeeb changed the title Different configs result in worker death Adaptive scaling and dask-jobqueue goes into endless loop when a job launches several worker processes (was: Different configs result in worker death) Aug 30, 2022
@jasonkena

jasonkena commented Sep 1, 2022

@guillaumeeb I believe I found a solution to the problem (code). When adapt kills a worker, it calls scancel on the worker's job, inevitably killing the other worker processes under the same job. To circumvent this, worker_key must be passed to Adaptive to force adapt to retire all of the workers under a job when it wants to kill a particular worker (JobQueueCluster should probably implement this by default). I also found that specifying a higher value for interval helps prevent Dask from spawning/killing jobs every second.
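
A minimal sketch of what that could look like, reusing the reproducer's configuration (the key function assumes worker names carry a per-process suffix, e.g. spice-worker-1-0 ... spice-worker-1-19, so stripping the last dash-separated field groups workers by job; adjust it to whatever names your cluster actually produces):

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=20, processes=20, memory="60GB", walltime="12:00:00")

# Group workers by their parent job so adapt retires whole jobs rather than
# individual processes, and poll less aggressively than the default interval.
cluster.adapt(
    maximum_jobs=2,
    worker_key=lambda ws: ws.name.rsplit("-", 1)[0],  # job-level grouping
    interval="10s",
)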

Hope that helps.

@guillaumeeb
Member

(JobQueueCluster should probably implement this by default)

That sounds really interesting!

@guillaumeeb
Member

Okay, so I can clearly reproduce the problem. Using processes > 1 and adapt leads to an endless loop of starting and stopping workers and jobs. When activating debug mode, I see a lot of these messages:

...
DEBUG:Starting worker: spice-worker-1
...
DEBUG:Starting job: 31257350
DEBUG:Stopping worker: spice-worker-1 job: 31257350
...
DEBUG:Closed job 31257350
...
DEBUG:Starting worker: spice-worker-1
...
DEBUG:Starting job: 31257351
DEBUG:Stopping worker: spice-worker-1 job: 31257351
..
DEBUG:Closed job 31257351

@jasonkena's suggestion modifies this behavior, but both kwargs must be passed to adapt: worker_key and a higher interval. Using only worker_key is not sufficient, so there is probably something to fix.
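
For anyone trying to reproduce this, a rough sketch of how to surface those messages (assuming the standard Python logging setup; the exact logger that emits each line may differ):

import logging

# The "Starting/Stopping worker" and "Starting/Closed job" messages come from
# dask-jobqueue and distributed's deployment machinery, so raise both to DEBUG.
logging.basicConfig(level=logging.INFO)
logging.getLogger("dask_jobqueue").setLevel(logging.DEBUG)
logging.getLogger("distributed.deploy").setLevel(logging.DEBUG)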

@guillaumeeb
Member

Ongoing investigation here: it seems the problem lies in the initialization of adaptive mode, e.g. when starting the first worker process. The problem occurs when we launch adaptive without any minimum number of workers.

Using:

cluster.adapt(minimum_jobs=1, maximum_jobs=6)

is also a workaround, but you'll always have at least one running job (which is not that bad).

@guillaumeeb
Member

So, if I'm not mistaken, I've tracked down the problem to the distributed adaptive code. It's a conjunction of two things:

So this needs to be fixed upstream.

@sgoodm

sgoodm commented Jan 4, 2023

We just encountered this issue when using adaptive scaling with Dask through Prefect (on a PBS cluster rather than Slurm). We built on the solution from @jasonkena and the latest comment from @guillaumeeb to enforce that the adaptive minimum is always equal to the number of processes specified, roughly as in the sketch below.

This seems like a fairly simple workaround for the core issue, and it can be used regardless of cluster type until potential upstream fixes such as dask/distributed#7019 are implemented.
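
A minimal sketch of that workaround (the PBSCluster parameters here are placeholders, not our real configuration; the only point is tying the adaptive minimum to the number of processes per job):

from dask_jobqueue import PBSCluster

processes = 8  # worker processes per job

cluster = PBSCluster(cores=8, processes=processes, memory="32GB", walltime="04:00:00")

# Never let adapt target fewer workers than one full job provides, so it
# cannot get stuck creating and cancelling partially used jobs.
cluster.adapt(minimum=processes, maximum=4 * processes)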

@BrunoBelucci

I cannot thank you guys enough for finally finding an answer to the bug I am experiencing. The solution was really difficult to track down, especially because we don't get any information about why the job is being cancelled, not even in DEBUG mode. While the problem remains unsolved, I suggest at least adding a log message whenever a job is cancelled or started at adapt's request.
