A simple package for SGE job submission & monitoring
Via conda:
```bash
conda install "dill>=0.3" "pathos>=0.2.8" "tqdm>=4"
```
From GitHub:
```bash
pip install git+https://github.com/nick-youngblut/sgepy.git
```
Tests:
```bash
conda install pytest
pytest -s
```
```python
sgepy.Worker(kwargs=dict(), pkgs=[], threads=1, time='00:59:00', mem=6, gpu=0,
             max_attempts=3, conda_env='snakemake',
             tmp_dir='/ebio/abt3_projects/temp_data/', keep_tmp=False,
             parallel_env='parallel', verbose=False)
```
`kwargs`
- Keyword arguments passed to the function
`pkgs`
- Python packages used within any functions passed to the worker object
- e.g., `pkgs=['time']` if `time.sleep()` is used in the function
`threads`
- Number of parallel processes for the SGE job
`time`
- Time resource parameter. Format = HH:MM:SS
- Resource escalation possible (see below)
`mem`
- Memory resource (Gb)
- Note: the memory is multiplied by the number of threads
- Resource escalation possible (see below)
`gpu`
- Use a GPU? 0 = No; 1 = Yes
`max_attempts`
- Number of times to re-submit the job
- This only makes sense if resource escalation is used (see below)
`conda_env`
- Which conda env to use for the cluster job?
- Use `base` for the "standard" conda env
`tmp_dir`
- Temporary file path
- Note: by default, the directory will be deleted after the job is complete
`keep_tmp`
- Keep the temporary file path?
- This is useful for debugging
`verbose`
- Verbose output?
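As an illustrative sketch (not a recommendation for any particular cluster), these parameters might be combined like so:
```python
import sgepy

# illustrative values only; adjust for your cluster
w = sgepy.Worker(
    threads=4,               # parallel processes within the SGE job
    time='02:00:00',         # HH:MM:SS
    mem=8,                   # Gb (multiplied by the number of threads)
    gpu=0,                   # 0 = No GPU; 1 = use a GPU
    conda_env='snakemake',   # conda env used for the cluster job
    keep_tmp=True,           # keep the temporary files for debugging
    verbose=True,
)
```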
For `time` and `mem`, resource escalation can be used, similar to Snakemake. To use resource escalation, provide a function for the `time` or `mem` parameter. For example:
```python
time = lambda attempt, threads: 8 * attempt ** 3
# time increases by 8 * $ATTEMPT ** 3 for each job submission (attempt)
```
The function must have two arguments: `attempt` and `threads`.
`attempt`
- The job attempt count (see `max_attempts`)
`threads`
- The number of threads used for the job
- This can be useful for `mem`, so that the resource can be set based on the number of threads used (see the sketch below)
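For instance, a minimal sketch of a `mem` function that uses both arguments (the scaling factor is illustrative):
```python
# target a total of roughly 8 Gb * attempt; since mem is multiplied by the
# number of threads, divide by threads to get the per-thread value
mem = lambda attempt, threads: 8 * attempt / threads
```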
Returns a `Worker` object that will run a function as a job on the cluster. The object will monitor the cluster job and can re-run the job if resource escalation is used (see above).
```python
worker_object(function, x)
```
`function`
- Function to run as an SGE job
`x`
- Variable passed to the function as the first parameter
```python
sgepy.Pool(njobs=1, kwargs=dict(), pkgs=[], threads=1, time='00:59:00', mem=6, gpu=0,
           max_attempts=3, conda_env='snakemake',
           tmp_dir='/ebio/abt3_projects/temp_data/', keep_tmp=False,
           parallel_env='parallel', verbose=False)
```
`njobs`
- Number of jobs to submit in parallel
- i.e., number of parallel workers
Other parameters
- See the `Worker` class (above)
Returns a `pool` object, similar to that generated by the `multiprocessing.Pool()` class.
Usage is similar to the `multiprocessing.Pool()` class.
```python
pool.map(function, x)
```
`function`
- Function run in each cluster job
`x`
- Iterable; each value will be processed by an independent cluster job, using the user-provided `function`
Using a simple lambda function:
```python
import sgepy

func = lambda x: [x**2 for x in range(5)]
w = sgepy.Worker(verbose=True)
w(func, 2)
```
Test with keyword arguments and package dependencies:
```python
import sgepy

# simple function (the 'time' package is made available via pkgs)
def func1(x, y=1, z=2):
    time.sleep(x)
    return x * y * z

# cluster worker (args.tmp_dir: path to a temporary directory)
kwargs = {'y' : 2, 'z' : 3}
pkgs = ['time']
w = sgepy.Worker(tmp_dir=args.tmp_dir, kwargs=kwargs, pkgs=pkgs, verbose=True)
w(func1, 1)
```
Using the `multiprocessing.Pool()` functionality:
```python
import sgepy

# simple function (requires a package, provided via pkgs)
def func1(x, y=1, z=2):
    time.sleep(x)
    return x * y * z

# map call (args.tmp_dir: path to a temporary directory)
kwargs = {'y' : 2, 'z' : 2}
pkgs = ['time']
p = sgepy.Pool(tmp_dir=args.tmp_dir, kwargs=kwargs, pkgs=pkgs, njobs=2, verbose=True)
p.map(func1, [1, 5])
```
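Putting the pieces together, a hedged sketch combining resource escalation with the `Pool` interface (the escalation functions and scaling factors are illustrative, not recommendations):
```python
import sgepy

# function run in each cluster job ('time' is provided via pkgs)
def func1(x, y=1, z=2):
    time.sleep(x)
    return x * y * z

# resource escalation functions (illustrative scaling)
time_fn = lambda attempt, threads: 8 * attempt ** 3       # as in the example above
mem_fn = lambda attempt, threads: 8 * attempt / threads   # per-thread Gb

p = sgepy.Pool(njobs=2, kwargs={'y': 2, 'z': 2}, pkgs=['time'],
               time=time_fn, mem=mem_fn, max_attempts=3, verbose=True)
p.map(func1, [1, 5])
```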