Dsub and multiprocessing #268
Hi @jacekzkominek, Sorry to hear about the battle with multiprocessing. I don't have a lot of insight into issues with the multiprocessing library. I do like that you attempted to use fewer processes than cores; that's one thing that has cropped up before, where it helps to leave an "idle" core to service system needs. The only other bits of help I think I can offer:
A few years back, we had some workflows that for some inputs would run longer than 24 hours. We wanted to make sure that they didn't get preempted at 24 hours and retried (since they'd fail again), so we used the linux timeout command to generate a hard (non-preemption) failure. It looked something like:
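A minimal sketch of the pattern (the script name and exact invocation here are placeholders, not the original snippet):

#!/bin/bash
# Hard-stop the workload at 23.5 hours (84600 seconds) so a hung job
# fails outright instead of idling until the 24-hour preemption/retry.
timeout 84600 ./run_analysis.sh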
(We actually wanted to kill it at 23 1/2 hours since delocalization could take 1/2 hour.) I mention using "timeout" because "hanging" means sitting idle, burning CPU/memory/disk hours. Not sure if you can time-bound your commands.
Thank you for the response and insights @mbookman! As to your questions/points: the one thing where I think I have seen some impact was reducing the number of analyses run within a multiprocessing pool. I have 100 analyses per job, and my testing showed that running only 20 of them is much less likely to hang than, say, running 50 or all 100 of them. So I tried running a few smaller pools sequentially to get around that, but the issue was still present...
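A rough sketch of that batching idea (the worker function, task list, and batch size are illustrative placeholders):

from multiprocessing import Pool

def run_analysis(task):
    ...  # placeholder for one analysis

if __name__ == "__main__":
    tasks = list(range(100))   # ~100 analyses per job
    batch_size = 20            # smaller batches seemed to hang less often
    results = []
    for start in range(0, len(tasks), batch_size):
        batch = tasks[start:start + batch_size]
        # A fresh, smaller pool for each batch, run one batch at a time.
        with Pool(processes=6) as pool:
            results.extend(pool.map(run_analysis, batch))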
Just a small follow-up: I experimented with the base image and Python versions, and got some improvement by switching from google/cloud-sdk (which has Python 3.9) to ubuntu:23.04 (which has Python 3.11), but still ~5% of the jobs keep freezing. I guess it's just something to accept, and either implement a hard timeout or simply run things sequentially. I also tried switching from standard Python multiprocessing to pathos.multiprocessing and joblib, but no cigar either.
@lm-jkominek Here is a retry/monitor approach you could adapt. The first script, wrapper.sh, re-runs a status-reporting program until it reports "success" or a retry limit is reached:
#!/bin/bash
# wrapper.sh: re-run program_with_status.py until it reports "success"
# or MAX_RETRIES attempts have been made.
# Usage: ./wrapper.sh MAX_RETRIES SECONDS_BETWEEN_STATUS_CHECKS LOOPS
PROGRAM_STATUS=''
retries=0
MAX_RETRIES=$1
SECONDS_BETWEEN_STATUS_CHECKS=$2
LOOPS=$3
while [ "$retries" -le "$MAX_RETRIES" ]
do
    # Run the monitored program and capture the status it prints.
    PROGRAM_STATUS=$(python program_with_status.py "$SECONDS_BETWEEN_STATUS_CHECKS" "$LOOPS")
    if [[ "$PROGRAM_STATUS" == "success" ]]; then
        echo "$PROGRAM_STATUS"
        exit 0
    fi
    ((retries++))
done
echo "$PROGRAM_STATUS"
The second script, test.sh, is a stand-in workload (a simple busy loop) that program_with_status.py launches:

#!/bin/bash
# test.sh: count from 0 up to the given limit, burning CPU.
# Usage: ./test.sh loop N
command=$1
loop=$2
counter=0
if [[ "$command" == "loop" ]]; then
    while [ "$counter" -le "$loop" ]
    do
        #echo $counter
        ((counter++))
    done
fi
Finally, program_with_status.py forks a child that exec's test.sh and monitors it, printing "success", "retry", or "continue" for wrapper.sh to act on:

import os
import psutil
import signal
import sys
import time

seconds_between_checks = int(sys.argv[1])
loops = sys.argv[2]

# This creates a fork: the parent monitors, the child runs the workload.
pid = os.fork()

def is_defunct(pid):
    # True if the process has exited but not been reaped (zombie/defunct).
    proc = psutil.Process(pid)
    if proc.status() == psutil.STATUS_ZOMBIE:
        return True
    return False

def retry_or_not(pid, checks, max_checks):
    if psutil.pid_exists(pid):
        if is_defunct(pid):
            # The child has finished (defunct); treat it as success.
            try:
                os.kill(pid, signal.SIGKILL)
                return "success"
            except OSError:
                return "success"
        if checks >= max_checks:
            # The child is still running after the allowed number of checks:
            # kill it and tell the wrapper to retry.
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError:
                return "retry"
            return "retry"
        return "continue"
    else:
        return "success"

# This is the parent process
if pid > 0:
    #print("This is the parent process checking on the child...")
    #print("Process ID:", os.getpid())
    #print("Child's process ID:", pid)
    child_pid = pid
    max_checks = 3
    checks = 0
    child_status = ""
    while True:
        time.sleep(seconds_between_checks)
        child_status = retry_or_not(child_pid, checks, max_checks)
        #print(['child_status', child_status])
        if child_status.startswith("success"):
            print(child_status)
            sys.exit()
        checks = checks + 1
        if checks > max_checks:
            print(child_status)
            sys.exit()
# This is the child process
else:
    #print("\nThis is the child process:")
    #print("Process ID:", os.getpid())
    #print("Parent's process ID:", os.getppid())
    args = ("loop", str(loops))
    program = './test.sh'
    os.execlp(program, program, *args)

You can launch your program like this:
Examples:

$ ./wrapper.sh 3 3 1000
success
$
$ ./wrapper.sh 3 3 1000000
retry
$

Feel free to modify and adapt as necessary. Hope it helps,
Thank you @pgrosu, @mbookman, appreciate the scripts! They're helpful, even if they don't directly address the freezes I've been seeing. After trying multiple parallel libs (pathos, joblib, concurrent.futures), I also tried rewriting the code under the spawn start method instead of the default fork, with no luck. I'm curious about something though - are there bandwidth/transfer limitations in either dsub or Google Cloud? My jobs write their results (~10 MB each) to a Google Cloud bucket through an
Hi @lm-jkominek, It could be, but I doubt it based on the quotas listed here -- and knowing the throughput of Google Cloud Storage (and the different types of errors that would be reported otherwise):
https://cloud.google.com/batch/quotas
https://cloud.google.com/life-sciences/quotas
https://github.com/DataBiosphere/dsub/blob/main/docs/compute_quotas.md
Spawning starts a fresh interpreter process, which is heavier resource-wise than fork, which copies the current process, as described here. This is partially the reason I wrote the Python/wrapper scripts the way I did, so that they retry even under heavy load. In fact, you can adapt the Python script to check on the operation that you can get via dstat, if that provides better granularity. In any case, just out of curiosity, why do you think my scripts do not directly address the freezing issue? The Python and wrapper scripts can be adapted to check and retry, so even under heavy parallelism you can serialize them, where some can wait if too many files are being updated. Have you tried a binary search on the number of submitted jobs -- such as 500, 250, or 125 jobs -- to see if you experience the same issue? You have to remember that most of the reading and writing is network-based and thus best-effort, which is why retries are necessary at higher throughput. Hope it helps,
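For illustration, a minimal sketch of choosing the start method in Python's multiprocessing (the worker function is a placeholder):

import multiprocessing as mp

def run_analysis(task):
    return task * task   # placeholder worker

if __name__ == "__main__":
    # "spawn" starts a fresh interpreter per worker (heavier, but it avoids
    # inheriting locks and other state from the parent); "fork" copies the
    # parent process instead.
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=6) as pool:
        print(pool.map(run_analysis, range(10)))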
Hi, I am using dsub to submit a bunch of jobs to GCE, which run inside a Docker image and use Python's multiprocessing library for further parallelization inside of that. The jobs themselves run fine, but a certain number (anywhere between 10-20%) finish up but freeze at the very end, and just keep endlessly logging without actually terminating, e.g.:
2023-08-04 13:16:19 INFO: gsutil -h Content-Type:text/plain -mq cp /tmp/continuous_logging_action/stdout gs://bucket/outdir/test/test_dir/003/log/test---user--230804-130707-40.1-stdout.log
What's weird is that this behavior is non-deterministic - I can submit the same jobs multiple times and each round different jobs will freeze, so it's not an issue with the input data. I know my test case is good in general, because the jobs all run fine if I skip the internal multiprocessing altogether and just run all the pieces sequentially; it just takes longer, obviously.
I found a somewhat similar issue here from a few years back that related to delocalization (#165), but I ran some tests, added more memory, and varied the CPU counts, to no avail, so it's not an issue with the executing VM.
My jobs use the logging library for console output, and there is apparently a well-known issue with using it together with multiprocessing, but I've commented all of that logging out and it didn't help. I've also tried reducing the size of the multiprocessing pool (e.g. 6 processes on an 8-core VM), with no change either. Any insights or experiences that could help my situation would be GREATLY appreciated!
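For reference, a minimal sketch of the pool setup described above (the worker function and task list are placeholders):

from multiprocessing import Pool

def run_analysis(task):
    return task   # stand-in for one of the ~100 analyses

if __name__ == "__main__":
    tasks = range(100)
    # 6 worker processes on an 8-core VM, leaving spare cores for the system.
    with Pool(processes=6) as pool:
        results = pool.map(run_analysis, tasks)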