From 843ce210ce1127af55fdbde091b674bf1df6d776 Mon Sep 17 00:00:00 2001 From: Arup Sarker Date: Thu, 23 Nov 2023 00:13:55 -0500 Subject: [PATCH 1/5] Cylon Build and Scaling Test on UVA CS Cluster Signed-off-by: Arup Sarker --- rivanna/scripts/README.md | 2 +- target/rivanna/Makefile | 11 - target/rivanna/README-old.md | 60 --- target/rivanna/README.md | 165 +++--- target/rivanna/install-slurm.sh | 61 --- target/rivanna/install.sh | 30 -- target/rivanna/install.slurm | 63 --- target/rivanna/scripts/Makefile | 42 ++ target/rivanna/scripts/README.md | 51 +- target/rivanna/scripts/cancel.sh | 2 + .../rivanna/scripts/cylon-experiment-setup.py | 117 +++++ target/rivanna/scripts/cylon_scaling.py | 60 ++- target/rivanna/scripts/load.sh | 2 + target/rivanna/scripts/raptor.in.cfg | 55 ++ target/rivanna/scripts/raptor_master.py | 210 ++++++++ target/rivanna/scripts/raptor_worker.py | 42 ++ target/rivanna/scripts/rp-cylon.slurm | 22 + target/rivanna/scripts/rp-experiment-setup.py | 112 ++++ target/rivanna/scripts/rp_scaling.py | 484 ++++++++++++++++++ target/rivanna/scripts/scaling_job.slurm | 9 +- target/rivanna/scripts/submit.log | 1 + target/uva-cs/README.md | 50 ++ target/uva-cs/rp-scripts/README.md | 51 ++ target/uva-cs/rp-scripts/config.json | 223 ++++++++ target/uva-cs/rp-scripts/raptor.cfg | 55 ++ target/uva-cs/rp-scripts/raptor_master.py | 210 ++++++++ target/uva-cs/rp-scripts/raptor_worker.py | 42 ++ target/uva-cs/rp-scripts/resource_uva.json | 39 ++ target/uva-cs/rp-scripts/rp-cylon.slurm | 21 + target/uva-cs/rp-scripts/rp_scaling.py | 294 +++++++++++ target/uva-cs/scripts/Makefile | 42 ++ target/uva-cs/scripts/README.md | 43 ++ target/uva-cs/scripts/cancel.sh | 2 + .../uva-cs/scripts/cylon-experiment-setup.py | 117 +++++ target/uva-cs/scripts/cylon_scaling.py | 183 +++++++ target/uva-cs/scripts/load.sh | 2 + target/uva-cs/scripts/raptor.in.cfg | 55 ++ target/uva-cs/scripts/raptor_master.py | 210 ++++++++ target/uva-cs/scripts/raptor_worker.py | 42 ++ target/uva-cs/scripts/rp-cylon.slurm | 22 + target/uva-cs/scripts/rp-experiment-setup.py | 112 ++++ target/uva-cs/scripts/rp_scaling.py | 484 ++++++++++++++++++ target/uva-cs/scripts/scaling_job.sh | 27 + target/uva-cs/scripts/scaling_job.slurm | 30 ++ target/uva-cs/scripts/submit.log | 1 + 45 files changed, 3590 insertions(+), 368 deletions(-) delete mode 100644 target/rivanna/Makefile delete mode 100644 target/rivanna/README-old.md delete mode 100755 target/rivanna/install-slurm.sh delete mode 100755 target/rivanna/install.sh delete mode 100644 target/rivanna/install.slurm create mode 100644 target/rivanna/scripts/Makefile create mode 100755 target/rivanna/scripts/cancel.sh create mode 100644 target/rivanna/scripts/cylon-experiment-setup.py create mode 100755 target/rivanna/scripts/load.sh create mode 100644 target/rivanna/scripts/raptor.in.cfg create mode 100755 target/rivanna/scripts/raptor_master.py create mode 100755 target/rivanna/scripts/raptor_worker.py create mode 100644 target/rivanna/scripts/rp-cylon.slurm create mode 100644 target/rivanna/scripts/rp-experiment-setup.py create mode 100644 target/rivanna/scripts/rp_scaling.py create mode 100644 target/rivanna/scripts/submit.log create mode 100644 target/uva-cs/README.md create mode 100644 target/uva-cs/rp-scripts/README.md create mode 100644 target/uva-cs/rp-scripts/config.json create mode 100644 target/uva-cs/rp-scripts/raptor.cfg create mode 100755 target/uva-cs/rp-scripts/raptor_master.py create mode 100755 target/uva-cs/rp-scripts/raptor_worker.py create mode 100644 
target/uva-cs/rp-scripts/resource_uva.json create mode 100644 target/uva-cs/rp-scripts/rp-cylon.slurm create mode 100644 target/uva-cs/rp-scripts/rp_scaling.py create mode 100644 target/uva-cs/scripts/Makefile create mode 100644 target/uva-cs/scripts/README.md create mode 100755 target/uva-cs/scripts/cancel.sh create mode 100644 target/uva-cs/scripts/cylon-experiment-setup.py create mode 100644 target/uva-cs/scripts/cylon_scaling.py create mode 100755 target/uva-cs/scripts/load.sh create mode 100644 target/uva-cs/scripts/raptor.in.cfg create mode 100755 target/uva-cs/scripts/raptor_master.py create mode 100755 target/uva-cs/scripts/raptor_worker.py create mode 100644 target/uva-cs/scripts/rp-cylon.slurm create mode 100644 target/uva-cs/scripts/rp-experiment-setup.py create mode 100644 target/uva-cs/scripts/rp_scaling.py create mode 100755 target/uva-cs/scripts/scaling_job.sh create mode 100644 target/uva-cs/scripts/scaling_job.slurm create mode 100644 target/uva-cs/scripts/submit.log diff --git a/rivanna/scripts/README.md b/rivanna/scripts/README.md index 13cb7a38d..a837a8ad1 100644 --- a/rivanna/scripts/README.md +++ b/rivanna/scripts/README.md @@ -6,7 +6,7 @@ pip install openssl-python python3 -m pip install urllib3==1.26.6 ``` -2. Make change in the ```cylon-experiment-setup.py ``` or ```cylon-experiment-setup.py ``` for the configurations changes. +2. Make change in the ```cylon-experiment-setup.py ``` or ```rp-experiment-setup.py ``` for the configurations changes. ``` combination = [\ diff --git a/target/rivanna/Makefile b/target/rivanna/Makefile deleted file mode 100644 index df8c787d1..000000000 --- a/target/rivanna/Makefile +++ /dev/null @@ -1,11 +0,0 @@ -BASE=../.. - -clean: - rm -rf build - rm -rf cy-rp-env - -install-frontend: - cd $BASE; time ./target/rivanna/install.sh - -install: - cd $BASE; time ./target/rivanna/install.sh diff --git a/target/rivanna/README-old.md b/target/rivanna/README-old.md deleted file mode 100644 index 6e4eee9d1..000000000 --- a/target/rivanna/README-old.md +++ /dev/null @@ -1,60 +0,0 @@ -# Running Cylon on Rivanna - -Arup Sarker (arupcsedu@gmail.com, djy8hg@virginia.edu) - - - -## Install instructions - -Rivanna is an HPC system offerbed by University of Virginia. -This will use custom dependencies of the system gcc, openmpi version. - -```shell - -git clone https://github.com/cylondata/cylon.git -cd cylon - -module load gcc/9.2.0 openmpi/3.1.6 python/3.7.7 cmake/3.23.3 - -python -m venv $PWD/cy-rp-env - -source $PWD/cy-rp-env/bin/activate - - -pip install pip -U -pip install pytest - -export CC=`which gcc` -export CXX=`which g++` -CC=gcc MPICC=mpicc pip install --no-binary mpi4py install mpi4py -pip install -U pytest-mpi -pip install numpy -pip install pyarrow==9.0.0 - - -rm -rf build -BUILD_PATH=$PWD/build -export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH - -./build.sh -pyenv $PWD/cy-rp-env -bpath $(pwd)/build --cpp --python_with_pyarrow --cython --test --cmake-flags "-DMPI_C_COMPILER=$(which mpicc) -DMPI_CXX_COMPILER=$(which mpicxx)" - -``` -It will take some time to build. So, grab a coffee!!! - -Let's perform a scaling operation with join. Before that, we have to install the dependencies as follow. - -```shell -pip install cloudmesh-common -pip install openssl-python -python3 -m pip install urllib3==1.26.6 -``` - -We will slum script to run the scaling operation. 
- -```shell -sbatch rivanna/scripts/scaling_job.slurm -``` - -For more details of the dependent libraries and Slurm scripts, Please checkout the following links: - -* diff --git a/target/rivanna/README.md b/target/rivanna/README.md index 2a4cfe5e7..6e4eee9d1 100644 --- a/target/rivanna/README.md +++ b/target/rivanna/README.md @@ -1,105 +1,60 @@ -# Developing and Using of Cylon on Rivanna - -Data: August 4th, 2023 - -# Using Cylon - -Currently we recommend to use the instrctions for developers. -For using cylon we recommend to follow the same instructions, but replaceing the **git clone** command with - -```` -git clone https://github.com/cylondata/cylon.git -```` - - -## Decveloping with Cylon - -### Clone Cylon - -First you must create a fork of cylon whcih is easiest done with the GUI. - -Please visit with your browser - -* - -and cick on fork. Let us assume you have the username xyz, Then the easiseets is to create a shortcut for the git user -to follow our documentation. - -```bash -exort GITUSER=xyz` -git clone https://github.com/$GITUSER/cylon.git -cd cylon -``` - -> Note: use the following line in case you do not want to fork -> -> ```bash -> git clone https://github.com/cylondata/cylon.git -> cd cylon -> ``` - -### Compile Cylon on Rivanna - -The following lines are included in [target/rivanna/install.sh](https://github.com/cylondata/cylon/blob/main/target/rivanna/README.md) - -Before executing it, please review it with - -```bash -target/rivanna/install.sh -``` - -If you need to make modifications, copy the script and execute the copy. - -If you are happy with the original script, please execute it with - -```bash -time ./target/rivanna/install.sh -``` - -The execution of the script will take some time. - -``` -real 61m17.789s -user 53m10.282s -sys 6m52.742s -``` - -The script will look as follows - -```bash -#! /bin/sh -PWD=`pwd` -BUILD_PATH=$PWD/build - -module load gcc/9.2.0 openmpi/3.1.6 python/3.7.7 cmake/3.23.3 - -python -m venv $PWD/cy-rp-env - -source $PWD/cy-rp-env/bin/activate - -# pip install -r $PWD/requirements.txt - -pip install pip -U -pip install pytest -pip install -U pytest-mpi -pip install numpy -# pip install pyarrow==9.0.0 - -export CC=`which gcc` -export CXX=`which g++` -CC=gcc MPICC=mpicc pip install --no-binary mpi4py install mpi4py - -rm -rf build - -export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH - -time ./build.sh -pyenv $PWD/cy-rp-env -bpath $PWD/build --cpp --python --cython --test --cmake-flags "-DMPI_C_COMPILER=$(which mpicc) -DMPI_CXX_COMPILER=$(which mpicxx)" -``` - -## Acknowledgement - -This documentation is based on documentation and installation improvements provided by - -* Arup Sarker (arupcsedu@gmail.com, djy8hg@virginia.edu) -* Niranda -* Gregor von Laszewski (laszewski@gmail.com) +# Running Cylon on Rivanna + +Arup Sarker (arupcsedu@gmail.com, djy8hg@virginia.edu) + + + +## Install instructions + +Rivanna is an HPC system offerbed by University of Virginia. +This will use custom dependencies of the system gcc, openmpi version. 
+
+```shell
+
+git clone https://github.com/cylondata/cylon.git
+cd cylon
+
+module load gcc/9.2.0 openmpi/3.1.6 python/3.7.7 cmake/3.23.3
+
+python -m venv $PWD/cy-rp-env
+
+source $PWD/cy-rp-env/bin/activate
+
+
+pip install pip -U
+pip install pytest
+
+export CC=`which gcc`
+export CXX=`which g++`
+CC=gcc MPICC=mpicc pip install --no-binary mpi4py install mpi4py
+pip install -U pytest-mpi
+pip install numpy
+pip install pyarrow==9.0.0
+
+
+rm -rf build
+BUILD_PATH=$PWD/build
+export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH
+
+./build.sh -pyenv $PWD/cy-rp-env -bpath $(pwd)/build --cpp --python_with_pyarrow --cython --test --cmake-flags "-DMPI_C_COMPILER=$(which mpicc) -DMPI_CXX_COMPILER=$(which mpicxx)"
+
+```
+It will take some time to build. So, grab a coffee!!!
+
+Let's perform a scaling operation with join. Before that, we have to install the dependencies as follows.
+
+```shell
+pip install cloudmesh-common
+pip install openssl-python
+python3 -m pip install urllib3==1.26.6
+```
+
+We will use a Slurm script to run the scaling operation.
+
+```shell
+sbatch rivanna/scripts/scaling_job.slurm
+```
+
+For more details on the dependent libraries and Slurm scripts, please check out the following links:
+
+* 
diff --git a/target/rivanna/install-slurm.sh b/target/rivanna/install-slurm.sh
deleted file mode 100755
index dbe27c2cf..000000000
--- a/target/rivanna/install-slurm.sh
+++ /dev/null
@@ -1,61 +0,0 @@
-#!/bin/bash
-## xBATCH -A bii_dsc_community
-## xBATCH -p standard
-## xBATCH -N 1
-#X xSBATCH -c 37
-
-# ./target/rivanna/install.sh
-
-SLURM_JOB_ID=$(sbatch --wait -A bii_dsc_community -p standard -N 1 -c37 ./target/rivanna/install.sh | awk '{print $4}')
-
-echo "Job $SLURM_JOB_ID is completed." 
- - -echo "###############################################################" -echo "# squeue" -echo "###############################################################" - -squeue --format="%.18i %.9P %.30j %.8u %.8T %.10M %.9l %.6D %R" -j $SLURM_JOB_ID - -echo "###############################################################" -echo "# seff" -echo "###############################################################" - -seff $SLURM_JOB_ID - -echo "###############################################################" -echo "# sacct analyze waittime" -echo "###############################################################" - -reserved_time=$(sacct -X -j $SLURM_JOB_ID -o Reserved --noheader) -submit_time=$(sacct -X -j $SLURM_JOB_ID -o Submit --noheader) -start_time=$(sacct -X -j $SLURM_JOB_ID -o Start --noheader) -end_time=$(sacct -X -j $SLURM_JOB_ID -o End --noheader) -cpu_time=$(sacct -X -j $SLURM_JOB_ID -o CPUTime --noheader) -elapsed_time=$(sacct -X -j $SLURM_JOB_ID -o Elapse --noheader) - -# Convert timestamps to seconds since epoch -submit_seconds=$(date -d "$submit_time" +"%s") -start_seconds=$(date -d "$start_time" +"%s") -end_seconds=$(date -d "$end_time" +"%s") - -# Calculate wait time in seconds -wait_time=$((start_seconds - submit_seconds)) - -# Convert wait time to HH:MM:SS format -wait_formatted=$(date -u -d @"$wait_time" +"%T") - -echo "JobID: $SLURM_JOB_ID" -echo "CPU Time: $cpu_time" -echo "Elapsed Time: $elapsed_time" -echo "Researved Time: $reserved_time" -echo "Submit Time: $submit_time" -echo "Start Time: $start_time" -echo "End Time: $end_time" -echo "Wait Time: $wait_formatted" -echo "Wait Time (s): $wait_time" - -echo "###############################################################" - -echo "completed" - diff --git a/target/rivanna/install.sh b/target/rivanna/install.sh deleted file mode 100755 index d41939469..000000000 --- a/target/rivanna/install.sh +++ /dev/null @@ -1,30 +0,0 @@ -#! /bin/sh - - -PWD=`pwd` -BUILD_PATH=$PWD/build - -module load gcc/9.2.0 openmpi/3.1.6 python/3.7.7 cmake/3.23.3 - -python -m venv $PWD/CYLON-ENV - -source $PWD/CYLON-ENV/bin/activate - -# pip install -r $PWD/requirements.txt - -pip install pip -U -pip install pytest -pip install -U pytest-mpi -pip install numpy -# pip install pyarrow==9.0.0 - -export CC=`which gcc` -export CXX=`which g++` -CC=gcc MPICC=mpicc pip install --no-binary mpi4py install mpi4py - -rm -rf build - -export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH - -time ./build.sh -j$(nproc) -pyenv $PWD/CYLON-ENV -bpath $PWD/build --cpp --python --cython --test --cmake-flags "-DMPI_C_COMPILER=$(which mpicc) -DMPI_CXX_COMPILER=$(which mpicxx)" - diff --git a/target/rivanna/install.slurm b/target/rivanna/install.slurm deleted file mode 100644 index 53814e830..000000000 --- a/target/rivanna/install.slurm +++ /dev/null @@ -1,63 +0,0 @@ -#!/bin/bash -#SBATCH -A bii_dsc_community -#SBATCH -p standard -#SBATCH -N 1 -#SBATCH -c 37 - -./target/rivanna/install.sh - -#SLURM_JOB_ID=$(sbatch my_job.sh | awk '{print $4}') - -# Wait for the job to complete -# srun --jobid=$SLURMJOB_ID echo "Waiting for job $JOB_ID to complete..." -# srun --jobid=$SLURMJOB_ID sacct --format=JobID,End --noheader - -#echo "Job $JOB_ID is completed." 
- -echo "###############################################################" -echo "# squeue" -echo "###############################################################" - -squeue --format="%.18i %.9P %.30j %.8u %.8T %.10M %.9l %.6D %R" -j $SLURM_JOB_ID - -echo "###############################################################" -echo "# seff" -echo "###############################################################" - -seff #SLURM_JOB_ID - -echo "###############################################################" -echo "# sacct analyze waittime" -echo "###############################################################" - -reserved_time=$(sacct -X -j $SLURM_JOB_ID -o Reserved --noheader) -submit_time=$(sacct -X -j $SLURM_JOB_ID -o Submit --noheader) -start_time=$(sacct -X -j $SLURM_JOB_ID -o Start --noheader) -end_time=$(sacct -X -j $SLURM_JOB_ID -o End --noheader) -cpu_time=$(sacct -X -j $SLURM_JOB_ID -o CPUTime --noheader) -elapsed_time=$(sacct -X -j $SLURM_JOB_ID -o Elapse --noheader) - -# Convert timestamps to seconds since epoch -submit_seconds=$(date -d "$submit_time" +"%s") -start_seconds=$(date -d "$start_time" +"%s") -end_seconds=$(date -d "$end_time" +"%s") - -# Calculate wait time in seconds -wait_time=$((start_seconds - submit_seconds)) - -# Convert wait time to HH:MM:SS format -wait_formatted=$(date -u -d @"$wait_time" +"%T") - -echo "JobID: $SLURM_JOB_ID" -echo "CPU Time: $cpu_time" -echo "Elapsed Time: $elapsed_time" -echo "Researved Time: $reserved_time" -echo "Submit Time: $submit_time" -echo "Start Time: $start_time" -echo "End Time: $end_time" -echo "Wait Time: $wait_formatted" -echo "Wait Time in seconnds: $wait_time" - -echo "###############################################################" - -echo "completed" diff --git a/target/rivanna/scripts/Makefile b/target/rivanna/scripts/Makefile new file mode 100644 index 000000000..c3c6af8ad --- /dev/null +++ b/target/rivanna/scripts/Makefile @@ -0,0 +1,42 @@ +SHELL=/bin/bash + +.PHONY: load image-singularity image-docker project + +all: ${EXECS} + +login: + ssh -tt rivanna "/opt/rci/bin/ijob --partition=parallel --account=bii_dsc_community --time=30:00 --ntasks-per-node=4 --nodes=2" + +load: + ./load.sh + +clean: + rm -f *.log *.err script-*.slurm + rm -r raptor-*.cfg rp.session.* + +rp: load + python rp-experiment-setup.py + +cy: load + python cylon-experiment-setup.py + +q: + squeue --format="%.18i %.9P %.50j %.8u %.8T %.10M %.9l %.6D %R" --me + +a: + squeue --format="%all" --me + + +qq: + watch squeue --format=\"%.18i %.9P %.50j %.8u %.8T %.10M %.9l %.6D %R\" --me + +i: + cat out.log + cat out.err + fgrep "###" out.log | wc -l + + +cancel: + - ./cancel.sh + - squeue -u ${USER} + diff --git a/target/rivanna/scripts/README.md b/target/rivanna/scripts/README.md index 6eaa056b7..a837a8ad1 100644 --- a/target/rivanna/scripts/README.md +++ b/target/rivanna/scripts/README.md @@ -6,38 +6,33 @@ pip install openssl-python python3 -m pip install urllib3==1.26.6 ``` -2. Run the scripts in set of **compute nodes** as follows. +2. Make change in the ```cylon-experiment-setup.py ``` or ```rp-experiment-setup.py ``` for the configurations changes. 
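+
+Each tuple is read by the setup scripts as `(nodes, threads, rows, partition, exclusive)`, where `threads` is used as `--ntasks-per-node`; the default sweep is the block shown below. A minimal sketch for extending the sweep programmatically (the extra node counts here are only placeholders):
+
+```python
+# (nodes, threads, rows, partition, exclusive): the same layout the scripts
+# unpack in "for nodes, threads, rows, partition, exclusive in combination:"
+for nodes in (16, 18, 20):
+    combination.append((nodes, 37, 35000000, "parallel", ""))
+```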
-```bash
-#!/bin/bash
-#SBATCH --nodes=4
-#SBATCH --ntasks-per-node=40
-#SBATCH --exclusive
-#SBATCH --time=1:00:00
-#SBATCH --partition=bii
-#SBATCH -A bii_dsc_community
-#SBATCH --output=rivanna/scripts/cylogs/mpirun-96t-4n-160w-35m-%x-%j.out
-#SBATCH --error=rivanna/scripts/cylogs/mpirun-96t-4n-160w-35m-%x-%j.err
-
-
-module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7
-
-#module load gcc/11.2.0
-#module load openmpi/4.1.4
-#module load python/3.11.1
-
-#source $HOME/CYLON/bin/activate
-source $HOME/cylon_rp_venv/bin/activate
-
-BUILD_PATH=$PWD/build
-
-export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH
+```
+combination = [\
+    # (1,4, 5000, "parallel", "exclusive"), # always pending
+    (2,37, 1000000, "parallel", ""),
+    (4,37, 35000000, "parallel", ""),
+    (6,37, 35000000, "parallel", ""),
+    (8,37, 35000000, "parallel", ""),
+    (10,37, 35000000, "parallel", ""),
+    (12,37, 35000000, "parallel", ""),
+    (14,37, 35000000, "parallel", ""),
+]
+```
 
-which python gcc g++
+3. Load the required modules and activate the Python virtual environment.
 
+```
+module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7
+source /path_to_virtual_environment/cylon_rp_venv/bin/activate
+```
+4. Run the scripts as follows.
 
-#srun -n 160 python $PWD/rivanna/scripts/cylon_scaling.py -n 35000000
-mpirun -np 160 python rivanna/scripts/cylon_scaling.py -n 35000000
+```bash
+make clean # For cleaning
+make rp # For Radical Pilot
+make cy # For bare-metal Cylon
 ```
\ No newline at end of file
diff --git a/target/rivanna/scripts/cancel.sh b/target/rivanna/scripts/cancel.sh
new file mode 100755
index 000000000..78d88a29c
--- /dev/null
+++ b/target/rivanna/scripts/cancel.sh
@@ -0,0 +1,2 @@
+#! /bin/sh
+squeue -u $USER | awk '{print $1}' | tail -n+2 | xargs scancel
diff --git a/target/rivanna/scripts/cylon-experiment-setup.py b/target/rivanna/scripts/cylon-experiment-setup.py
new file mode 100644
index 000000000..20ec6e159
--- /dev/null
+++ b/target/rivanna/scripts/cylon-experiment-setup.py
@@ -0,0 +1,117 @@
+import os
+import sys
+from textwrap import dedent
+from cloudmesh.common.util import writefile
+from cloudmesh.common.util import readfile
+from cloudmesh.common.util import banner
+from cloudmesh.common.console import Console
+
+counter = 0
+
+debug = True
+debug = False
+
+partition="bii-gpu"
+
+partition="parallel"
+
+
+# (nodes, threads, rows, partition, "exclusive")
+combination = [\
+    # (1,4, 5000, "parallel", "exclusive"), # always pending
+    (2,37, 1000000, "parallel", ""),
+    (4,37, 35000000, "parallel", ""),
+    (6,37, 35000000, "parallel", ""),
+    (8,37, 35000000, "parallel", ""),
+    (10,37, 35000000, "parallel", ""),
+    (12,37, 35000000, "parallel", ""),
+    (14,37, 35000000, "parallel", ""),
+]
+
+'''
+combination = []
+for nodes in range(0,50):
+    for threads in range(0,37):
+        combination.append((nodes+1, threads+1, "parallel", ""))
+'''
+
+total = len(combination)
+jobid="-%j"
+# jobid=""
+
+f = open("submit.log", "w")
+for nodes, threads, rows, partition, exclusive in combination:
+    counter = counter + 1
+
+    if exclusive == "exclusive":
+        exclusive = "#SBATCH --exclusive"
+        e = "e1"
+    else:
+        exclusive = ""
+        e = "e0"
+
+    usable_threads = nodes * threads
+
+    '''
+    cores_per_node = nodes * threads - 2
+
+    print (cores_per_node)
+
+    config = readfile("raptor.in.cfg")
+
+    config = config.replace("CORES_PER_NODE", str(cores_per_node))
+    config = config.replace("NO_OF_ROWS", str(rows))
+
+
+    print (config)
+
+    cfg_filename = 
f"raptor-{nodes}-{threads}.cfg" + + writefile(cfg_filename, config) + ''' + banner(f"SLURM {nodes} {threads} {counter}/{total}") + script=dedent(f""" + #!/bin/bash + #SBATCH --job-name=h-n={nodes:02d}-t={threads:02d}-e={e} + #SBATCH --nodes={nodes} + #SBATCH --ntasks-per-node={threads} + #SBATCH --time=15:00 + #SBATCH --output=out-{nodes:02d}-{threads:02d}{jobid}.log + #SBATCH --error=out-{nodes:02d}-{threads:02d}{jobid}.err + #SBATCH --partition=parallel + #SBATCH -A bii_dsc_community + {exclusive} + echo "..............................................................." + module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 + echo "..............................................................." + source /project/bii_dsc_community/djy8hg/cylon_rp_venv/bin/activate + echo "..............................................................." + BUILD_PATH=/project/bii_dsc_community/djy8hg/Project/cylon/build + echo "..............................................................." + export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH + echo "..............................................................." + which python gcc g++ + echo "..............................................................." + lscpu + echo "..............................................................." + time srun --exact --nodes {nodes} --ntasks {usable_threads} python cylon_scaling.py -n {rows} + echo "..............................................................." + """).strip() + + print (script) + filename = f"script-{nodes:02d}-{threads:02d}.slurm" + writefile(filename, script) + + + if not debug: + + r = os.system(f"sbatch {filename}") + total = nodes * threads + if r == 0: + msg = f"{counter} submitted: nodes={nodes:02d} threads={threads:02d} total={total}" + Console.ok(msg) + else: + msg = f"{counter} failed: nodes={nodes:02d} threads={threads:02d} total={total}" + Console.error(msg) + f.writelines([msg, "\n"]) +f.close() diff --git a/target/rivanna/scripts/cylon_scaling.py b/target/rivanna/scripts/cylon_scaling.py index 180dafc8e..4b2a860e6 100644 --- a/target/rivanna/scripts/cylon_scaling.py +++ b/target/rivanna/scripts/cylon_scaling.py @@ -11,7 +11,7 @@ from cloudmesh.common.Shell import Shell -def join(data=None): +def cylon_join(data=None): StopWatch.start(f"join_total_{data['host']}_{data['rows']}_{data['it']}") comm = MPI.COMM_WORLD @@ -60,8 +60,57 @@ def join(data=None): StopWatch.benchmark(tag=str(data)) env.finalize() + +def cylon_sort(data=None): + StopWatch.start(f"sort_total_{data['host']}_{data['rows']}_{data['it']}") + + comm = MPI.COMM_WORLD + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + + if env.rank == 0: + print("Task# ", data['task']) + + for i in range(data['it']): + env.barrier() + StopWatch.start(f"sort_{i}_{data['host']}_{data['rows']}_{data['it']}") + t1 = time.time() + df3 = df1.sort_values(by=[0], env=env) + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", 
data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + StopWatch.stop(f"sort_{i}_{data['host']}_{data['rows']}_{data['it']}") -def slice(data=None): + StopWatch.stop(f"sort_total_{data['host']}_{data['rows']}_{data['it']}") + + if env.rank == 0: + StopWatch.benchmark(tag=str(data)) + + env.finalize() + + +def cylon_slice(data=None): StopWatch.start(f"slice_total_{data['host']}_{data['rows']}_{data['it']}") comm = MPI.COMM_WORLD @@ -113,6 +162,7 @@ def slice(data=None): env.finalize() + if __name__ == "__main__": parser = argparse.ArgumentParser(description="weak scaling") parser.add_argument('-n', dest='rows', type=int, required=True) @@ -123,9 +173,11 @@ def slice(data=None): args = vars(parser.parse_args()) args['host'] = "rivanna" - for i in range(160): + for i in range(1): args['task'] = i - join(args) + #cylon_slice(args) + #cylon_join(args) + cylon_sort(args) # os.system(f"{git} branch | fgrep '*' ") # os.system(f"{git} rev-parse HEAD") diff --git a/target/rivanna/scripts/load.sh b/target/rivanna/scripts/load.sh new file mode 100755 index 000000000..e3e5fbaf0 --- /dev/null +++ b/target/rivanna/scripts/load.sh @@ -0,0 +1,2 @@ +module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 +source /project/bii_dsc_community/djy8hg/cylon_rp_venv/bin/activate diff --git a/target/rivanna/scripts/raptor.in.cfg b/target/rivanna/scripts/raptor.in.cfg new file mode 100644 index 000000000..cebcecc20 --- /dev/null +++ b/target/rivanna/scripts/raptor.in.cfg @@ -0,0 +1,55 @@ +{ + # resource configuration + "cores_per_node" : CORES_PER_NODE, + "gpus_per_node" : 0, + "no_of_rows" : NO_OF_ROWS, + # raptor configuration + "n_masters" : 1, + "n_workers" : 1, + "masters_per_node" : 1, + "nodes_per_worker" : 1, + + # extra nodes for non-raptor rp tasks + "nodes_rp" : 1, + # extra resources for the rp agent (optional) + "nodes_agent" : 0, + + # pilot runtime in min + "runtime" : 60, + + # task configuration + "cores_per_task" : 1, + "sleep" : 3, + # These are used as the range of the for loops for defining and submitting + # non-raptor and raptor tasks, respectively. + "tasks_rp" : 1, + "tasks_raptor" : 1, + + "pilot_descr": { + "resource" : "uva.rivanna", + "runtime" : 60, + "access_schema": "interactive", + "queue" : "parallel", + "project" : "bii_dsc_community" + }, + + "master_descr": { + "mode" : "raptor.master", + "named_env" : "cylon_rp_venv", + "executable" : "./raptor_master.py" + }, + + "worker_descr": { + "mode" : "raptor.worker", + "named_env" : "cylon_rp_venv", + "pre_exec" : ["module load gcc/9.2.0", + "module load openmpi/3.1.6", + "module load python/3.7.7", + "export LD_LIBRARY_PATH=$HOME/rc_arup/cylon/build/arrow/install/lib64:$HOME/rc_arup/cylon/build/glog/install/lib64:$HOME/rc_arup/cylon/build/lib64:$HOME/rc_arup/cylon/build/lib:$LD_LIBRARY_PATH" + ], + + # custom worker class + "raptor_class" : "MyWorker", + "raptor_file" : "./raptor_worker.py" + } +} \ No newline at end of file diff --git a/target/rivanna/scripts/raptor_master.py b/target/rivanna/scripts/raptor_master.py new file mode 100755 index 000000000..6e417682e --- /dev/null +++ b/target/rivanna/scripts/raptor_master.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 + +import os +import sys +import time + +from collections import defaultdict + +import radical.utils as ru +import radical.pilot as rp + + +def out(msg): + sys.stdout.write('==== %s\n' % msg) + sys.stdout.flush() + + +# This script has to run as a task within a pilot allocation, and is +# a demonstration of a task overlay within the RCT framework. 
It is expected +# to be staged and launched by the `raptor.py` script in the radical.pilot +# examples/misc directory. +# This master task will: +# +# - create a master which bootstraps a specific communication layer +# - insert n workers into the pilot (again as a task) +# - perform RPC handshake with those workers +# - send RPC requests to the workers +# - terminate the worker +# +# The worker itself is an external program which is not covered in this code. + +RANKS = 2 + + + +# ------------------------------------------------------------------------------ +# +class MyMaster(rp.raptor.Master): + ''' + This class provides the communication setup for the task overlay: it will + set up the request / response communication queues and provide the endpoint + information to be forwarded to the workers. + ''' + + # -------------------------------------------------------------------------- + # + def __init__(self, cfg): + + self._cnt = 0 + self._submitted = defaultdict(int) + self._collected = defaultdict(int) + + # initialize the task overlay base class. That base class will ensure + # proper communication channels to the pilot agent. + super().__init__(cfg=cfg) + + self._sleep = self._cfg.sleep + + + # -------------------------------------------------------------------------- + # + def submit(self): + + # self._prof.prof('create_start') + + # create additional tasks to be distributed to the workers. + + #tds = list() + + #self._prof.prof('create_stop') + + # wait for outstanding tasks to complete + while True: + + completed = sum(self._collected.values()) + submitted = sum(self._submitted.values()) + + if submitted: + # request_cb has been called, so we can wait for completion + + self._log.info('=== submit done?: %d >= %d ', completed, submitted) + + if completed >= submitted: + break + + time.sleep(1) + + self._log.info('=== submit done!') + + + # -------------------------------------------------------------------------- + # + def request_cb(self, tasks): + + for task in tasks: + + self._log.debug('=== request_cb %s\n', task['uid']) + + mode = task['description']['mode'] + uid = task['description']['uid'] + + self._submitted[mode] += 1 + + # for each `function` mode task, submit one more `proc` mode request + if mode == rp.TASK_FUNC: + self.submit_tasks(rp.TaskDescription( + {'uid' : 'extra' + uid, + # 'timeout' : 10, + 'mode' : rp.TASK_PROC, + 'ranks' : RANKS, + 'executable': '/bin/sh', + 'arguments' : ['-c', + 'sleep %d; ' % self._sleep + + 'echo "hello $RP_RANK/$RP_RANKS: ' + '$RP_TASK_ID"'], + 'raptor_id' : 'master.000000'})) + + return tasks + + + # -------------------------------------------------------------------------- + # + def result_cb(self, tasks): + ''' + Log results. + + Log file is named by the master tasks UID. + ''' + for task in tasks: + + mode = task['description']['mode'] + self._collected[mode] += 1 + + # NOTE: `state` will be `AGENT_EXECUTING` + self._log.info('=== result_cb %s: %s [%s] [%s]', + task['uid'], + task['state'], + task['stdout'], + task['return_value']) + + # Note that stdout is part of the master task result. + print('id: %s [%s]:\n out: %s\n ret: %s\n' + % (task['uid'], task['state'], task['stdout'], + task['return_value'])) + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + # This master script runs as a task within a pilot allocation. 
The purpose + # of this master is to (a) spawn a set or workers within the same + # allocation, (b) to distribute work items (`hello` function calls) to those + # workers, and (c) to collect the responses again. + cfg_fname = str(sys.argv[1]) + cfg = ru.Config(cfg=ru.read_json(cfg_fname)) + cfg.rank = int(sys.argv[2]) + + n_workers = cfg.n_workers + nodes_per_worker = cfg.nodes_per_worker + cores_per_node = cfg.cores_per_node + gpus_per_node = cfg.gpus_per_node + descr = cfg.worker_descr + pwd = os.getcwd() + + # one node is used by master. Alternatively (and probably better), we could + # reduce one of the worker sizes by one core. But it somewhat depends on + # the worker type and application workload to judge if that makes sense, so + # we leave it for now. + + # create a master class instance - this will establish communication to the + # pilot agent + master = MyMaster(cfg) + + # insert `n` worker tasks into the agent. The agent will schedule (place) + # those workers and execute them. Insert one smaller worker (see above) + # NOTE: this assumes a certain worker size / layout + out('workers: %d' % n_workers) + descr['ranks'] = nodes_per_worker * cores_per_node + descr['gpus_per_rank'] = nodes_per_worker * gpus_per_node + worker_ids = master.submit_workers( + [rp.TaskDescription(descr) for _ in range(n_workers)]) + + # wait until `m` of those workers are up + # This is optional, work requests can be submitted before and will wait in + # a work queue. + # FIXME + master.wait_workers(count=1) + + out('start') + master.start() + out('submit') + master.submit() + + # let some time pass for client side tasks to complete + time.sleep(600) + + out('stop') + # TODO: can be run from thread? + master.stop() + out('join') + + # TODO: worker state callback + master.join() + out('done') + + # TODO: expose RPC hooks + + +# ------------------------------------------------------------------------------ diff --git a/target/rivanna/scripts/raptor_worker.py b/target/rivanna/scripts/raptor_worker.py new file mode 100755 index 000000000..d1ae066dc --- /dev/null +++ b/target/rivanna/scripts/raptor_worker.py @@ -0,0 +1,42 @@ + +import time +import random + +import radical.pilot as rp + +RANKS = 2 + + +# ------------------------------------------------------------------------------ +# +class MyWorker(rp.raptor.MPIWorker): + ''' + This class provides the required functionality to execute work requests. + In this simple example, the worker only implements a single call: `hello`. 
+ ''' + + # -------------------------------------------------------------------------- + # + def __init__(self, cfg): + + super().__init__(cfg) + + self.register_mode('foo', self._dispatch_foo) + + + # -------------------------------------------------------------------------- + # + def _dispatch_foo(self, task): + + import pprint + self._log.debug('==== running foo\n%s', + pprint.pformat(task['description'])) + + return 'out', 'err', 0, None, None + + + # -------------------------------------------------------------------------- + # + +# ------------------------------------------------------------------------------ + diff --git a/target/rivanna/scripts/rp-cylon.slurm b/target/rivanna/scripts/rp-cylon.slurm new file mode 100644 index 000000000..75e05ab9d --- /dev/null +++ b/target/rivanna/scripts/rp-cylon.slurm @@ -0,0 +1,22 @@ +#!/bin/bash +#SBATCH --nodes=2 +#SBATCH --ntasks-per-node=37 +##SBATCH --exclusive +#SBATCH --time=00:20:00 +#SBATCH --partition=parallel +#SBATCH -A bii_dsc_community +#SBATCH --output=out-%x-%j.out +#SBATCH --error=err-%x-%j.err + + +module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 + + +source /project/bii_dsc_community/djy8hg/cylon_rp_venv/bin/activate + +export RADICAL_LOG_LVL="DEBUG" +export RADICAL_PROFILE="TRUE" +export RADICAL_PILOT_DBURL="mongodb://rct-tutorial:HXH7vExF7GvCeMWn@95.217.193.116:27017/rct-tutorial" +export LD_LIBRARY_PATH=$HOME/rc_arup/cylon/build/arrow/install/lib64:$HOME/rc_arup/cylon/build/glog/install/lib64:$HOME/rc_arup/cylon/build/lib64:$HOME/rc_arup/cylon/build/lib:$LD_LIBRARY_PATH + +python groupby.py diff --git a/target/rivanna/scripts/rp-experiment-setup.py b/target/rivanna/scripts/rp-experiment-setup.py new file mode 100644 index 000000000..8d1901a72 --- /dev/null +++ b/target/rivanna/scripts/rp-experiment-setup.py @@ -0,0 +1,112 @@ +import os +import sys +from textwrap import dedent +from cloudmesh.common.util import writefile +from cloudmesh.common.util import readfile +from cloudmesh.common.util import banner +from cloudmesh.common.console import Console + +counter = 0 + +debug = True +debug = False + +partition="bii-gpu" + +partition="parallel" + + +# (nodes, threads, rows, partition, "exclusive") +combination = [\ + # (2,4, 5000, "parallel", "exclusive"), # always pending + #(2,37, 1000000, "parallel", ""), + (4,37, 35000000, "parallel", ""), + (6,37, 35000000, "parallel", ""), + (8,37, 35000000, "parallel", ""), + (10,37, 35000000, "parallel", ""), + (12,37, 35000000, "parallel", ""), + (14,37, 35000000, "parallel", ""), +] + +''' +combination = [] +for nodes in range(0,50): + for threads in range(0,37): + combination.append((nodes+1, threads+1, "parallel", "")) +''' + +total = len(combination) +jobid="-%j" +# jobid="" + +f = open("submit.log", "w") +for nodes, threads, rows, partition, exclusive in combination: + counter = counter + 1 + + if exclusive == "exclusive": + exclusive = "#SBATCH --exclusive" + e = "e1" + else: + exclusive = "" + e = "e0" + + cores_per_node = nodes * threads - 2 + + print (cores_per_node) + + config = readfile("raptor.in.cfg") + + config = config.replace("CORES_PER_NODE", str(cores_per_node)) + config = config.replace("NO_OF_ROWS", str(rows)) + + + print (config) + + cfg_filename = f"raptor-{nodes}-{threads}.cfg" + + writefile(cfg_filename, config) + + banner(f"SLURM {nodes} {threads} {counter}/{total}") + script=dedent(f""" + #!/bin/bash + #SBATCH --job-name=h-n={nodes:02d}-t={threads:02d}-e={e} + #SBATCH --nodes={nodes} + #SBATCH --ntasks-per-node={threads} + #SBATCH --time=15:00 + 
#SBATCH --output=out-{nodes:02d}-{threads:02d}{jobid}.log + #SBATCH --error=out-{nodes:02d}-{threads:02d}{jobid}.err + #SBATCH --partition=parallel + #SBATCH -A bii_dsc_community + {exclusive} + echo "..............................................................." + ./load.sh + echo "..............................................................." + source /project/bii_dsc_community/djy8hg/cylon_rp_venv/bin/activate + echo "..............................................................." + export RADICAL_LOG_LVL="DEBUG" + export RADICAL_PROFILE="TRUE" + export RADICAL_PILOT_DBURL="mongodb://rct-tutorial:HXH7vExF7GvCeMWn@95.217.193.116:27017/rct-tutorial" + echo "..............................................................." + lscpu + echo "..............................................................." + time python rp_scaling.py {cfg_filename} + echo "..............................................................." + """).strip() + + print (script) + filename = f"script-{nodes:02d}-{threads:02d}.slurm" + writefile(filename, script) + + + if not debug: + + r = os.system(f"sbatch {filename}") + total = nodes * threads + if r == 0: + msg = f"{counter} submitted: nodes={nodes:02d} threads={threads:02d} total={total}" + Console.ok(msg) + else: + msg = f"{counter} failed: nodes={nodes:02d} threads={threads:02d} total={total}" + Console.error(msg) + f.writelines([msg, "\n"]) +f.close() diff --git a/target/rivanna/scripts/rp_scaling.py b/target/rivanna/scripts/rp_scaling.py new file mode 100644 index 000000000..b4cc4e5bc --- /dev/null +++ b/target/rivanna/scripts/rp_scaling.py @@ -0,0 +1,484 @@ +#!/usr/bin/env python3 + +''' +Demonstrate the "raptor" features for remote Task management. + +This script and its supporting files may use relative file paths. Run from the +directory in which you found it. + +Refer to the ``raptor.cfg`` file in the same directory for configurable run time +details. + +By default, this example uses the ``local.localhost`` resource with the +``local`` access scheme where RP oversubscribes resources to emulate multiple +nodes. + +In this example, we + - Launch one or more raptor "master" task(s), which self-submits additional + tasks (results are logged in the master's `result_cb` callback). + - Stage scripts to be used by a raptor "Worker" + - Provision a Python virtual environment with + :py:func:`~radical.pilot.prepare_env` + - Submit several tasks that will be routed through the master(s) to the + worker(s). + - Submit a non-raptor task in the same Pilot environment + +''' + +import os +import sys +from textwrap import dedent +from cloudmesh.common.util import writefile +from cloudmesh.common.util import readfile +from cloudmesh.common.util import banner +from cloudmesh.common.console import Console + +import radical.utils as ru +import radical.pilot as rp + + +# To enable logging, some environment variables need to be set. 
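+# For example, the slurm scripts in this directory export
+# (adjust the values as needed):
+#
+#   export RADICAL_LOG_LVL="DEBUG"
+#   export RADICAL_PROFILE="TRUE"
+#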
+# Ref +# * https://radicalpilot.readthedocs.io/en/stable/overview.html#what-about-logging +# * https://radicalpilot.readthedocs.io/en/stable/developer.html#debugging +# For terminal output, set RADICAL_LOG_TGT=stderr or RADICAL_LOG_TGT=stdout +logger = ru.Logger('raptor') +PWD = os.path.abspath(os.path.dirname(__file__)) +RANKS = 72 + + +# ------------------------------------------------------------------------------ +# +@rp.pythontask +def cylon_join(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':5000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + df3 = df1.merge(df2, on=[0], algorithm='sort', env=env) + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + + +@rp.pythontask +def cylon_slice(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':5000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + #data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + #df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + df3 = df1[max_val * 0.2 :max_val * u, env] # distributed slice + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + +@rp.pythontask +def cylon_groupby(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':5000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = 
data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + + df3 = df1.groupby(by=0).agg({ + "1": "sum", + "2": "min" + }) + + df4 = df1.groupby(by=0).min() + + df5 = df1.groupby(by=[0, 1]).max() + + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + +@rp.pythontask +def cylon_sort(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':5000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + #data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + #df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + #df3 = df1.merge(df2, on=[0], algorithm='sort', env=env) + #print("Distributed Sort with sort options", env.rank) + bins = env.world_size * 2 + #df3 = df1.sort_values(by=[0], num_bins=bins, num_samples=bins, env=env) + df3 = df1.sort_values(by=[0], env=env) + #print(df3) + + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + + +@rp.pythontask +def cylon_concat(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':5000, 'it': 10}): + + import time + import argparse + + import pandas as pd + import pycylon as cn + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data3 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + df3 = DataFrame(pd.DataFrame(data3).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + df4 = cn.concat(axis=0, objs=[df1, df2, df3], env=env) + #print(df4) + + env.barrier() + 
t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df4)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + + + +# ------------------------------------------------------------------------------ +# +def task_state_cb(task, state): + logger.info('task %s: %s', task.uid, state) + if state == rp.FAILED: + logger.error('task %s failed', task.uid) + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + if len(sys.argv) < 2: + cfg_file = '%s/raptor.cfg' % PWD + + else: + cfg_file = sys.argv[1] + + cfg = ru.Config(cfg=ru.read_json(cfg_file)) + + cores_per_node = cfg.cores_per_node + gpus_per_node = cfg.gpus_per_node + n_masters = cfg.n_masters + n_workers = cfg.n_workers + masters_per_node = cfg.masters_per_node + nodes_per_worker = cfg.nodes_per_worker + n_rows = cfg.no_of_rows + + # we use a reporter class for nicer output + report = ru.Reporter(name='radical.pilot') + report.title('Raptor example (RP version %s)' % rp.version) + + session = rp.Session() + try: + pd = rp.PilotDescription(cfg.pilot_descr) + + pd.cores = cores_per_node + 2 + pd.gpus = 0 + pd.runtime = 60 + + pmgr = rp.PilotManager(session=session) + tmgr = rp.TaskManager(session=session) + tmgr.register_callback(task_state_cb) + + pilot = pmgr.submit_pilots(pd) + tmgr.add_pilots(pilot) + + pmgr.wait_pilots(uids=pilot.uid, state=[rp.PMGR_ACTIVE]) + + report.info('Stage files for the worker `my_hello` command.\n') + # See raptor_worker.py. + pilot.stage_in({'source': ru.which('radical-pilot-hello.sh'), + 'target': 'radical-pilot-hello.sh', + 'action': rp.TRANSFER}) + + # Issue an RPC to provision a Python virtual environment for the later + # raptor tasks. Note that we are telling prepare_env to install + # radical.pilot and radical.utils from sdist archives on the local + # filesystem. This only works for the default resource, local.localhost. + report.info('Call pilot.prepare_env()... ') + pilot.prepare_env(env_name='cylon_rp_venv', + env_spec={'type' : 'venv', + 'path' : '/project/bii_dsc_community/djy8hg/cylon_rp_venv', + 'setup': []}) + report.info('done\n') + + # Launch a raptor master task, which will launch workers and self-submit + # some additional tasks for illustration purposes. 
+ + master_ids = [ru.generate_id('master.%(item_counter)06d', + ru.ID_CUSTOM, ns=session.uid) + for _ in range(n_masters)] + + tds = list() + #f = open("output-{cores_per_node}-{n_rows}.log", "w") + for i in range(n_masters): + td = rp.TaskDescription(cfg.master_descr) + td.mode = rp.RAPTOR_MASTER + td.uid = master_ids[i] + td.arguments = [cfg_file, i] + td.cpu_processes = 1 + td.cpu_threads = 1 + td.named_env = 'rp' + td.input_staging = [{'source': '%s/raptor_master.py' % PWD, + 'target': 'raptor_master.py', + 'action': rp.TRANSFER, + 'flags' : rp.DEFAULT_FLAGS}, + {'source': '%s/raptor_worker.py' % PWD, + 'target': 'raptor_worker.py', + 'action': rp.TRANSFER, + 'flags' : rp.DEFAULT_FLAGS}, + {'source': cfg_file, + 'target': os.path.basename(cfg_file), + 'action': rp.TRANSFER, + 'flags' : rp.DEFAULT_FLAGS} + ] + tds.append(td) + + if len(tds) > 0: + report.info('Submit raptor master(s) %s\n' + % str([t.uid for t in tds])) + task = tmgr.submit_tasks(tds) + if not isinstance(task, list): + task = [task] + + states = tmgr.wait_tasks( + uids=[t.uid for t in task], + state=rp.FINAL + [rp.AGENT_EXECUTING], + timeout=60 + ) + logger.info('Master states: %s', str(states)) + + tds = list() + for i in range(1): + + #bson = cylon_join(comm=None, data={'unique': 0.9, 'scaling': 's', 'rows':n_rows, 'it': 10}) # For join test + #bson = cylon_groupby(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':n_rows, 'it': 10}) + #bson = cylon_sort(comm=None, data={'unique': 0.9, 'scaling': 's', 'rows':n_rows, 'it': 10}) + #bson = cylon_slice(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':n_rows, 'it': 10}) + bson = cylon_concat(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':n_rows, 'it': 10}) + + tds.append(rp.TaskDescription({ + 'uid' : 'task.cylon.w.%06d' % i, + 'mode' : rp.TASK_FUNC, + 'ranks' : cores_per_node, + 'function' : bson, + 'raptor_id' : master_ids[i % n_masters]})) + + + if len(tds) > 0: + report.info('Submit tasks %s.\n' % str([t.uid for t in tds])) + tasks = tmgr.submit_tasks(tds) + + logger.info('Wait for tasks %s', [t.uid for t in tds]) + tmgr.wait_tasks(uids=[t.uid for t in tasks]) + + for task in tasks: + report.info('id: %s [%s]:\n out: %s\n ret: %s\n' + % (task.uid, task.state, task.stdout, task.return_value)) + + finally: + session.close(download=True) + + report.info('Logs from the master task should now be in local files \n') + report.info('like %s/%s/%s.log\n' % (session.uid, pilot.uid, master_ids[0])) + +# ------------------------------------------------------------------------------ \ No newline at end of file diff --git a/target/rivanna/scripts/scaling_job.slurm b/target/rivanna/scripts/scaling_job.slurm index eb05a2f86..22ec1fb80 100644 --- a/target/rivanna/scripts/scaling_job.slurm +++ b/target/rivanna/scripts/scaling_job.slurm @@ -1,9 +1,9 @@ #!/bin/bash #SBATCH --nodes=4 -#SBATCH --ntasks-per-node=37 +#SBATCH --ntasks-per-node=40 #SBATCH --exclusive #SBATCH --time=1:00:00 -#SBATCH --partition=parallel +#SBATCH --partition=bii #SBATCH -A bii_dsc_community #SBATCH --output=rivanna/scripts/cylogs/mpirun-96t-4n-160w-35m-%x-%j.out #SBATCH --error=rivanna/scripts/cylogs/mpirun-96t-4n-160w-35m-%x-%j.err @@ -15,9 +15,8 @@ module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 #module load openmpi/4.1.4 #module load python/3.11.1 -export BASE=/project/bii_dsc_community/thf2bn #source $HOME/CYLON/bin/activate -source $BASE/cylon_rp_venv/bin/activate +source $HOME/cylon_rp_venv/bin/activate BUILD_PATH=$PWD/build @@ -28,4 +27,4 @@ which python gcc g++ #srun -n 160 
python $PWD/rivanna/scripts/cylon_scaling.py -n 35000000
-time mpirun -np 148 python rivanna/scripts/cylon_scaling.py -n 35000000
+mpirun -np 160 python rivanna/scripts/cylon_scaling.py -n 35000000
diff --git a/target/rivanna/scripts/submit.log b/target/rivanna/scripts/submit.log
new file mode 100644
index 000000000..d53e601ab
--- /dev/null
+++ b/target/rivanna/scripts/submit.log
@@ -0,0 +1 @@
+1 submitted: nodes=20 threads=37 total=740
diff --git a/target/uva-cs/README.md b/target/uva-cs/README.md
new file mode 100644
index 000000000..46eba1d66
--- /dev/null
+++ b/target/uva-cs/README.md
@@ -0,0 +1,50 @@
+# Running Cylon on UVA CS Cluster
+
+Arup Sarker (arupcsedu@gmail.com, djy8hg@virginia.edu)
+
+
+
+## Install instructions
+
+The UVA CS cluster is an HPC system offered by the Department of Computer Science, University of Virginia.
+The build uses the gcc and OpenMPI versions provided by the system modules.
+
+```shell
+ssh your_computing_id@portal.cs.virginia.edu
+git clone https://github.com/cylondata/cylon.git
+cd cylon
+
+module load gcc-11.2.0 openmpi-4.1.4
+
+conda env create -f conda/environments/cylon.yml
+conda activate cylon_dev
+DIR=$HOME/anaconda3/envs/cylon_dev
+
+export PATH=$DIR/bin:$PATH LD_LIBRARY_PATH=$DIR/lib:$LD_LIBRARY_PATH PYTHONPATH=$DIR/lib/python3.9/site-packages
+
+
+which python gcc g++
+#---- (END) ----
+
+python build.py --cpp --test --python --pytest
+
+```
+It will take some time to build. So, grab a coffee!!!
+
+Let's perform a scaling operation with join. Before that, we have to install the dependencies as follows.
+
+```shell
+pip install cloudmesh-common
+pip install openssl-python
+```
+
+We will use a Slurm script to run the scaling operation.
+
+```shell
+cd target/uva-cs/scripts/
+sbatch scaling_job.sh
+```
+
+For more details on the dependent libraries and Slurm scripts, please check out the following links:
+
+* 
diff --git a/target/uva-cs/rp-scripts/README.md b/target/uva-cs/rp-scripts/README.md
new file mode 100644
index 000000000..28a496536
--- /dev/null
+++ b/target/uva-cs/rp-scripts/README.md
@@ -0,0 +1,51 @@
+# Running Cylon on Rivanna
+
+Arup Sarker (arupcsedu@gmail.com, djy8hg@virginia.edu)
+
+
+
+## Install instructions for Radical Pilot
+
+Rivanna is an HPC system offered by the University of Virginia.
+The build uses the gcc and OpenMPI versions provided by the system modules.
+Use the same Python environment, "cylon_rct", for radical-pilot.
+
+```shell
+module load gcc/9.2.0 openmpi/3.1.6 python/3.7.7 cmake/3.23.3
+source $HOME/cylon_rct/bin/activate
+pip install radical.pilot
+```
+To check the versions of all dependent libraries:
+
+```shell
+radical-stack
+```
+You need to export the MongoDB URL:
+
+```shell
+export RADICAL_PILOT_DBURL="mongodb:ADD_YOUR_URL"
+```
+Setup is done. Now let's execute the scaling test with Cylon.
+
+```shell
+cd /some_path_to/cylon/rivanna/rp-scripts
+python rp_scaling.py
+```
+
+If you want to make any changes to the UVA resource file (/some_path_to/radical.pilot/src/radical/pilot/configs) or any other part of the Radical Pilot source, clone the repository:
+
+```shell
+git clone https://github.com/radical-cybertools/radical.pilot.git
+cd radical.pilot
+```
+To make those changes take effect, you need to reinstall radical-pilot:
+
+```shell
+pip install . 
--upgrade +``` + +To uninstall radical pilot, execute + +```shell +pip uninstall radical.pilot +``` \ No newline at end of file diff --git a/target/uva-cs/rp-scripts/config.json b/target/uva-cs/rp-scripts/config.json new file mode 100644 index 000000000..ceed11256 --- /dev/null +++ b/target/uva-cs/rp-scripts/config.json @@ -0,0 +1,223 @@ + +{ + "local.localhost" : { + "project" : null, + "queue" : null, + "schema" : null, + "cores" : 32, + "gpus" : 0 + }, + + "local.localhost_test" : { + "project" : null, + "queue" : null, + "schema" : null, + "cores" : 1, + "gpus" : 0 + }, + + "local.localhost_flux" : { + "project" : null, + "queue" : null, + "schema" : null, + "cores" : 128, + "gpus" : 0 + }, + + "local.localhost_prte" : { + "project" : null, + "queue" : null, + "schema" : null, + "cores" : 64 + }, + + "local.local" : { + "project" : null, + "queue" : null, + "schema" : null, + "cores" : 64 + }, + + "anl.arcticus" : { + "project" : null, + "queue" : "arcticus", + "schema" : "local", + "cores" : 96, + "gpus" : 2 + }, + + "nersc.edison" : { + "project" : null, + "queue" : "debug", + "schema" : "ssh", + "cores" : 64 + }, + + "access.expanse": { + "project" : "UNC100", + "queue" : "compute", + "schema" : "local", + "cores" : 128 + }, + + "access.supermic_ssh" : { + "project" : "TG-MCB090174", + "queue" : "workq", + "schema" : "gsissh", + "cores" : 64 + }, + + "access.supermic" : { + "project" : "TG-MCB090174", + "queue" : "workq", + "schema" : "gsissh", + "cores" : 80 + }, + + "access.supermic_orte" : { + "project" : "TG-CCR140028", + "queue" : "workq", + "schema" : "gsissh", + "cores" : 80 + }, + + "princeton.traverse" : { + "project" : "tromp", + "queue" : "all", + "schema" : "local", + "cores" : 32, + "gpus" : 4 + }, + + "princeton.tiger_cpu" : { + "project" : "geo", + "queue" : "cpu", + "schema" : "ssh", + "cores" : 40 + }, + + "princeton.tiger_gpu" : { + "project" : "geo", + "queue" : "gpu", + "schema" : "local", + "cores" : 56 + }, + + "tacc.frontera" : { + "project" : "MCB20006", + "queue" : "development", + "schema" : "ssh", + "cores" : 64 + }, + + "access.longhorn" : { + "project" : "FTA-Jha", + "queue" : "development", + "schema" : "local", + "cores" : 40 + }, + + "access.bridges2" : { + "project" : null, + "queue" : "RM", + "schema" : "local", + "cores" : 128 + }, + + "access.wrangler" : { + "project" : "TG-MCB090174", + "queue" : "debug", + "schema" : "gsissh", + "cores" : 24 + }, + + "access.stampede2_ssh" : { + "project" : "TG-MCB090174", + "queue" : "development", + "schema" : "local", + "cores" : 204 + }, + + "access.stampede2_srun" : { + "project" : "TG-MCB090174", + "queue" : "development", + "schema" : "local", + "cores" : 204 + }, + + "access.stampede2_ibrun" : { + "project" : "TG-MCB090174", + "queue" : "development", + "schema" : "local", + "cores" : 204 + }, + + "ncar.cheyenne": { + "project" : "URTG0014", + "queue" : "regular", + "schema" : "local", + "cores" : 72 + }, + + "llnl.lassen" : { + "project" : "CV_DDMD", + "queue" : "pdebug", + # "queue" : "pbatch", + "schema" : "local", + "cores" : 30, + "gpus" : 0 + }, + + "ornl.spock" : { + "project" : "csc449", + "queue" : "ecp", + "schema" : "local", + "cores" : 64, + "gpus" : 4 + }, + + "ornl.summit" : { + "project" : "CSC343", + "queue" : "batch", + "schema" : "local", + "cores" : 168, + "gpus" : 6 + }, + + "ornl.summit_prte" : { + "project" : "geo111", + "queue" : "batch", + "schema" : "local", + "cores" : 168, + "gpus" : 6 + }, + + "ornl.rhea_aprun" : { + "project" : "BIP149", + "queue" : "batch", + "schema" : 
"local", + "cores" : 64 + }, + + "radical.three" : { + "project" : null, + "queue" : null, + "schema" : "ssh", + "cores" : 8 + }, + + "rutgers.amarel" : { + "project" : null, + "queue" : null, + "schema" : "ssh", + "cores" : 8 + }, + + "uva.rivanna" : { + "project" : null, + "queue" : "standard", + "schema" : "local", + "cores" : 8 + } +} + diff --git a/target/uva-cs/rp-scripts/raptor.cfg b/target/uva-cs/rp-scripts/raptor.cfg new file mode 100644 index 000000000..bf202a248 --- /dev/null +++ b/target/uva-cs/rp-scripts/raptor.cfg @@ -0,0 +1,55 @@ +{ + # resource configuration + "cores_per_node" : 398, + "gpus_per_node" : 0, + + # raptor configuration + "n_masters" : 1, + "n_workers" : 1, + "masters_per_node" : 1, + "nodes_per_worker" : 1, + + # extra nodes for non-raptor rp tasks + "nodes_rp" : 1, + # extra resources for the rp agent (optional) + "nodes_agent" : 0, + + # pilot runtime in min + "runtime" : 60, + + # task configuration + "cores_per_task" : 1, + "sleep" : 3, + # These are used as the range of the for loops for defining and submitting + # non-raptor and raptor tasks, respectively. + "tasks_rp" : 1, + "tasks_raptor" : 1, + + "pilot_descr": { + "resource" : "uva.rivanna", + "runtime" : 60, + "access_schema": "interactive", + "queue" : "bii", + "project" : "bii_dsc_community" + }, + + "master_descr": { + "mode" : "raptor.master", + "named_env" : "cylon_rp_venv", + "executable" : "./raptor_master.py" + }, + + "worker_descr": { + "mode" : "raptor.worker", + "named_env" : "cylon_rp_venv", + "pre_exec" : ["module load gcc/9.2.0", + "module load openmpi/3.1.6", + "module load python/3.7.7", + "export LD_LIBRARY_PATH=$HOME/rc_arup/cylon/build/arrow/install/lib64:$HOME/rc_arup/cylon/build/glog/install/lib64:$HOME/rc_arup/cylon/build/lib64:$HOME/rc_arup/cylon/build/lib:$LD_LIBRARY_PATH" + ], + + # custom worker class + "raptor_class" : "MyWorker", + "raptor_file" : "./raptor_worker.py" + } +} \ No newline at end of file diff --git a/target/uva-cs/rp-scripts/raptor_master.py b/target/uva-cs/rp-scripts/raptor_master.py new file mode 100755 index 000000000..6e417682e --- /dev/null +++ b/target/uva-cs/rp-scripts/raptor_master.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 + +import os +import sys +import time + +from collections import defaultdict + +import radical.utils as ru +import radical.pilot as rp + + +def out(msg): + sys.stdout.write('==== %s\n' % msg) + sys.stdout.flush() + + +# This script has to run as a task within a pilot allocation, and is +# a demonstration of a task overlay within the RCT framework. It is expected +# to be staged and launched by the `raptor.py` script in the radical.pilot +# examples/misc directory. +# This master task will: +# +# - create a master which bootstraps a specific communication layer +# - insert n workers into the pilot (again as a task) +# - perform RPC handshake with those workers +# - send RPC requests to the workers +# - terminate the worker +# +# The worker itself is an external program which is not covered in this code. + +RANKS = 2 + + + +# ------------------------------------------------------------------------------ +# +class MyMaster(rp.raptor.Master): + ''' + This class provides the communication setup for the task overlay: it will + set up the request / response communication queues and provide the endpoint + information to be forwarded to the workers. 
+ ''' + + # -------------------------------------------------------------------------- + # + def __init__(self, cfg): + + self._cnt = 0 + self._submitted = defaultdict(int) + self._collected = defaultdict(int) + + # initialize the task overlay base class. That base class will ensure + # proper communication channels to the pilot agent. + super().__init__(cfg=cfg) + + self._sleep = self._cfg.sleep + + + # -------------------------------------------------------------------------- + # + def submit(self): + + # self._prof.prof('create_start') + + # create additional tasks to be distributed to the workers. + + #tds = list() + + #self._prof.prof('create_stop') + + # wait for outstanding tasks to complete + while True: + + completed = sum(self._collected.values()) + submitted = sum(self._submitted.values()) + + if submitted: + # request_cb has been called, so we can wait for completion + + self._log.info('=== submit done?: %d >= %d ', completed, submitted) + + if completed >= submitted: + break + + time.sleep(1) + + self._log.info('=== submit done!') + + + # -------------------------------------------------------------------------- + # + def request_cb(self, tasks): + + for task in tasks: + + self._log.debug('=== request_cb %s\n', task['uid']) + + mode = task['description']['mode'] + uid = task['description']['uid'] + + self._submitted[mode] += 1 + + # for each `function` mode task, submit one more `proc` mode request + if mode == rp.TASK_FUNC: + self.submit_tasks(rp.TaskDescription( + {'uid' : 'extra' + uid, + # 'timeout' : 10, + 'mode' : rp.TASK_PROC, + 'ranks' : RANKS, + 'executable': '/bin/sh', + 'arguments' : ['-c', + 'sleep %d; ' % self._sleep + + 'echo "hello $RP_RANK/$RP_RANKS: ' + '$RP_TASK_ID"'], + 'raptor_id' : 'master.000000'})) + + return tasks + + + # -------------------------------------------------------------------------- + # + def result_cb(self, tasks): + ''' + Log results. + + Log file is named by the master tasks UID. + ''' + for task in tasks: + + mode = task['description']['mode'] + self._collected[mode] += 1 + + # NOTE: `state` will be `AGENT_EXECUTING` + self._log.info('=== result_cb %s: %s [%s] [%s]', + task['uid'], + task['state'], + task['stdout'], + task['return_value']) + + # Note that stdout is part of the master task result. + print('id: %s [%s]:\n out: %s\n ret: %s\n' + % (task['uid'], task['state'], task['stdout'], + task['return_value'])) + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + # This master script runs as a task within a pilot allocation. The purpose + # of this master is to (a) spawn a set or workers within the same + # allocation, (b) to distribute work items (`hello` function calls) to those + # workers, and (c) to collect the responses again. + cfg_fname = str(sys.argv[1]) + cfg = ru.Config(cfg=ru.read_json(cfg_fname)) + cfg.rank = int(sys.argv[2]) + + n_workers = cfg.n_workers + nodes_per_worker = cfg.nodes_per_worker + cores_per_node = cfg.cores_per_node + gpus_per_node = cfg.gpus_per_node + descr = cfg.worker_descr + pwd = os.getcwd() + + # one node is used by master. Alternatively (and probably better), we could + # reduce one of the worker sizes by one core. But it somewhat depends on + # the worker type and application workload to judge if that makes sense, so + # we leave it for now. + + # create a master class instance - this will establish communication to the + # pilot agent + master = MyMaster(cfg) + + # insert `n` worker tasks into the agent. 
The agent will schedule (place) + # those workers and execute them. Insert one smaller worker (see above) + # NOTE: this assumes a certain worker size / layout + out('workers: %d' % n_workers) + descr['ranks'] = nodes_per_worker * cores_per_node + descr['gpus_per_rank'] = nodes_per_worker * gpus_per_node + worker_ids = master.submit_workers( + [rp.TaskDescription(descr) for _ in range(n_workers)]) + + # wait until `m` of those workers are up + # This is optional, work requests can be submitted before and will wait in + # a work queue. + # FIXME + master.wait_workers(count=1) + + out('start') + master.start() + out('submit') + master.submit() + + # let some time pass for client side tasks to complete + time.sleep(600) + + out('stop') + # TODO: can be run from thread? + master.stop() + out('join') + + # TODO: worker state callback + master.join() + out('done') + + # TODO: expose RPC hooks + + +# ------------------------------------------------------------------------------ diff --git a/target/uva-cs/rp-scripts/raptor_worker.py b/target/uva-cs/rp-scripts/raptor_worker.py new file mode 100755 index 000000000..d1ae066dc --- /dev/null +++ b/target/uva-cs/rp-scripts/raptor_worker.py @@ -0,0 +1,42 @@ + +import time +import random + +import radical.pilot as rp + +RANKS = 2 + + +# ------------------------------------------------------------------------------ +# +class MyWorker(rp.raptor.MPIWorker): + ''' + This class provides the required functionality to execute work requests. + In this simple example, the worker only implements a single call: `hello`. + ''' + + # -------------------------------------------------------------------------- + # + def __init__(self, cfg): + + super().__init__(cfg) + + self.register_mode('foo', self._dispatch_foo) + + + # -------------------------------------------------------------------------- + # + def _dispatch_foo(self, task): + + import pprint + self._log.debug('==== running foo\n%s', + pprint.pformat(task['description'])) + + return 'out', 'err', 0, None, None + + + # -------------------------------------------------------------------------- + # + +# ------------------------------------------------------------------------------ + diff --git a/target/uva-cs/rp-scripts/resource_uva.json b/target/uva-cs/rp-scripts/resource_uva.json new file mode 100644 index 000000000..de4cdc193 --- /dev/null +++ b/target/uva-cs/rp-scripts/resource_uva.json @@ -0,0 +1,39 @@ +{ + "description" : "Heterogeneous community-model Linux cluster", + "notes" : "Access from registered UVA IP address. 
See https://www.rc.virginia.edu/userinfo/rivanna/login/", + "schemas" : ["local", "ssh", "interactive"], + "local" : + { + "job_manager_endpoint" : "slurm://rivanna.hpc.virginia.edu/", + "filesystem_endpoint" : "file://rivanna.hpc.virginia.edu/" + }, + "ssh" : + { + "job_manager_endpoint" : "slurm+ssh://rivanna.hpc.virginia.edu/", + "filesystem_endpoint" : "sftp://rivanna.hpc.virginia.edu/" + }, + "interactive" : + { + "job_manager_endpoint" : "fork://localhost/", + "filesystem_endpoint" : "file://localhost/" + }, + "default_queue" : "standard", + "resource_manager" : "SLURM", + "agent_scheduler" : "CONTINUOUS", + "agent_spawner" : "POPEN", + "launch_methods" : { + "order": ["MPIRUN"], + "MPIRUN" : {} + }, + "pre_bootstrap_0" : [ + "module load gcc/9.2.0", + "module load openmpi/3.1.6", + "module load python/3.7.7", + "export LD_LIBRARY_PATH=$HOME/rc_arup/cylon/build/arrow/install/lib64:$HOME/rc_arup/cylon/build/glog/install/lib64:$HOME/rc_arup/cylon/build/lib64:$HOME/rc_arup/cylon/build/lib:$LD_LIBRARY_PATH" + ], + "default_remote_workdir" : "/scratch/$USER", + "python_dist" : "default", + "virtenv_dist" : "default", + "virtenv_mode" : "create", + "rp_version" : "local" +} \ No newline at end of file diff --git a/target/uva-cs/rp-scripts/rp-cylon.slurm b/target/uva-cs/rp-scripts/rp-cylon.slurm new file mode 100644 index 000000000..fe360d5d4 --- /dev/null +++ b/target/uva-cs/rp-scripts/rp-cylon.slurm @@ -0,0 +1,21 @@ +#!/bin/bash +#SBATCH --nodes=10 +#SBATCH --ntasks-per-node=40 +#SBATCH --exclusive +#SBATCH --time=2:00:00 +#SBATCH --partition=bii +#SBATCH -A bii_dsc_community +#SBATCH --output=rivanna/scripts/cylogs/rp-cylon-3n-40w-5m-%x-%j.out +#SBATCH --error=rivanna/scripts/cylogs/rp-cylon-3n-40w-5m-%x-%j.err + + +module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 + + +source $HOME/cylon_rp_venv/bin/activate + +export RADICAL_LOG_LVL="DEBUG" +export RADICAL_PROFILE="TRUE" +export RADICAL_PILOT_DBURL="mongodb://rct-tutorial:HXH7vExF7GvCeMWn@95.217.193.116:27017/rct-tutorial" + +python rivanna/rp-scripts/rp_scaling.py diff --git a/target/uva-cs/rp-scripts/rp_scaling.py b/target/uva-cs/rp-scripts/rp_scaling.py new file mode 100644 index 000000000..f65e1eb65 --- /dev/null +++ b/target/uva-cs/rp-scripts/rp_scaling.py @@ -0,0 +1,294 @@ +#!/usr/bin/env python3 + +''' +Demonstrate the "raptor" features for remote Task management. + +This script and its supporting files may use relative file paths. Run from the +directory in which you found it. + +Refer to the ``raptor.cfg`` file in the same directory for configurable run time +details. + +By default, this example uses the ``local.localhost`` resource with the +``local`` access scheme where RP oversubscribes resources to emulate multiple +nodes. + +In this example, we + - Launch one or more raptor "master" task(s), which self-submits additional + tasks (results are logged in the master's `result_cb` callback). + - Stage scripts to be used by a raptor "Worker" + - Provision a Python virtual environment with + :py:func:`~radical.pilot.prepare_env` + - Submit several tasks that will be routed through the master(s) to the + worker(s). + - Submit a non-raptor task in the same Pilot environment + +''' + +import os +import sys + +import radical.utils as ru +import radical.pilot as rp + + +# To enable logging, some environment variables need to be set. 
+# Ref +# * https://radicalpilot.readthedocs.io/en/stable/overview.html#what-about-logging +# * https://radicalpilot.readthedocs.io/en/stable/developer.html#debugging +# For terminal output, set RADICAL_LOG_TGT=stderr or RADICAL_LOG_TGT=stdout +logger = ru.Logger('raptor') +PWD = os.path.abspath(os.path.dirname(__file__)) +RANKS = 398 + + +# ------------------------------------------------------------------------------ +# +@rp.pythontask +def cylon_join(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':20000000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + df3 = df1.merge(df2, on=[0], algorithm='sort', env=env) + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + + + +def cylon_slice(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':20000000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + df3 = df1[0:20000000, env] # distributed slice + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + + +# ------------------------------------------------------------------------------ +# +def task_state_cb(task, state): + logger.info('task %s: %s', task.uid, state) + if state == rp.FAILED: + logger.error('task %s failed', task.uid) + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + if len(sys.argv) < 2: + cfg_file = '%s/raptor.cfg' % PWD + + else: + cfg_file = sys.argv[1] + + cfg = ru.Config(cfg=ru.read_json(cfg_file)) + + 
cores_per_node = cfg.cores_per_node + gpus_per_node = cfg.gpus_per_node + n_masters = cfg.n_masters + n_workers = cfg.n_workers + masters_per_node = cfg.masters_per_node + nodes_per_worker = cfg.nodes_per_worker + + # we use a reporter class for nicer output + report = ru.Reporter(name='radical.pilot') + report.title('Raptor example (RP version %s)' % rp.version) + + session = rp.Session() + try: + pd = rp.PilotDescription(cfg.pilot_descr) + + pd.cores = 400 + pd.gpus = 0 + pd.runtime = 60 + + pmgr = rp.PilotManager(session=session) + tmgr = rp.TaskManager(session=session) + tmgr.register_callback(task_state_cb) + + pilot = pmgr.submit_pilots(pd) + tmgr.add_pilots(pilot) + + pmgr.wait_pilots(uids=pilot.uid, state=[rp.PMGR_ACTIVE]) + + report.info('Stage files for the worker `my_hello` command.\n') + # See raptor_worker.py. + pilot.stage_in({'source': ru.which('radical-pilot-hello.sh'), + 'target': 'radical-pilot-hello.sh', + 'action': rp.TRANSFER}) + + # Issue an RPC to provision a Python virtual environment for the later + # raptor tasks. Note that we are telling prepare_env to install + # radical.pilot and radical.utils from sdist archives on the local + # filesystem. This only works for the default resource, local.localhost. + report.info('Call pilot.prepare_env()... ') + pilot.prepare_env(env_name='cylon_rp_venv', + env_spec={'type' : 'venv', + 'path' : '$HOME/cylon_rp_venv', + 'setup': []}) + report.info('done\n') + + # Launch a raptor master task, which will launch workers and self-submit + # some additional tasks for illustration purposes. + + master_ids = [ru.generate_id('master.%(item_counter)06d', + ru.ID_CUSTOM, ns=session.uid) + for _ in range(n_masters)] + + tds = list() + for i in range(n_masters): + td = rp.TaskDescription(cfg.master_descr) + td.mode = rp.RAPTOR_MASTER + td.uid = master_ids[i] + td.arguments = [cfg_file, i] + td.cpu_processes = 1 + td.cpu_threads = 1 + td.named_env = 'rp' + td.input_staging = [{'source': '%s/raptor_master.py' % PWD, + 'target': 'raptor_master.py', + 'action': rp.TRANSFER, + 'flags' : rp.DEFAULT_FLAGS}, + {'source': '%s/raptor_worker.py' % PWD, + 'target': 'raptor_worker.py', + 'action': rp.TRANSFER, + 'flags' : rp.DEFAULT_FLAGS}, + {'source': cfg_file, + 'target': os.path.basename(cfg_file), + 'action': rp.TRANSFER, + 'flags' : rp.DEFAULT_FLAGS} + ] + tds.append(td) + + if len(tds) > 0: + report.info('Submit raptor master(s) %s\n' + % str([t.uid for t in tds])) + task = tmgr.submit_tasks(tds) + if not isinstance(task, list): + task = [task] + + states = tmgr.wait_tasks( + uids=[t.uid for t in task], + state=rp.FINAL + [rp.AGENT_EXECUTING], + timeout=60 + ) + logger.info('Master states: %s', str(states)) + + tds = list() + for i in range(1): + + bson = cylon_join(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':20000000, 'it': 10}) + tds.append(rp.TaskDescription({ + 'uid' : 'task.cylon.w.%06d' % i, + 'mode' : rp.TASK_FUNC, + 'ranks' : RANKS, + 'function' : bson, + 'raptor_id' : master_ids[i % n_masters]})) + + + if len(tds) > 0: + report.info('Submit tasks %s.\n' % str([t.uid for t in tds])) + tasks = tmgr.submit_tasks(tds) + + logger.info('Wait for tasks %s', [t.uid for t in tds]) + tmgr.wait_tasks(uids=[t.uid for t in tasks]) + + for task in tasks: + report.info('id: %s [%s]:\n out: %s\n ret: %s\n' + % (task.uid, task.state, task.stdout, task.return_value)) + + finally: + session.close(download=True) + + report.info('Logs from the master task should now be in local files \n') + report.info('like %s/%s/%s.log\n' % (session.uid, 
pilot.uid, master_ids[0])) + +# ------------------------------------------------------------------------------ \ No newline at end of file diff --git a/target/uva-cs/scripts/Makefile b/target/uva-cs/scripts/Makefile new file mode 100644 index 000000000..c3c6af8ad --- /dev/null +++ b/target/uva-cs/scripts/Makefile @@ -0,0 +1,42 @@ +SHELL=/bin/bash + +.PHONY: load image-singularity image-docker project + +all: ${EXECS} + +login: + ssh -tt rivanna "/opt/rci/bin/ijob --partition=parallel --account=bii_dsc_community --time=30:00 --ntasks-per-node=4 --nodes=2" + +load: + ./load.sh + +clean: + rm -f *.log *.err script-*.slurm + rm -r raptor-*.cfg rp.session.* + +rp: load + python rp-experiment-setup.py + +cy: load + python cylon-experiment-setup.py + +q: + squeue --format="%.18i %.9P %.50j %.8u %.8T %.10M %.9l %.6D %R" --me + +a: + squeue --format="%all" --me + + +qq: + watch squeue --format=\"%.18i %.9P %.50j %.8u %.8T %.10M %.9l %.6D %R\" --me + +i: + cat out.log + cat out.err + fgrep "###" out.log | wc -l + + +cancel: + - ./cancel.sh + - squeue -u ${USER} + diff --git a/target/uva-cs/scripts/README.md b/target/uva-cs/scripts/README.md new file mode 100644 index 000000000..06fd60658 --- /dev/null +++ b/target/uva-cs/scripts/README.md @@ -0,0 +1,43 @@ +1. Install Cloudmesh + +``` +pip install cloudmesh-common +pip install openssl-python + +``` + +2. Make change in the ```cylon-experiment-setup.py ``` or ```rp-experiment-setup.py ``` for the configurations changes. + +``` +combination = [\ + # (1,4, 5000, "parallel", "exclusive"), # always pending + (2,37, 1000000, "parallel", ""), + (4,37, 35000000, "parallel", ""), + (6,37, 35000000, "parallel", ""), + (8,37, 35000000, "parallel", ""), + (10,37, 35000000, "parallel", ""), + (12,37, 35000000, "parallel", ""), + (14,37, 35000000, "parallel", ""), +] +``` + + +3. Load module and activate the python virtual environment + +``` +DIR=$HOME/anaconda3/envs/cylon_dev + +module load gcc-11.2.0 openmpi-4.1.4 +conda activate cylon_dev + + +export PATH=$DIR/bin:$PATH LD_LIBRARY_PATH=$DIR/lib:$LD_LIBRARY_PATH PYTHONPATH=$DIR/lib/python3.9/site-packages +``` +4. Run the scripts as follows. + +```bash +make clean # For cleaning +make rp # For radical pilot +make cy # for bear metal Cylon + +``` \ No newline at end of file diff --git a/target/uva-cs/scripts/cancel.sh b/target/uva-cs/scripts/cancel.sh new file mode 100755 index 000000000..78d88a29c --- /dev/null +++ b/target/uva-cs/scripts/cancel.sh @@ -0,0 +1,2 @@ +#! 
/bin/sh +squeue -u $USER | awk '{print $1}' | tail -n+2 | xargs scancel diff --git a/target/uva-cs/scripts/cylon-experiment-setup.py b/target/uva-cs/scripts/cylon-experiment-setup.py new file mode 100644 index 000000000..20ec6e159 --- /dev/null +++ b/target/uva-cs/scripts/cylon-experiment-setup.py @@ -0,0 +1,117 @@ +import os +import sys +from textwrap import dedent +from cloudmesh.common.util import writefile +from cloudmesh.common.util import readfile +from cloudmesh.common.util import banner +from cloudmesh.common.console import Console + +counter = 0 + +debug = True +debug = False + +partition="bii-gpu" + +partition="parallel" + + +# (nodes, threads, rows, partition, "exclusive") +combination = [\ + # (1,4, 5000, "parallel", "exclusive"), # always pending + (2,37, 1000000, "parallel", ""), + (4,37, 35000000, "parallel", ""), + (6,37, 35000000, "parallel", ""), + (8,37, 35000000, "parallel", ""), + (10,37, 35000000, "parallel", ""), + (12,37, 35000000, "parallel", ""), + (14,37, 35000000, "parallel", ""), +] + +''' +combination = [] +for nodes in range(0,50): + for threads in range(0,37): + combination.append((nodes+1, threads+1, "parallel", "")) +''' + +total = len(combination) +jobid="-%j" +# jobid="" + +f = open("submit.log", "w") +for nodes, threads, rows, partition, exclusive in combination: + counter = counter + 1 + + if exclusive == "exclusive": + exclusive = "#SBATCH --exclusive" + e = "e1" + else: + exclusive = "" + e = "e0" + + usable_threads = nodes * threads + + ''' + cores_per_node = nodes * threads - 2 + + print (cores_per_node) + + config = readfile("raptor.in.cfg") + + config = config.replace("CORES_PER_NODE", str(cores_per_node)) + config = config.replace("NO_OF_ROWS", str(rows)) + + + print (config) + + cfg_filename = f"raptor-{nodes}-{threads}.cfg" + + writefile(cfg_filename, config) + ''' + banner(f"SLURM {nodes} {threads} {counter}/{total}") + script=dedent(f""" + #!/bin/bash + #SBATCH --job-name=h-n={nodes:02d}-t={threads:02d}-e={e} + #SBATCH --nodes={nodes} + #SBATCH --ntasks-per-node={threads} + #SBATCH --time=15:00 + #SBATCH --output=out-{nodes:02d}-{threads:02d}{jobid}.log + #SBATCH --error=out-{nodes:02d}-{threads:02d}{jobid}.err + #SBATCH --partition=parallel + #SBATCH -A bii_dsc_community + {exclusive} + echo "..............................................................." + module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 + echo "..............................................................." + source /project/bii_dsc_community/djy8hg/cylon_rp_venv/bin/activate + echo "..............................................................." + BUILD_PATH=/project/bii_dsc_community/djy8hg/Project/cylon/build + echo "..............................................................." + export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH + echo "..............................................................." + which python gcc g++ + echo "..............................................................." + lscpu + echo "..............................................................." + time srun --exact --nodes {nodes} --ntasks {usable_threads} python cylon_scaling.py -n {rows} + echo "..............................................................." 
+ """).strip() + + print (script) + filename = f"script-{nodes:02d}-{threads:02d}.slurm" + writefile(filename, script) + + + if not debug: + + r = os.system(f"sbatch {filename}") + total = nodes * threads + if r == 0: + msg = f"{counter} submitted: nodes={nodes:02d} threads={threads:02d} total={total}" + Console.ok(msg) + else: + msg = f"{counter} failed: nodes={nodes:02d} threads={threads:02d} total={total}" + Console.error(msg) + f.writelines([msg, "\n"]) +f.close() diff --git a/target/uva-cs/scripts/cylon_scaling.py b/target/uva-cs/scripts/cylon_scaling.py new file mode 100644 index 000000000..4b2a860e6 --- /dev/null +++ b/target/uva-cs/scripts/cylon_scaling.py @@ -0,0 +1,183 @@ +import time +import argparse + +import pandas as pd +from mpi4py import MPI +from numpy.random import default_rng +from pycylon.frame import CylonEnv, DataFrame +from pycylon.net import MPIConfig +from cloudmesh.common.StopWatch import StopWatch +from cloudmesh.common.dotdict import dotdict +from cloudmesh.common.Shell import Shell + + +def cylon_join(data=None): + StopWatch.start(f"join_total_{data['host']}_{data['rows']}_{data['it']}") + + comm = MPI.COMM_WORLD + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + if env.rank == 0: + print("Task# ", data['task']) + + for i in range(data['it']): + env.barrier() + StopWatch.start(f"join_{i}_{data['host']}_{data['rows']}_{data['it']}") + t1 = time.time() + df3 = df1.merge(df2, on=[0], algorithm='sort', env=env) + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + StopWatch.stop(f"join_{i}_{data['host']}_{data['rows']}_{data['it']}") + + StopWatch.stop(f"join_total_{data['host']}_{data['rows']}_{data['it']}") + + if env.rank == 0: + StopWatch.benchmark(tag=str(data)) + + env.finalize() + +def cylon_sort(data=None): + StopWatch.start(f"sort_total_{data['host']}_{data['rows']}_{data['it']}") + + comm = MPI.COMM_WORLD + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + + if env.rank == 0: + print("Task# ", data['task']) + + for i in range(data['it']): + env.barrier() + StopWatch.start(f"sort_{i}_{data['host']}_{data['rows']}_{data['it']}") + t1 = time.time() + df3 = df1.sort_values(by=[0], env=env) + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + 
StopWatch.stop(f"sort_{i}_{data['host']}_{data['rows']}_{data['it']}") + + StopWatch.stop(f"sort_total_{data['host']}_{data['rows']}_{data['it']}") + + if env.rank == 0: + StopWatch.benchmark(tag=str(data)) + + env.finalize() + + +def cylon_slice(data=None): + StopWatch.start(f"slice_total_{data['host']}_{data['rows']}_{data['it']}") + + comm = MPI.COMM_WORLD + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + if env.rank == 0: + print("Task# ", data['task']) + + for i in range(data['it']): + env.barrier() + StopWatch.start(f"slice_{i}_{data['host']}_{data['rows']}_{data['it']}") + t1 = time.time() + df3 = df1[0:20000000, env] # distributed slice + #print(df3) + #df3 = df1.merge(df2, on=[0], algorithm='sort', env=env) + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + StopWatch.stop(f"slice_{i}_{data['host']}_{data['rows']}_{data['it']}") + + StopWatch.stop(f"slice_total_{data['host']}_{data['rows']}_{data['it']}") + + if env.rank == 0: + StopWatch.benchmark(tag=str(data)) + + env.finalize() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="weak scaling") + parser.add_argument('-n', dest='rows', type=int, required=True) + parser.add_argument('-i', dest='it', type=int, default=10) + parser.add_argument('-u', dest='unique', type=float, default=0.9, help="unique factor") + parser.add_argument('-s', dest='scaling', type=str, default='w', choices=['s', 'w'], + help="s=strong w=weak") + + args = vars(parser.parse_args()) + args['host'] = "rivanna" + for i in range(1): + args['task'] = i + #cylon_slice(args) + #cylon_join(args) + cylon_sort(args) + + # os.system(f"{git} branch | fgrep '*' ") + # os.system(f"{git} rev-parse HEAD") diff --git a/target/uva-cs/scripts/load.sh b/target/uva-cs/scripts/load.sh new file mode 100755 index 000000000..e3e5fbaf0 --- /dev/null +++ b/target/uva-cs/scripts/load.sh @@ -0,0 +1,2 @@ +module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 +source /project/bii_dsc_community/djy8hg/cylon_rp_venv/bin/activate diff --git a/target/uva-cs/scripts/raptor.in.cfg b/target/uva-cs/scripts/raptor.in.cfg new file mode 100644 index 000000000..cebcecc20 --- /dev/null +++ b/target/uva-cs/scripts/raptor.in.cfg @@ -0,0 +1,55 @@ +{ + # resource configuration + "cores_per_node" : CORES_PER_NODE, + "gpus_per_node" : 0, + "no_of_rows" : NO_OF_ROWS, + # raptor configuration + "n_masters" : 1, + "n_workers" : 1, + "masters_per_node" : 1, + "nodes_per_worker" : 1, + + # extra nodes for non-raptor rp tasks + "nodes_rp" : 1, + # extra resources for the rp agent (optional) + "nodes_agent" : 0, + + # pilot runtime in min + "runtime" : 60, + + # task configuration + "cores_per_task" : 1, + "sleep" : 3, + # These are used as the range of the for loops for defining and submitting + # non-raptor and raptor tasks, respectively. 
+ "tasks_rp" : 1, + "tasks_raptor" : 1, + + "pilot_descr": { + "resource" : "uva.rivanna", + "runtime" : 60, + "access_schema": "interactive", + "queue" : "parallel", + "project" : "bii_dsc_community" + }, + + "master_descr": { + "mode" : "raptor.master", + "named_env" : "cylon_rp_venv", + "executable" : "./raptor_master.py" + }, + + "worker_descr": { + "mode" : "raptor.worker", + "named_env" : "cylon_rp_venv", + "pre_exec" : ["module load gcc/9.2.0", + "module load openmpi/3.1.6", + "module load python/3.7.7", + "export LD_LIBRARY_PATH=$HOME/rc_arup/cylon/build/arrow/install/lib64:$HOME/rc_arup/cylon/build/glog/install/lib64:$HOME/rc_arup/cylon/build/lib64:$HOME/rc_arup/cylon/build/lib:$LD_LIBRARY_PATH" + ], + + # custom worker class + "raptor_class" : "MyWorker", + "raptor_file" : "./raptor_worker.py" + } +} \ No newline at end of file diff --git a/target/uva-cs/scripts/raptor_master.py b/target/uva-cs/scripts/raptor_master.py new file mode 100755 index 000000000..6e417682e --- /dev/null +++ b/target/uva-cs/scripts/raptor_master.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 + +import os +import sys +import time + +from collections import defaultdict + +import radical.utils as ru +import radical.pilot as rp + + +def out(msg): + sys.stdout.write('==== %s\n' % msg) + sys.stdout.flush() + + +# This script has to run as a task within a pilot allocation, and is +# a demonstration of a task overlay within the RCT framework. It is expected +# to be staged and launched by the `raptor.py` script in the radical.pilot +# examples/misc directory. +# This master task will: +# +# - create a master which bootstraps a specific communication layer +# - insert n workers into the pilot (again as a task) +# - perform RPC handshake with those workers +# - send RPC requests to the workers +# - terminate the worker +# +# The worker itself is an external program which is not covered in this code. + +RANKS = 2 + + + +# ------------------------------------------------------------------------------ +# +class MyMaster(rp.raptor.Master): + ''' + This class provides the communication setup for the task overlay: it will + set up the request / response communication queues and provide the endpoint + information to be forwarded to the workers. + ''' + + # -------------------------------------------------------------------------- + # + def __init__(self, cfg): + + self._cnt = 0 + self._submitted = defaultdict(int) + self._collected = defaultdict(int) + + # initialize the task overlay base class. That base class will ensure + # proper communication channels to the pilot agent. + super().__init__(cfg=cfg) + + self._sleep = self._cfg.sleep + + + # -------------------------------------------------------------------------- + # + def submit(self): + + # self._prof.prof('create_start') + + # create additional tasks to be distributed to the workers. 
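+        # A commented-out sketch of such a submission (values are illustrative
+        # only; it mirrors the self.submit_tasks() call used in request_cb below):
+        #
+        #   self.submit_tasks(rp.TaskDescription({
+        #       'uid'       : 'task.extra.000000',
+        #       'mode'      : rp.TASK_PROC,
+        #       'ranks'     : RANKS,
+        #       'executable': '/bin/sh',
+        #       'arguments' : ['-c', 'sleep %d' % self._sleep],
+        #       'raptor_id' : 'master.000000'}))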
+ + #tds = list() + + #self._prof.prof('create_stop') + + # wait for outstanding tasks to complete + while True: + + completed = sum(self._collected.values()) + submitted = sum(self._submitted.values()) + + if submitted: + # request_cb has been called, so we can wait for completion + + self._log.info('=== submit done?: %d >= %d ', completed, submitted) + + if completed >= submitted: + break + + time.sleep(1) + + self._log.info('=== submit done!') + + + # -------------------------------------------------------------------------- + # + def request_cb(self, tasks): + + for task in tasks: + + self._log.debug('=== request_cb %s\n', task['uid']) + + mode = task['description']['mode'] + uid = task['description']['uid'] + + self._submitted[mode] += 1 + + # for each `function` mode task, submit one more `proc` mode request + if mode == rp.TASK_FUNC: + self.submit_tasks(rp.TaskDescription( + {'uid' : 'extra' + uid, + # 'timeout' : 10, + 'mode' : rp.TASK_PROC, + 'ranks' : RANKS, + 'executable': '/bin/sh', + 'arguments' : ['-c', + 'sleep %d; ' % self._sleep + + 'echo "hello $RP_RANK/$RP_RANKS: ' + '$RP_TASK_ID"'], + 'raptor_id' : 'master.000000'})) + + return tasks + + + # -------------------------------------------------------------------------- + # + def result_cb(self, tasks): + ''' + Log results. + + Log file is named by the master tasks UID. + ''' + for task in tasks: + + mode = task['description']['mode'] + self._collected[mode] += 1 + + # NOTE: `state` will be `AGENT_EXECUTING` + self._log.info('=== result_cb %s: %s [%s] [%s]', + task['uid'], + task['state'], + task['stdout'], + task['return_value']) + + # Note that stdout is part of the master task result. + print('id: %s [%s]:\n out: %s\n ret: %s\n' + % (task['uid'], task['state'], task['stdout'], + task['return_value'])) + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + # This master script runs as a task within a pilot allocation. The purpose + # of this master is to (a) spawn a set or workers within the same + # allocation, (b) to distribute work items (`hello` function calls) to those + # workers, and (c) to collect the responses again. + cfg_fname = str(sys.argv[1]) + cfg = ru.Config(cfg=ru.read_json(cfg_fname)) + cfg.rank = int(sys.argv[2]) + + n_workers = cfg.n_workers + nodes_per_worker = cfg.nodes_per_worker + cores_per_node = cfg.cores_per_node + gpus_per_node = cfg.gpus_per_node + descr = cfg.worker_descr + pwd = os.getcwd() + + # one node is used by master. Alternatively (and probably better), we could + # reduce one of the worker sizes by one core. But it somewhat depends on + # the worker type and application workload to judge if that makes sense, so + # we leave it for now. + + # create a master class instance - this will establish communication to the + # pilot agent + master = MyMaster(cfg) + + # insert `n` worker tasks into the agent. The agent will schedule (place) + # those workers and execute them. Insert one smaller worker (see above) + # NOTE: this assumes a certain worker size / layout + out('workers: %d' % n_workers) + descr['ranks'] = nodes_per_worker * cores_per_node + descr['gpus_per_rank'] = nodes_per_worker * gpus_per_node + worker_ids = master.submit_workers( + [rp.TaskDescription(descr) for _ in range(n_workers)]) + + # wait until `m` of those workers are up + # This is optional, work requests can be submitted before and will wait in + # a work queue. 
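+    # NOTE: count=1 below waits for a single worker to come up, even if
+    #       n_workers in the configuration requests more than one.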
+ # FIXME + master.wait_workers(count=1) + + out('start') + master.start() + out('submit') + master.submit() + + # let some time pass for client side tasks to complete + time.sleep(600) + + out('stop') + # TODO: can be run from thread? + master.stop() + out('join') + + # TODO: worker state callback + master.join() + out('done') + + # TODO: expose RPC hooks + + +# ------------------------------------------------------------------------------ diff --git a/target/uva-cs/scripts/raptor_worker.py b/target/uva-cs/scripts/raptor_worker.py new file mode 100755 index 000000000..d1ae066dc --- /dev/null +++ b/target/uva-cs/scripts/raptor_worker.py @@ -0,0 +1,42 @@ + +import time +import random + +import radical.pilot as rp + +RANKS = 2 + + +# ------------------------------------------------------------------------------ +# +class MyWorker(rp.raptor.MPIWorker): + ''' + This class provides the required functionality to execute work requests. + In this simple example, the worker only implements a single call: `hello`. + ''' + + # -------------------------------------------------------------------------- + # + def __init__(self, cfg): + + super().__init__(cfg) + + self.register_mode('foo', self._dispatch_foo) + + + # -------------------------------------------------------------------------- + # + def _dispatch_foo(self, task): + + import pprint + self._log.debug('==== running foo\n%s', + pprint.pformat(task['description'])) + + return 'out', 'err', 0, None, None + + + # -------------------------------------------------------------------------- + # + +# ------------------------------------------------------------------------------ + diff --git a/target/uva-cs/scripts/rp-cylon.slurm b/target/uva-cs/scripts/rp-cylon.slurm new file mode 100644 index 000000000..75e05ab9d --- /dev/null +++ b/target/uva-cs/scripts/rp-cylon.slurm @@ -0,0 +1,22 @@ +#!/bin/bash +#SBATCH --nodes=2 +#SBATCH --ntasks-per-node=37 +##SBATCH --exclusive +#SBATCH --time=00:20:00 +#SBATCH --partition=parallel +#SBATCH -A bii_dsc_community +#SBATCH --output=out-%x-%j.out +#SBATCH --error=err-%x-%j.err + + +module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 + + +source /project/bii_dsc_community/djy8hg/cylon_rp_venv/bin/activate + +export RADICAL_LOG_LVL="DEBUG" +export RADICAL_PROFILE="TRUE" +export RADICAL_PILOT_DBURL="mongodb://rct-tutorial:HXH7vExF7GvCeMWn@95.217.193.116:27017/rct-tutorial" +export LD_LIBRARY_PATH=$HOME/rc_arup/cylon/build/arrow/install/lib64:$HOME/rc_arup/cylon/build/glog/install/lib64:$HOME/rc_arup/cylon/build/lib64:$HOME/rc_arup/cylon/build/lib:$LD_LIBRARY_PATH + +python groupby.py diff --git a/target/uva-cs/scripts/rp-experiment-setup.py b/target/uva-cs/scripts/rp-experiment-setup.py new file mode 100644 index 000000000..8d1901a72 --- /dev/null +++ b/target/uva-cs/scripts/rp-experiment-setup.py @@ -0,0 +1,112 @@ +import os +import sys +from textwrap import dedent +from cloudmesh.common.util import writefile +from cloudmesh.common.util import readfile +from cloudmesh.common.util import banner +from cloudmesh.common.console import Console + +counter = 0 + +debug = True +debug = False + +partition="bii-gpu" + +partition="parallel" + + +# (nodes, threads, rows, partition, "exclusive") +combination = [\ + # (2,4, 5000, "parallel", "exclusive"), # always pending + #(2,37, 1000000, "parallel", ""), + (4,37, 35000000, "parallel", ""), + (6,37, 35000000, "parallel", ""), + (8,37, 35000000, "parallel", ""), + (10,37, 35000000, "parallel", ""), + (12,37, 35000000, "parallel", ""), + (14,37, 35000000, "parallel", 
""), +] + +''' +combination = [] +for nodes in range(0,50): + for threads in range(0,37): + combination.append((nodes+1, threads+1, "parallel", "")) +''' + +total = len(combination) +jobid="-%j" +# jobid="" + +f = open("submit.log", "w") +for nodes, threads, rows, partition, exclusive in combination: + counter = counter + 1 + + if exclusive == "exclusive": + exclusive = "#SBATCH --exclusive" + e = "e1" + else: + exclusive = "" + e = "e0" + + cores_per_node = nodes * threads - 2 + + print (cores_per_node) + + config = readfile("raptor.in.cfg") + + config = config.replace("CORES_PER_NODE", str(cores_per_node)) + config = config.replace("NO_OF_ROWS", str(rows)) + + + print (config) + + cfg_filename = f"raptor-{nodes}-{threads}.cfg" + + writefile(cfg_filename, config) + + banner(f"SLURM {nodes} {threads} {counter}/{total}") + script=dedent(f""" + #!/bin/bash + #SBATCH --job-name=h-n={nodes:02d}-t={threads:02d}-e={e} + #SBATCH --nodes={nodes} + #SBATCH --ntasks-per-node={threads} + #SBATCH --time=15:00 + #SBATCH --output=out-{nodes:02d}-{threads:02d}{jobid}.log + #SBATCH --error=out-{nodes:02d}-{threads:02d}{jobid}.err + #SBATCH --partition=parallel + #SBATCH -A bii_dsc_community + {exclusive} + echo "..............................................................." + ./load.sh + echo "..............................................................." + source /project/bii_dsc_community/djy8hg/cylon_rp_venv/bin/activate + echo "..............................................................." + export RADICAL_LOG_LVL="DEBUG" + export RADICAL_PROFILE="TRUE" + export RADICAL_PILOT_DBURL="mongodb://rct-tutorial:HXH7vExF7GvCeMWn@95.217.193.116:27017/rct-tutorial" + echo "..............................................................." + lscpu + echo "..............................................................." + time python rp_scaling.py {cfg_filename} + echo "..............................................................." + """).strip() + + print (script) + filename = f"script-{nodes:02d}-{threads:02d}.slurm" + writefile(filename, script) + + + if not debug: + + r = os.system(f"sbatch {filename}") + total = nodes * threads + if r == 0: + msg = f"{counter} submitted: nodes={nodes:02d} threads={threads:02d} total={total}" + Console.ok(msg) + else: + msg = f"{counter} failed: nodes={nodes:02d} threads={threads:02d} total={total}" + Console.error(msg) + f.writelines([msg, "\n"]) +f.close() diff --git a/target/uva-cs/scripts/rp_scaling.py b/target/uva-cs/scripts/rp_scaling.py new file mode 100644 index 000000000..b4cc4e5bc --- /dev/null +++ b/target/uva-cs/scripts/rp_scaling.py @@ -0,0 +1,484 @@ +#!/usr/bin/env python3 + +''' +Demonstrate the "raptor" features for remote Task management. + +This script and its supporting files may use relative file paths. Run from the +directory in which you found it. + +Refer to the ``raptor.cfg`` file in the same directory for configurable run time +details. + +By default, this example uses the ``local.localhost`` resource with the +``local`` access scheme where RP oversubscribes resources to emulate multiple +nodes. + +In this example, we + - Launch one or more raptor "master" task(s), which self-submits additional + tasks (results are logged in the master's `result_cb` callback). + - Stage scripts to be used by a raptor "Worker" + - Provision a Python virtual environment with + :py:func:`~radical.pilot.prepare_env` + - Submit several tasks that will be routed through the master(s) to the + worker(s). 
+ - Submit a non-raptor task in the same Pilot environment + +''' + +import os +import sys +from textwrap import dedent +from cloudmesh.common.util import writefile +from cloudmesh.common.util import readfile +from cloudmesh.common.util import banner +from cloudmesh.common.console import Console + +import radical.utils as ru +import radical.pilot as rp + + +# To enable logging, some environment variables need to be set. +# Ref +# * https://radicalpilot.readthedocs.io/en/stable/overview.html#what-about-logging +# * https://radicalpilot.readthedocs.io/en/stable/developer.html#debugging +# For terminal output, set RADICAL_LOG_TGT=stderr or RADICAL_LOG_TGT=stdout +logger = ru.Logger('raptor') +PWD = os.path.abspath(os.path.dirname(__file__)) +RANKS = 72 + + +# ------------------------------------------------------------------------------ +# +@rp.pythontask +def cylon_join(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':5000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + df3 = df1.merge(df2, on=[0], algorithm='sort', env=env) + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + + +@rp.pythontask +def cylon_slice(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':5000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + #data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + #df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + df3 = df1[max_val * 0.2 :max_val * u, env] # distributed slice + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + +@rp.pythontask +def cylon_groupby(comm=None, 
data={'unique': 0.9, 'scaling': 'w', 'rows':5000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + + df3 = df1.groupby(by=0).agg({ + "1": "sum", + "2": "min" + }) + + df4 = df1.groupby(by=0).min() + + df5 = df1.groupby(by=[0, 1]).max() + + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + +@rp.pythontask +def cylon_sort(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':5000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + #data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + #df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + #df3 = df1.merge(df2, on=[0], algorithm='sort', env=env) + #print("Distributed Sort with sort options", env.rank) + bins = env.world_size * 2 + #df3 = df1.sort_values(by=[0], num_bins=bins, num_samples=bins, env=env) + df3 = df1.sort_values(by=[0], env=env) + #print(df3) + + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + + +@rp.pythontask +def cylon_concat(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':5000, 'it': 10}): + + import time + import argparse + + import pandas as pd + import pycylon as cn + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, 
int(max_val * u), size=(num_rows, 2)) + data3 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + df3 = DataFrame(pd.DataFrame(data3).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + df4 = cn.concat(axis=0, objs=[df1, df2, df3], env=env) + #print(df4) + + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df4)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + + + +# ------------------------------------------------------------------------------ +# +def task_state_cb(task, state): + logger.info('task %s: %s', task.uid, state) + if state == rp.FAILED: + logger.error('task %s failed', task.uid) + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + if len(sys.argv) < 2: + cfg_file = '%s/raptor.cfg' % PWD + + else: + cfg_file = sys.argv[1] + + cfg = ru.Config(cfg=ru.read_json(cfg_file)) + + cores_per_node = cfg.cores_per_node + gpus_per_node = cfg.gpus_per_node + n_masters = cfg.n_masters + n_workers = cfg.n_workers + masters_per_node = cfg.masters_per_node + nodes_per_worker = cfg.nodes_per_worker + n_rows = cfg.no_of_rows + + # we use a reporter class for nicer output + report = ru.Reporter(name='radical.pilot') + report.title('Raptor example (RP version %s)' % rp.version) + + session = rp.Session() + try: + pd = rp.PilotDescription(cfg.pilot_descr) + + pd.cores = cores_per_node + 2 + pd.gpus = 0 + pd.runtime = 60 + + pmgr = rp.PilotManager(session=session) + tmgr = rp.TaskManager(session=session) + tmgr.register_callback(task_state_cb) + + pilot = pmgr.submit_pilots(pd) + tmgr.add_pilots(pilot) + + pmgr.wait_pilots(uids=pilot.uid, state=[rp.PMGR_ACTIVE]) + + report.info('Stage files for the worker `my_hello` command.\n') + # See raptor_worker.py. + pilot.stage_in({'source': ru.which('radical-pilot-hello.sh'), + 'target': 'radical-pilot-hello.sh', + 'action': rp.TRANSFER}) + + # Issue an RPC to provision a Python virtual environment for the later + # raptor tasks. Note that we are telling prepare_env to install + # radical.pilot and radical.utils from sdist archives on the local + # filesystem. This only works for the default resource, local.localhost. + report.info('Call pilot.prepare_env()... ') + pilot.prepare_env(env_name='cylon_rp_venv', + env_spec={'type' : 'venv', + 'path' : '/project/bii_dsc_community/djy8hg/cylon_rp_venv', + 'setup': []}) + report.info('done\n') + + # Launch a raptor master task, which will launch workers and self-submit + # some additional tasks for illustration purposes. 
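+        # With n_masters = 1 (see raptor.in.cfg) a single id is generated; it
+        # should match the hard-coded 'master.000000' raptor_id used in
+        # raptor_master.py, and the Cylon task submitted below is routed to it
+        # through its 'raptor_id' field.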
+
+        master_ids = [ru.generate_id('master.%(item_counter)06d',
+                                     ru.ID_CUSTOM, ns=session.uid)
+                      for _ in range(n_masters)]
+
+        tds = list()
+        #f = open("output-{cores_per_node}-{n_rows}.log", "w")
+        for i in range(n_masters):
+            td = rp.TaskDescription(cfg.master_descr)
+            td.mode = rp.RAPTOR_MASTER
+            td.uid = master_ids[i]
+            td.arguments = [cfg_file, i]
+            td.cpu_processes = 1
+            td.cpu_threads = 1
+            td.named_env = 'rp'
+            td.input_staging = [{'source': '%s/raptor_master.py' % PWD,
+                                 'target': 'raptor_master.py',
+                                 'action': rp.TRANSFER,
+                                 'flags' : rp.DEFAULT_FLAGS},
+                                {'source': '%s/raptor_worker.py' % PWD,
+                                 'target': 'raptor_worker.py',
+                                 'action': rp.TRANSFER,
+                                 'flags' : rp.DEFAULT_FLAGS},
+                                {'source': cfg_file,
+                                 'target': os.path.basename(cfg_file),
+                                 'action': rp.TRANSFER,
+                                 'flags' : rp.DEFAULT_FLAGS}
+                                ]
+            tds.append(td)
+
+        if len(tds) > 0:
+            report.info('Submit raptor master(s) %s\n'
+                        % str([t.uid for t in tds]))
+            task = tmgr.submit_tasks(tds)
+            if not isinstance(task, list):
+                task = [task]
+
+            states = tmgr.wait_tasks(
+                uids=[t.uid for t in task],
+                state=rp.FINAL + [rp.AGENT_EXECUTING],
+                timeout=60
+            )
+            logger.info('Master states: %s', str(states))
+
+        tds = list()
+        for i in range(1):
+
+            #bson = cylon_join(comm=None, data={'unique': 0.9, 'scaling': 's', 'rows':n_rows, 'it': 10}) # For join test
+            #bson = cylon_groupby(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':n_rows, 'it': 10})
+            #bson = cylon_sort(comm=None, data={'unique': 0.9, 'scaling': 's', 'rows':n_rows, 'it': 10})
+            #bson = cylon_slice(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':n_rows, 'it': 10})
+            bson = cylon_concat(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':n_rows, 'it': 10})
+
+            tds.append(rp.TaskDescription({
+                'uid'       : 'task.cylon.w.%06d' % i,
+                'mode'      : rp.TASK_FUNC,
+                'ranks'     : cores_per_node,
+                'function'  : bson,
+                'raptor_id' : master_ids[i % n_masters]}))
+
+
+        if len(tds) > 0:
+            report.info('Submit tasks %s.\n' % str([t.uid for t in tds]))
+            tasks = tmgr.submit_tasks(tds)
+
+            logger.info('Wait for tasks %s', [t.uid for t in tds])
+            tmgr.wait_tasks(uids=[t.uid for t in tasks])
+
+            for task in tasks:
+                report.info('id: %s [%s]:\n out: %s\n ret: %s\n'
+                            % (task.uid, task.state, task.stdout, task.return_value))
+
+    finally:
+        session.close(download=True)
+
+    report.info('Logs from the master task should now be in local files \n')
+    report.info('like %s/%s/%s.log\n' % (session.uid, pilot.uid, master_ids[0]))
+
+# ------------------------------------------------------------------------------
\ No newline at end of file
diff --git a/target/uva-cs/scripts/scaling_job.sh b/target/uva-cs/scripts/scaling_job.sh
new file mode 100755
index 000000000..ebd627807
--- /dev/null
+++ b/target/uva-cs/scripts/scaling_job.sh
@@ -0,0 +1,27 @@
+#!/bin/bash --login
+
+#SBATCH --nodes=4
+#SBATCH --ntasks-per-node=20
+#SBATCH --time=0:60:00
+#SBATCH --job-name="Cylon Scaling"
+#
+#SBATCH --time=0:60:00
+#SBATCH --partition=main
+#SBATCH --error="%j-stderr.txt"
+#SBATCH --output="%j-stdout.txt"
+#
+
+
+DIR=$HOME/anaconda3/envs/cylon_dev
+
+module load gcc-11.2.0 openmpi-4.1.4
+conda activate cylon_dev
+
+
+export PATH=$DIR/bin:$PATH LD_LIBRARY_PATH=$DIR/lib:$LD_LIBRARY_PATH PYTHONPATH=$DIR/lib/python3.9/site-packages
+
+
+which python gcc g++
+
+mpirun -np 20 python cylon_scaling.py -n 35000000
+
diff --git a/target/uva-cs/scripts/scaling_job.slurm b/target/uva-cs/scripts/scaling_job.slurm
new file mode 100644
index 000000000..22ec1fb80
--- /dev/null
+++ b/target/uva-cs/scripts/scaling_job.slurm
@@ -0,0 +1,30 @@
+#!/bin/bash
+#SBATCH --nodes=4
+#SBATCH --ntasks-per-node=40
+#SBATCH --exclusive
+#SBATCH --time=1:00:00
+#SBATCH --partition=bii
+#SBATCH -A bii_dsc_community
+#SBATCH --output=rivanna/scripts/cylogs/mpirun-96t-4n-160w-35m-%x-%j.out
+#SBATCH --error=rivanna/scripts/cylogs/mpirun-96t-4n-160w-35m-%x-%j.err
+
+
+module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7
+
+#module load gcc/11.2.0
+#module load openmpi/4.1.4
+#module load python/3.11.1
+
+#source $HOME/CYLON/bin/activate
+source $HOME/cylon_rp_venv/bin/activate
+
+BUILD_PATH=$PWD/build
+
+export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH
+
+
+which python gcc g++
+
+
+#srun -n 160 python $PWD/rivanna/scripts/cylon_scaling.py -n 35000000
+mpirun -np 160 python rivanna/scripts/cylon_scaling.py -n 35000000
diff --git a/target/uva-cs/scripts/submit.log b/target/uva-cs/scripts/submit.log
new file mode 100644
index 000000000..d53e601ab
--- /dev/null
+++ b/target/uva-cs/scripts/submit.log
@@ -0,0 +1 @@
+1 submitted: nodes=20 threads=37 total=740

From 4ed907ccdd75549662bfc38db17a155865c8edcf Mon Sep 17 00:00:00 2001
From: arupcsedu
Date: Fri, 24 Nov 2023 22:21:51 -0500
Subject: [PATCH 2/5] Update README.md

---
 target/uva-cs/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target/uva-cs/README.md b/target/uva-cs/README.md
index 46eba1d66..b4814b4d2 100644
--- a/target/uva-cs/README.md
+++ b/target/uva-cs/README.md
@@ -38,7 +38,7 @@ pip install cloudmesh-common
 pip install openssl-python
 ```
 
-We will slum script to run the scaling operation.
+We will use a slum script to run the scaling operation.
 
 ```shell
 cd target/uva-cs/scripts/

From 87152562cb38912d8b7b2a3eb8ef3f61904448bc Mon Sep 17 00:00:00 2001
From: arupcsedu
Date: Fri, 24 Nov 2023 22:22:28 -0500
Subject: [PATCH 3/5] Update README.md

---
 target/rivanna/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target/rivanna/README.md b/target/rivanna/README.md
index 6e4eee9d1..a337f0ae8 100644
--- a/target/rivanna/README.md
+++ b/target/rivanna/README.md
@@ -49,7 +49,7 @@ pip install openssl-python
 python3 -m pip install urllib3==1.26.6
 ```
 
-We will slum script to run the scaling operation.
+We will use a slurm script to run the scaling operation.
 
 ```shell
 sbatch rivanna/scripts/scaling_job.slurm

From f08d75bde417433a152b93ac31510cdc479af5cf Mon Sep 17 00:00:00 2001
From: arupcsedu
Date: Fri, 24 Nov 2023 22:22:49 -0500
Subject: [PATCH 4/5] Update README.md

---
 target/uva-cs/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target/uva-cs/README.md b/target/uva-cs/README.md
index b4814b4d2..e6dfb3f97 100644
--- a/target/uva-cs/README.md
+++ b/target/uva-cs/README.md
@@ -38,7 +38,7 @@ pip install cloudmesh-common
 pip install openssl-python
 ```
 
-We will use a slum script to run the scaling operation.
+We will use a slurm script to run the scaling operation.
 
 ```shell
 cd target/uva-cs/scripts/

From c47c65765d2bae001697d1963343d91a6cbde53d Mon Sep 17 00:00:00 2001
From: arupcsedu
Date: Fri, 24 Nov 2023 22:23:14 -0500
Subject: [PATCH 5/5] Update README.md

---
 rivanna/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rivanna/README.md b/rivanna/README.md
index 6e4eee9d1..a337f0ae8 100644
--- a/rivanna/README.md
+++ b/rivanna/README.md
@@ -49,7 +49,7 @@ pip install openssl-python
 python3 -m pip install urllib3==1.26.6
 ```
 
-We will slum script to run the scaling operation.
+We will use a slurm script to run the scaling operation.
 
 ```shell
 sbatch rivanna/scripts/scaling_job.slurm