Skip to content

Commit

Permalink
more cli stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
BerndSchuller committed Jul 3, 2024
1 parent 5406215 commit 70a29e4
Show file tree
Hide file tree
Showing 10 changed files with 375 additions and 76 deletions.
4 changes: 2 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ Issue tracker: https://github.com/HumanBrainProject/pyunicore
Version 1.1.0 (mmm dd, 2024)
----------------------------
- API CHANGE: new Storage.put_file() method accepting
str-like or file-like data to upload to a remote destination
str-like or file-like data to upload to a remote destination
- new feature: new pyfilesystem implementation "uftpmount" which mounts
the remote directory and then accesses it via the local FS (OSFS)
the remote directory and then accesses it via the local FS (OSFS)

Version 1.0.1 (Mar 22, 2024)
----------------------------
Expand Down
18 changes: 15 additions & 3 deletions pyunicore/cli/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,23 @@ def add_base_args(self) -> argparse._ArgumentGroup:
def add_command_args(self):
pass

def run(self, args):
def setup(self, args):
self.args = self.parser.parse_args(args)
self.is_verbose = self.args.verbose
self.config_file = self.args.configuration
self.load_user_properties()
self.create_credential()
self.registry = self.create_registry()

def get_description(self):
return "N/A"

def get_synopsis(self):
return "N/A"

def get_group(self):
return "Other"

def create_credential(self):
auth_method = self.config.get("authentication-method", "USERNAME").upper()
if "USERNAME" == auth_method:
Expand Down Expand Up @@ -149,12 +155,18 @@ def get_synopsis(self):
return """Gets a JWT authentication token from a UNICORE token endpoint.
Lifetime and other properties can be configured."""

def get_description(self):
return "issue an authentication token"

def get_group(self):
return "Utilities"

