Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat smartspim zarr compression #2

Merged
merged 9 commits into from
May 30, 2024
13 changes: 12 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
name = "aind-smartspim-data-transformation"
description = "Generated from aind-library-template"
license = {text = "MIT"}
requires-python = ">=3.7"
requires-python = ">=3.9"
authors = [
{name = "Allen Institute for Neural Dynamics"}
]
Expand All @@ -17,6 +17,17 @@ readme = "README.md"
dynamic = ["version"]

dependencies = [
'aind-data-transformation>=0.0.18',
'zarr==2.18.2',
'numcodecs==0.11.0',
'dask-image==2024.5.3',
'xarray_multiscale==1.1.0',
'bokeh==2.4.2',
'pims==0.6.1',
'dask[distributed]==2024.5.1',
'ome-zarr==0.8.2',
'imagecodecs[all]==2023.3.16',
'natsort==8.4.0',
]

[project.optional-dependencies]
Expand Down
25 changes: 25 additions & 0 deletions scripts/singularity_build.def
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Bootstrap: docker
From: python:3.10-bullseye

%setup
# Copy project directory into container
cp -R . ${SINGULARITY_ROOTFS}/aind-smartspim-data-transformation

%post
# Installing dask mpi
wget https://www.mpich.org/static/downloads/3.2/mpich-3.2.tar.gz
tar xfz mpich-3.2.tar.gz
rm mpich-3.2.tar.gz
mkdir mpich-build
cd mpich-build
../mpich-3.2/configure --disable-fortran 2>&1 | tee c.txt
make 2>&1 | tee m.txt
make install 2>&1 | tee mi.txt
cd ..

cd ${SINGULARITY_ROOTFS}/aind-smartspim-data-transformation
pip install . --no-cache-dir
rm -rf ${SINGULARITY_ROOTFS}/aind-smartspim-data-transformation

pip install mpi4py --no-cache-dir
pip install dask_mpi
3 changes: 2 additions & 1 deletion src/aind_smartspim_data_transformation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""Init package"""
__version__ = "0.0.0"

__version__ = "0.0.1"
3 changes: 3 additions & 0 deletions src/aind_smartspim_data_transformation/compress/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
Init internal package
"""
139 changes: 139 additions & 0 deletions src/aind_smartspim_data_transformation/compress/dask_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""
Module for dask utilities
"""

import logging
import os
import socket
from enum import Enum
from typing import Optional, Tuple

import distributed
import requests
from distributed import Client, LocalCluster

try:
from dask_mpi import initialize

DASK_MPI_INSTALLED = True
except ImportError:
DASK_MPI_INSTALLED = False

LOGGER = logging.getLogger(__name__)


class Deployment(Enum):
"""Deployment enums"""

LOCAL = "local"
SLURM = "slurm"


def log_dashboard_address(
client: distributed.Client, login_node_address: str = "hpc-login"
) -> None:
"""
Logs the terminal command required to access the Dask dashboard

Args:
client: the Client instance
login_node_address: the address of the cluster login node
"""
host = client.run_on_scheduler(socket.gethostname)
port = client.scheduler_info()["services"]["dashboard"]
user = os.getenv("USER")
LOGGER.info(
f"To access the dashboard, run the following in "
"a terminal: ssh -L {port}:{host}:{port} {user}@"
f"{login_node_address} "
)


def get_deployment() -> str:
"""
Gets the SLURM deployment if this
exists

Returns
-------
str
SLURM_JOBID
"""
if os.getenv("SLURM_JOBID") is None:
deployment = Deployment.LOCAL.value
else:
# we're running on the Allen HPC
deployment = Deployment.SLURM.value
return deployment


def get_client(
deployment: str = Deployment.LOCAL.value,
worker_options: Optional[dict] = None,
n_workers: int = 1,
processes=True,
) -> Tuple[distributed.Client, int]:
"""
Create a distributed Client

Args:
deployment: the type of deployment. Either "local" or "slurm"
worker_options: a dictionary of options to pass to the worker class
n_workers: the number of workers (only applies to "local" deployment)

Returns:
the distributed Client and number of workers
"""
if deployment == Deployment.SLURM.value:
if not DASK_MPI_INSTALLED:
raise ImportError(
"dask-mpi must be installed to use the SLURM deployment"
)
if worker_options is None:
worker_options = {}
slurm_job_id = os.getenv("SLURM_JOBID")
if slurm_job_id is None:
raise Exception(
"SLURM_JOBID environment variable is not set."
"Are you running under SLURM?"
)
initialize(
nthreads=int(os.getenv("SLURM_CPUS_PER_TASK", 1)),
local_directory=f"/scratch/fast/{slurm_job_id}",
worker_class="distributed.nanny.Nanny",
worker_options=worker_options,
)
client = Client()
log_dashboard_address(client)
n_workers = int(os.getenv("SLURM_NTASKS"))
elif deployment == Deployment.LOCAL.value:
client = Client(
LocalCluster(
n_workers=n_workers, processes=processes, threads_per_worker=1
)
)
else:
raise NotImplementedError
return client, n_workers


def cancel_slurm_job(
job_id: str, api_url: str, headers: dict
) -> requests.Response:
"""
Attempt to release resources and cancel the job

Args:
job_id: the SLURM job ID
api_url: the URL of the SLURM REST API.
E.g., "http://myhost:80/api/slurm/v0.0.36"

Raises:
HTTPError: if the request to cancel the job fails
"""
# Attempt to release resources and cancel the job
# Workaround for https://github.com/dask/dask-mpi/issues/87
endpoint = f"{api_url}/job/{job_id}"
response = requests.delete(endpoint, headers=headers)

return response
Loading
Loading