Skip to content

Commit

Permalink
Respect .gitignore in code rsync
Browse files Browse the repository at this point in the history
  • Loading branch information
frthjf committed Nov 25, 2024
1 parent 4f3bfe6 commit 7113849
Showing 1 changed file with 87 additions and 78 deletions.
165 changes: 87 additions & 78 deletions docs/examples/slurm-execution/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,86 +8,11 @@
import arrow
import yaml
from machinable import Execution, Project
from machinable.config import to_dict
from machinable.errors import ExecutionFailed
from machinable.utils import chmodx, run_and_stream
from pydantic import BaseModel, ConfigDict


def yes_or_no() -> bool:
choice = input().lower()
return {"": True, "yes": True, "y": True, "no": False, "n": False}[choice]


def confirm(execution: Execution) -> bool:
sys.stdout.write(
"\n".join(execution.pending_executables.map(lambda x: x.module))
)
sys.stdout.write(
f"\nSubmitting {len(execution.pending_executables)} jobs ({len(execution.executables)} total). Proceed? [Y/n]: "
)
if yes_or_no():
sys.stdout.write("yes\n")
return True
else:
sys.stdout.write("no\n")
return False


class Job:
def __init__(self, job_id: str):
self.job_id = job_id
self.details = self._fetch_details()

def _fetch_details(self) -> dict:
cmd = ["scontrol", "show", "job", str(self.job_id)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
return None

details = {}
raw_info = result.stdout.split()
for item in raw_info:
if "=" in item:
key, value = item.split("=", 1)
details[key] = value
return details

@classmethod
def find_by_name(cls, job_name: str) -> Optional["Job"]:
cmd = ["squeue", "--name", job_name, "--noheader", "--format=%i"]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0 and result.stdout.strip():
return cls(result.stdout.strip())

return None

@property
def status(
self,
) -> Literal[
"",
"PENDING",
"RUNNING",
"SUSPENDED",
"CANCELLED",
"COMPLETED",
"FAILED",
"TIMEOUT",
"PREEMPTED",
]:
return self.details.get("JobState", "")

@property
def info(self) -> dict:
return self.details

def cancel(self) -> bool:
cmd = ["scancel", str(self.job_id)]
result = subprocess.run(cmd, capture_output=True)
return result.returncode == 0


class Slurm(Execution):
class Config(BaseModel):
model_config = ConfigDict(extra="forbid")
Expand Down Expand Up @@ -153,12 +78,22 @@ def __call__(self):
raise ExecutionFailed(err)

source_code = Project.get().path()
if self.config.copy_project_source and not self.config.dry:
if self.config.copy_project_source:
print("Copy project source code ...")
source_code = self.local_directory(executable.id, "source_code")
cmd = ["rsync", "-rLptgoD", Project.get().path(""), source_code]
cmd = [
"rsync",
"-rLptgoD",
"--exclude '.git'",
"--filter='dir-merge,- .gitignore'",
Project.get().path(""),
source_code,
]
print(" ".join(cmd))
run_and_stream(cmd, check=True)
if not self.config.dry:
run_and_stream(cmd, check=True)
else:
print("Dry run, skipping rsync ...")

script = "#!/usr/bin/env bash\n"

Expand Down Expand Up @@ -360,3 +295,77 @@ def canonicalize_resources(self, resources):
canonicalized[prefix + "--" + k] = str(v)

return canonicalized


def yes_or_no() -> bool:
choice = input().lower()
return {"": True, "yes": True, "y": True, "no": False, "n": False}[choice]


def confirm(execution: Execution) -> bool:
sys.stdout.write(
"\n".join(execution.pending_executables.map(lambda x: x.module))
)
sys.stdout.write(
f"\nSubmitting {len(execution.pending_executables)} jobs ({len(execution.executables)} total). Proceed? [Y/n]: "
)
if yes_or_no():
sys.stdout.write("yes\n")
return True
else:
sys.stdout.write("no\n")
return False


class Job:
def __init__(self, job_id: str):
self.job_id = job_id
self.details = self._fetch_details()

def _fetch_details(self) -> dict:
cmd = ["scontrol", "show", "job", str(self.job_id)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
return None

details = {}
raw_info = result.stdout.split()
for item in raw_info:
if "=" in item:
key, value = item.split("=", 1)
details[key] = value
return details

@classmethod
def find_by_name(cls, job_name: str) -> Optional["Job"]:
cmd = ["squeue", "--name", job_name, "--noheader", "--format=%i"]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0 and result.stdout.strip():
return cls(result.stdout.strip())

return None

@property
def status(
self,
) -> Literal[
"",
"PENDING",
"RUNNING",
"SUSPENDED",
"CANCELLED",
"COMPLETED",
"FAILED",
"TIMEOUT",
"PREEMPTED",
]:
return self.details.get("JobState", "")

@property
def info(self) -> dict:
return self.details

def cancel(self) -> bool:
cmd = ["scancel", str(self.job_id)]
result = subprocess.run(cmd, capture_output=True)
return result.returncode == 0

0 comments on commit 7113849

Please sign in to comment.