def run(self, args):
super().run(args)
super().setup(args)
site_name = self.args.sitename
if site_name:
if self.registry:
endpoint = self.registry.site(site_name).resource_url
endpoint = self.registry.site_urls[site_name]
else:
raise ValueError(
"Sitename resolution requires registry - please check your configuration!"
Expand Down
209 changes: 177 additions & 32 deletions pyunicore/cli/exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
from pyunicore.client import Job


class Exec(Base):
class JobExecutionBase(Base):

def add_command_args(self):
self.parser.prog = "unicore exec"
self.parser.description = self.get_synopsis()
self.parser.add_argument("commands", help="Command(s) to run", nargs="*")
self.parser.add_argument("-s", "--sitename", required=False, type=str, help="Site name")
self.parser.add_argument(
"-S", "--server-url", required=False, type=str, help="Server URL to submit to"
Expand All @@ -30,22 +28,15 @@ def add_command_args(self):
action="store_true",
help="Dry run, do not submit the job",
)
self.parser.add_argument(
"-L",
"--login-node",
required=False,
type=str,
help="Login node to use",
)
self.parser.add_argument(
"-T",
"--tags",
required=False,
help="Tag the job with the given tag(s) (comma-separated)",
)
self.parser.add_argument(
"-Q", "--keep", required=False, action="store_true", help="Don't remove finished job"
)

def get_group(self):
return "Job execution"

def get_site_client(self):
if self.args.sitename:
Expand All @@ -64,19 +55,6 @@ def get_site_client(self):
site_client = Client(self.credential, site_url=self.args.server_url)
return site_client

def build_job(self):
job_definition = {"Job type": "ON_LOGIN_NODE"}
if self.args.login_node:
job_definition["Login node"] = self.args.login_node
if len(self.args.commands) > 0:
job_definition["Executable"] = self.args.commands[0]
if len(self.args.commands) > 1:
job_definition["Arguments"] = self.args.commands[1:]
if self.args.tags:
job_definition["Tags"] = self.args.tags.split(",")
self.verbose(json.dumps(job_definition, indent=2))
return job_definition

def run_job(self, job_definition, submission_endpoint):
job = submission_endpoint.new_job(job_definition)
self.verbose("Submitted job: %s" % job.resource_url)
Expand All @@ -85,7 +63,7 @@ def run_job(self, job_definition, submission_endpoint):
job.poll()
return job

def get_output(self, job: Job):
def fetch_output(self, job: Job):
try:
self.verbose(f"{job.status}, exit code {job.properties['exitCode']}")
except KeyError:
Expand All @@ -99,15 +77,49 @@ def get_output(self, job: Job):
print(str(f.read(), "UTF-8"))
print("*** End of error output.")


class Exec(JobExecutionBase):
def add_command_args(self):
super().add_command_args()
self.parser.prog = "unicore exec"
self.parser.description = self.get_synopsis()
self.parser.add_argument("commands", help="Command(s) to run", nargs="*")
self.parser.add_argument(
"-L",
"--login-node",
required=False,
type=str,
help="Login node to use",
)
self.parser.add_argument(
"-Q", "--keep", required=False, action="store_true", help="Don't remove finished job"
)

def get_synopsis(self):
return """Runs a command through UNICORE. The command will not be run through a
remote queue, but on the cluster login node. The command and
its arguments are taken from the command line. The client will wait
for the job to finish and print standard output and error to
the console."""

def get_description(self):
return "run a command through UNICORE"

def build_job(self) -> dict:
job_definition = {"Job type": "ON_LOGIN_NODE"}
if self.args.login_node:
job_definition["Login node"] = self.args.login_node
if len(self.args.commands) > 0:
job_definition["Executable"] = self.args.commands[0]
if len(self.args.commands) > 1:
job_definition["Arguments"] = self.args.commands[1:]
if self.args.tags:
job_definition["Tags"] = self.args.tags.split(",")
self.verbose(json.dumps(job_definition, indent=2))
return job_definition

def run(self, args):
super().run(args)
super().setup(args)
site_client = self.get_site_client()
self.verbose("Submission endpoint: %s" % site_client.resource_url)
jd = self.build_job()
Expand All @@ -116,6 +128,139 @@ def run(self, args):
return
job = self.run_job(jd, site_client)
if not self.args.asynchronous:
self.get_output(job)
if not self.args.keep:
job.delete()
self.fetch_output(job)
if not self.args.keep:
job.delete()


class Run(JobExecutionBase):

def add_command_args(self):
super().add_command_args()
self.parser.prog = "unicore run"
self.parser.description = self.get_synopsis()
self.parser.add_argument("jobs", help="Job file(s) to run", nargs="*")

def get_synopsis(self):
return """Runs job(s) through UNICORE. The job definition(s) are read from <jobs> or
stdin. A job can be executed in two modes. In the default synchronous mode, UCC
will wait for the job to finish. In asynchonous mode, initiated
by the 'a' option, the job will be submitted and started."""

def get_description(self):
return "runs job(s) through UNICORE"

def build_job(self, jobfile=None) -> dict:
with open(jobfile) as f:
job_definition = json.load(f)
if self.args.tags:
job_definition["Tags"] = self.args.tags.split(",")
self.verbose(json.dumps(job_definition, indent=2))
return job_definition

def run(self, args):
super().setup(args)
site_client = self.get_site_client()
self.verbose("Submission endpoint: %s" % site_client.resource_url)

if len(self.args.jobs) > 0:
for jobfile in self.args.jobs:
self.verbose("Reading job from <%s>" % jobfile)
jd = self.build_job(jobfile)
if self.args.dry_run:
self.verbose("Dry run, not submitting anything.")
continue
else:
job = self.run_job(jd, site_client)
if not self.args.asynchronous:
self.fetch_output(job)


class ListJobs(Base):
def add_command_args(self):
self.parser.prog = "unicore list-jobs"
self.parser.description = self.get_synopsis()
self.parser.add_argument("-s", "--sitename", required=False, type=str, help="Site name")
self.parser.add_argument(
"-a",
"--asynchronous",
required=False,
action="store_true",
help="Just submit, don't wait for finish",
)
self.parser.add_argument(
"-l",
"--long",
required=False,
action="store_true",
help="Detailed output",
)
self.parser.add_argument(
"-T",
"--tags",
required=False,
help="Tag the job with the given tag(s) (comma-separated)",
)

def get_synopsis(self):
return """Lists your jobs per site. The list can be limited to a single
site specified using the '-s' option."""

def get_description(self):
return "list your jobs"

def get_group(self):
return "Job execution"

__f = " {:>24s} | {:>10s} | {:s} "

def details(self, job: Job):
print(self.__f.format(job.properties["submissionTime"], job.status, job.resource_url))

def print_header(self):
print(self.__f.format("Submitted", "Status", "URL"))
print(" -------------------------|------------|----------------")

def run(self, args):
super().setup(args)
tags = self.args.tags.split(",") if self.args.tags is not None else []
if not self.registry:
raise ValueError("Registry required - please check your configuration!")
if self.args.long:
self.print_header()
for endpoint in self.registry.site_urls.values():
site_client = Client(self.credential, site_url=endpoint)
for job in site_client.get_jobs(tags=tags):
if self.args.long:
self.details(job)
else:
print(job.resource_url)


class CancelJob(Base):
def add_command_args(self):
self.parser.prog = "unicore cancel-job"
self.parser.description = self.get_synopsis()
self.parser.add_argument("job_url", help="Job URL(s)", nargs="*")

def get_synopsis(self):
return """Cancels UNICORE job(s). The job(s) are referenced either by URLs."""

def get_description(self):
return "cancel job(s)"

def get_group(self):
return "Job execution"

def run(self, args):
super().setup(args)
for endpoint in self.args.job_url:
self.verbose("Cancelling: %s" % endpoint)
Job(self.credential, job_url=endpoint).abort()


class JobWrapper:

def __init__(self, job: dict):
self.job = job
self.local_imports = []
65 changes: 65 additions & 0 deletions pyunicore/cli/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
""" Storage-related commands """

import re

from pyunicore.cli.base import Base
from pyunicore.client import PathFile
from pyunicore.client import Storage


class LS(Base):
def add_command_args(self):
self.parser.prog = "unicore ls"
self.parser.description = self.get_synopsis()
self.parser.add_argument("remote_dirs", help="Remote directories to list", nargs="*")
self.parser.add_argument(
"-l",
"--long",
required=False,
action="store_true",
help="detailed listing",
)

def get_synopsis(self):
return """List directories on UNICORE storage(s)."""

def get_description(self):
return "list directories"

def get_group(self):
return "Data management"

def split_storage_url(self, url: str):
base = re.match(r"(https://\S+/rest/core/storages/).*", url).group(1)
storage_id = re.match(r"https://\S+/rest/core/storages/(\S+).*", url).group(1)
tok = storage_id.split("/files")
storage_id = tok[0]
path = tok[1] if len(tok) > 1 else "/"
return base + storage_id, path

def _detailed(self, name, p):
d = "d" if p["isDirectory"] is True else "-"
print(f"{d}{p['permissions']} {p['size']} {p['lastAccessed']} {name}")

def print_single(self, p: PathFile):
if self.args.long is True:
self._detailed(p.name, p.properties)
else:
print(p.name)

def run(self, args):
super().setup(args)
for endpoint in self.args.remote_dirs:
storage_url, file_path = self.split_storage_url(endpoint)
self.verbose(f"Listing: {file_path} on {storage_url}")
storage = Storage(self.credential, storage_url=storage_url)
p = storage.stat(file_path)
if p.isdir():
ls = storage.contents(path=p.name)["content"]
for p in ls:
if self.args.long is True:
self._detailed(p, ls[p])
else:
print(p)
else:
self.print_single(p)
Loading

0 comments on commit 70a29e4

Please sign in to comment.