Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove slurm integration #1233

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions SLURM/README

This file was deleted.

135 changes: 0 additions & 135 deletions SLURM/demo_slurm.py

This file was deleted.

33 changes: 0 additions & 33 deletions SLURM/slurmStart.sh

This file was deleted.

164 changes: 43 additions & 121 deletions caiman/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,72 +100,38 @@ def extract_patch_coordinates(dims: tuple,

return list(map(np.sort, coords_flat)), shapes

def start_server(slurm_script: str = None, ipcluster: str = "ipcluster", ncpus: int = None) -> None:
def start_server(ipcluster: str = "ipcluster", ncpus: int = None) -> None:
"""
programmatically start the ipyparallel server

Args:
ncpus: int
ncpus
number of processors

ipcluster : str
ipcluster
ipcluster binary file name; requires 4 path separators on Windows. ipcluster="C:\\\\Anaconda3\\\\Scripts\\\\ipcluster.exe"
Default: "ipcluster"
"""
logger.info("Starting cluster...")
if ncpus is None:
ncpus = psutil.cpu_count()

if slurm_script is None:

if ipcluster == "ipcluster":
subprocess.Popen(f"ipcluster start -n {ncpus}", shell=True, close_fds=(os.name != 'nt'))
else:
subprocess.Popen(shlex.split(f"{ipcluster} start -n {ncpus}"),
shell=True,
close_fds=(os.name != 'nt'))
time.sleep(1.5)
# Check that all processes have started
client = ipyparallel.Client()
time.sleep(1.5)
while len(client) < ncpus:
sys.stdout.write(".") # Give some visual feedback of things starting
sys.stdout.flush() # (de-buffered)
time.sleep(0.5)
logger.debug('Making sure everything is up and running')
client.direct_view().execute('__a=1', block=True) # when done on all, we're set to go
if ipcluster == "ipcluster":
subprocess.Popen(f"ipcluster start -n {ncpus}", shell=True, close_fds=(os.name != 'nt'))
else:
shell_source(slurm_script)
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']
logger.debug([pdir, profile])
c = Client(ipython_dir=pdir, profile=profile)
ee = c[:]
ne = len(ee)
logger.info(f'Running on {ne} engines.')
c.close()
sys.stdout.write("start_server: done\n")


def shell_source(script: str) -> None:
""" Run a source-style bash script, copy resulting env vars to current process. """
# XXX This function is weird and maybe not a good idea. People easily might expect
# it to handle conditionals. Maybe just make them provide a key-value file
#introduce echo to indicate the end of the output
pipe = subprocess.Popen(f". {script}; env; echo 'FINISHED_CLUSTER'", stdout=subprocess.PIPE, shell=True)

env = dict()
while True:
line = pipe.stdout.readline().decode('utf-8').rstrip()
if 'FINISHED_CLUSTER' in line: # find the keyword set above to determine the end of the output stream
break
logger.debug("shell_source parsing line[" + str(line) + "]")
lsp = str(line).split("=", 1)
if len(lsp) > 1:
env[lsp[0]] = lsp[1]

os.environ.update(env)
pipe.stdout.close()

subprocess.Popen(shlex.split(f"{ipcluster} start -n {ncpus}"),
shell=True,
close_fds=(os.name != 'nt'))
time.sleep(1.5)
# Check that all processes have started
client = ipyparallel.Client()
time.sleep(1.5)
while len(client) < ncpus:
sys.stdout.write(".") # Give some visual feedback of things starting
sys.stdout.flush() # (de-buffered)
time.sleep(0.5)
logger.debug('Making sure everything is up and running')
client.direct_view().execute('__a=1', block=True) # when done on all, we're set to go

def stop_server(ipcluster: str = 'ipcluster', pdir: str = None, profile: str = None, dview=None) -> None:
"""
Expand All @@ -185,61 +151,33 @@ def stop_server(ipcluster: str = 'ipcluster', pdir: str = None, profile: str = N
dview.terminate()
else:
logger.info("Stopping cluster...")
try:
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']
is_slurm = True
except:
logger.debug('stop_server: not a slurm cluster')
is_slurm = False

if is_slurm:
if pdir is None and profile is None:
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']
c = Client(ipython_dir=pdir, profile=profile)
ee = c[:]
ne = len(ee)
logger.info(f'Shutting down {ne} engines.')
c.close()
c.shutdown(hub=True)
shutil.rmtree('profile_' + str(profile))
try:
shutil.rmtree('./log/')
except:
logger.info('creating log folder') # FIXME Not what this means

files = glob.glob('*.log')
os.mkdir('./log')

for fl in files:
shutil.move(fl, './log/')

if ipcluster == "ipcluster":
proc = subprocess.Popen("ipcluster stop",
shell=True,
stderr=subprocess.PIPE,
close_fds=(os.name != 'nt'))
else:
if ipcluster == "ipcluster":
proc = subprocess.Popen("ipcluster stop",
shell=True,
stderr=subprocess.PIPE,
close_fds=(os.name != 'nt'))
else:
proc = subprocess.Popen(shlex.split(ipcluster + " stop"),
shell=True,
stderr=subprocess.PIPE,
close_fds=(os.name != 'nt'))

line_out = proc.stderr.readline()
if b'CRITICAL' in line_out:
logger.info("No cluster to stop...")
elif b'Stopping' in line_out:
st = time.time()
logger.debug('Waiting for cluster to stop...')
while (time.time() - st) < 4:
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(1)
else:
logger.error(line_out)
logger.error('**** Unrecognized syntax in ipcluster output, waiting for server to stop anyways ****')
proc = subprocess.Popen(shlex.split(ipcluster + " stop"),
shell=True,
stderr=subprocess.PIPE,
close_fds=(os.name != 'nt'))

line_out = proc.stderr.readline()
if b'CRITICAL' in line_out:
logger.info("No cluster to stop...")
elif b'Stopping' in line_out:
st = time.time()
logger.debug('Waiting for cluster to stop...')
while (time.time() - st) < 4:
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(1)
else:
logger.error(line_out)
logger.error('**** Unrecognized syntax in ipcluster output, waiting for server to stop anyways ****')

proc.stderr.close()
proc.stderr.close()

logger.info("stop_cluster(): done")

Expand All @@ -256,7 +194,6 @@ def setup_cluster(backend:str = 'multiprocessing',
'multiprocessing' - Use multiprocessing library
'ipyparallel' - Use ipyparallel instead (better on Windows?)
'single' - Don't be parallel (good for debugging, slow)
'SLURM' - Try to use SLURM batch system (untested, involved).
Most backends will try, by default, to stop a running cluster if
it is running before setting up a new one, or throw an error if
they find one.
Expand All @@ -273,7 +210,7 @@ def setup_cluster(backend:str = 'multiprocessing',


Returns:
c: ipyparallel.Client object; only used for ipyparallel and SLURM backends, else None
c: ipyparallel.Client object; only used for ipyparallel backends, else None
dview: multicore processing engine that is used for parallel processing.
If backend is 'multiprocessing' then dview is Pool object.
If backend is 'ipyparallel' then dview is a DirectView object.
Expand Down Expand Up @@ -325,21 +262,6 @@ def setup_cluster(backend:str = 'multiprocessing',
c = None
n_processes = 1

elif backend == 'SLURM':
# Override n_processes from above because with slurm you're using cluster resources, not machine-local resources
# Warning: This code may no longer work; it has not been tested in a very, very long time
n_processes = int(os.environ.get('SLURM_NPROCS'))
try:
stop_server()
except:
logger.debug('Nothing to stop')
slurm_script = os.environ.get('SLURMSTART_SCRIPT') # An example of this is in the source repo under 'SLURM/slurmStart.sh'
logger.info([str(n_processes), slurm_script])
start_server(slurm_script=slurm_script, ncpus=n_processes)
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']
logger.info([pdir, profile])
c = Client(ipython_dir=pdir, profile=profile)
dview = c[:]
else:
raise Exception('Unknown Backend')

Expand Down
Loading