diff --git a/CHANGES.md b/CHANGES.md index 74e63ff..37d6a86 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,9 +6,9 @@ Issue tracker: https://github.com/HumanBrainProject/pyunicore Version 1.1.0 (mmm dd, 2024) ---------------------------- - API CHANGE: new Storage.put_file() method accepting - str-like or file-like data to upload to a remote destination + str-like or file-like data to upload to a remote destination - new feature: new pyfilesystem implementation "uftpmount" which mounts - the remote directory and then accesses it via the local FS (OSFS) + the remote directory and then accesses it via the local FS (OSFS) Version 1.0.1 (Mar 22, 2024) ---------------------------- diff --git a/pyunicore/cli/base.py b/pyunicore/cli/base.py index f49b385..0e26a29 100644 --- a/pyunicore/cli/base.py +++ b/pyunicore/cli/base.py @@ -64,7 +64,7 @@ def add_base_args(self) -> argparse._ArgumentGroup: def add_command_args(self): pass - def run(self, args): + def setup(self, args): self.args = self.parser.parse_args(args) self.is_verbose = self.args.verbose self.config_file = self.args.configuration @@ -72,9 +72,15 @@ def run(self, args): self.create_credential() self.registry = self.create_registry() + def get_description(self): + return "N/A" + def get_synopsis(self): return "N/A" + def get_group(self): + return "Other" + def create_credential(self): auth_method = self.config.get("authentication-method", "USERNAME").upper() if "USERNAME" == auth_method: @@ -149,12 +155,18 @@ def get_synopsis(self): return """Gets a JWT authentication token from a UNICORE token endpoint. Lifetime and other properties can be configured.""" + def get_description(self): + return "issue an authentication token" + + def get_group(self): + return "Utilities" + def run(self, args): - super().run(args) + super().setup(args) site_name = self.args.sitename if site_name: if self.registry: - endpoint = self.registry.site(site_name).resource_url + endpoint = self.registry.site_urls[site_name] else: raise ValueError( "Sitename resolution requires registry - please check your configuration!" diff --git a/pyunicore/cli/exec.py b/pyunicore/cli/exec.py index 4ee143a..d470bb8 100644 --- a/pyunicore/cli/exec.py +++ b/pyunicore/cli/exec.py @@ -7,11 +7,9 @@ from pyunicore.client import Job -class Exec(Base): +class JobExecutionBase(Base): + def add_command_args(self): - self.parser.prog = "unicore exec" - self.parser.description = self.get_synopsis() - self.parser.add_argument("commands", help="Command(s) to run", nargs="*") self.parser.add_argument("-s", "--sitename", required=False, type=str, help="Site name") self.parser.add_argument( "-S", "--server-url", required=False, type=str, help="Server URL to submit to" @@ -30,22 +28,15 @@ def add_command_args(self): action="store_true", help="Dry run, do not submit the job", ) - self.parser.add_argument( - "-L", - "--login-node", - required=False, - type=str, - help="Login node to use", - ) self.parser.add_argument( "-T", "--tags", required=False, help="Tag the job with the given tag(s) (comma-separated)", ) - self.parser.add_argument( - "-Q", "--keep", required=False, action="store_true", help="Don't remove finished job" - ) + + def get_group(self): + return "Job execution" def get_site_client(self): if self.args.sitename: @@ -64,19 +55,6 @@ def get_site_client(self): site_client = Client(self.credential, site_url=self.args.server_url) return site_client - def build_job(self): - job_definition = {"Job type": "ON_LOGIN_NODE"} - if self.args.login_node: - job_definition["Login node"] = self.args.login_node - if len(self.args.commands) > 0: - job_definition["Executable"] = self.args.commands[0] - if len(self.args.commands) > 1: - job_definition["Arguments"] = self.args.commands[1:] - if self.args.tags: - job_definition["Tags"] = self.args.tags.split(",") - self.verbose(json.dumps(job_definition, indent=2)) - return job_definition - def run_job(self, job_definition, submission_endpoint): job = submission_endpoint.new_job(job_definition) self.verbose("Submitted job: %s" % job.resource_url) @@ -85,7 +63,7 @@ def run_job(self, job_definition, submission_endpoint): job.poll() return job - def get_output(self, job: Job): + def fetch_output(self, job: Job): try: self.verbose(f"{job.status}, exit code {job.properties['exitCode']}") except KeyError: @@ -99,6 +77,24 @@ def get_output(self, job: Job): print(str(f.read(), "UTF-8")) print("*** End of error output.") + +class Exec(JobExecutionBase): + def add_command_args(self): + super().add_command_args() + self.parser.prog = "unicore exec" + self.parser.description = self.get_synopsis() + self.parser.add_argument("commands", help="Command(s) to run", nargs="*") + self.parser.add_argument( + "-L", + "--login-node", + required=False, + type=str, + help="Login node to use", + ) + self.parser.add_argument( + "-Q", "--keep", required=False, action="store_true", help="Don't remove finished job" + ) + def get_synopsis(self): return """Runs a command through UNICORE. The command will not be run through a remote queue, but on the cluster login node. The command and @@ -106,8 +102,24 @@ def get_synopsis(self): for the job to finish and print standard output and error to the console.""" + def get_description(self): + return "run a command through UNICORE" + + def build_job(self) -> dict: + job_definition = {"Job type": "ON_LOGIN_NODE"} + if self.args.login_node: + job_definition["Login node"] = self.args.login_node + if len(self.args.commands) > 0: + job_definition["Executable"] = self.args.commands[0] + if len(self.args.commands) > 1: + job_definition["Arguments"] = self.args.commands[1:] + if self.args.tags: + job_definition["Tags"] = self.args.tags.split(",") + self.verbose(json.dumps(job_definition, indent=2)) + return job_definition + def run(self, args): - super().run(args) + super().setup(args) site_client = self.get_site_client() self.verbose("Submission endpoint: %s" % site_client.resource_url) jd = self.build_job() @@ -116,6 +128,139 @@ def run(self, args): return job = self.run_job(jd, site_client) if not self.args.asynchronous: - self.get_output(job) - if not self.args.keep: - job.delete() + self.fetch_output(job) + if not self.args.keep: + job.delete() + + +class Run(JobExecutionBase): + + def add_command_args(self): + super().add_command_args() + self.parser.prog = "unicore run" + self.parser.description = self.get_synopsis() + self.parser.add_argument("jobs", help="Job file(s) to run", nargs="*") + + def get_synopsis(self): + return """Runs job(s) through UNICORE. The job definition(s) are read from or + stdin. A job can be executed in two modes. In the default synchronous mode, UCC + will wait for the job to finish. In asynchonous mode, initiated + by the 'a' option, the job will be submitted and started.""" + + def get_description(self): + return "runs job(s) through UNICORE" + + def build_job(self, jobfile=None) -> dict: + with open(jobfile) as f: + job_definition = json.load(f) + if self.args.tags: + job_definition["Tags"] = self.args.tags.split(",") + self.verbose(json.dumps(job_definition, indent=2)) + return job_definition + + def run(self, args): + super().setup(args) + site_client = self.get_site_client() + self.verbose("Submission endpoint: %s" % site_client.resource_url) + + if len(self.args.jobs) > 0: + for jobfile in self.args.jobs: + self.verbose("Reading job from <%s>" % jobfile) + jd = self.build_job(jobfile) + if self.args.dry_run: + self.verbose("Dry run, not submitting anything.") + continue + else: + job = self.run_job(jd, site_client) + if not self.args.asynchronous: + self.fetch_output(job) + + +class ListJobs(Base): + def add_command_args(self): + self.parser.prog = "unicore list-jobs" + self.parser.description = self.get_synopsis() + self.parser.add_argument("-s", "--sitename", required=False, type=str, help="Site name") + self.parser.add_argument( + "-a", + "--asynchronous", + required=False, + action="store_true", + help="Just submit, don't wait for finish", + ) + self.parser.add_argument( + "-l", + "--long", + required=False, + action="store_true", + help="Detailed output", + ) + self.parser.add_argument( + "-T", + "--tags", + required=False, + help="Tag the job with the given tag(s) (comma-separated)", + ) + + def get_synopsis(self): + return """Lists your jobs per site. The list can be limited to a single + site specified using the '-s' option.""" + + def get_description(self): + return "list your jobs" + + def get_group(self): + return "Job execution" + + __f = " {:>24s} | {:>10s} | {:s} " + + def details(self, job: Job): + print(self.__f.format(job.properties["submissionTime"], job.status, job.resource_url)) + + def print_header(self): + print(self.__f.format("Submitted", "Status", "URL")) + print(" -------------------------|------------|----------------") + + def run(self, args): + super().setup(args) + tags = self.args.tags.split(",") if self.args.tags is not None else [] + if not self.registry: + raise ValueError("Registry required - please check your configuration!") + if self.args.long: + self.print_header() + for endpoint in self.registry.site_urls.values(): + site_client = Client(self.credential, site_url=endpoint) + for job in site_client.get_jobs(tags=tags): + if self.args.long: + self.details(job) + else: + print(job.resource_url) + + +class CancelJob(Base): + def add_command_args(self): + self.parser.prog = "unicore cancel-job" + self.parser.description = self.get_synopsis() + self.parser.add_argument("job_url", help="Job URL(s)", nargs="*") + + def get_synopsis(self): + return """Cancels UNICORE job(s). The job(s) are referenced either by URLs.""" + + def get_description(self): + return "cancel job(s)" + + def get_group(self): + return "Job execution" + + def run(self, args): + super().setup(args) + for endpoint in self.args.job_url: + self.verbose("Cancelling: %s" % endpoint) + Job(self.credential, job_url=endpoint).abort() + + +class JobWrapper: + + def __init__(self, job: dict): + self.job = job + self.local_imports = [] diff --git a/pyunicore/cli/io.py b/pyunicore/cli/io.py new file mode 100644 index 0000000..633ccd8 --- /dev/null +++ b/pyunicore/cli/io.py @@ -0,0 +1,65 @@ +""" Storage-related commands """ + +import re + +from pyunicore.cli.base import Base +from pyunicore.client import PathFile +from pyunicore.client import Storage + + +class LS(Base): + def add_command_args(self): + self.parser.prog = "unicore ls" + self.parser.description = self.get_synopsis() + self.parser.add_argument("remote_dirs", help="Remote directories to list", nargs="*") + self.parser.add_argument( + "-l", + "--long", + required=False, + action="store_true", + help="detailed listing", + ) + + def get_synopsis(self): + return """List directories on UNICORE storage(s).""" + + def get_description(self): + return "list directories" + + def get_group(self): + return "Data management" + + def split_storage_url(self, url: str): + base = re.match(r"(https://\S+/rest/core/storages/).*", url).group(1) + storage_id = re.match(r"https://\S+/rest/core/storages/(\S+).*", url).group(1) + tok = storage_id.split("/files") + storage_id = tok[0] + path = tok[1] if len(tok) > 1 else "/" + return base + storage_id, path + + def _detailed(self, name, p): + d = "d" if p["isDirectory"] is True else "-" + print(f"{d}{p['permissions']} {p['size']} {p['lastAccessed']} {name}") + + def print_single(self, p: PathFile): + if self.args.long is True: + self._detailed(p.name, p.properties) + else: + print(p.name) + + def run(self, args): + super().setup(args) + for endpoint in self.args.remote_dirs: + storage_url, file_path = self.split_storage_url(endpoint) + self.verbose(f"Listing: {file_path} on {storage_url}") + storage = Storage(self.credential, storage_url=storage_url) + p = storage.stat(file_path) + if p.isdir(): + ls = storage.contents(path=p.name)["content"] + for p in ls: + if self.args.long is True: + self._detailed(p, ls[p]) + else: + print(p) + else: + self.print_single(p) diff --git a/pyunicore/cli/main.py b/pyunicore/cli/main.py index 71e06cd..c5b410a 100644 --- a/pyunicore/cli/main.py +++ b/pyunicore/cli/main.py @@ -5,10 +5,15 @@ import pyunicore.cli.base import pyunicore.cli.exec +import pyunicore.cli.io _commands = { - "issue-token": pyunicore.cli.base.IssueToken, + "cancel-job": pyunicore.cli.exec.CancelJob, "exec": pyunicore.cli.exec.Exec, + "issue-token": pyunicore.cli.base.IssueToken, + "list-jobs": pyunicore.cli.exec.ListJobs, + "ls": pyunicore.cli.io.LS, + "run": pyunicore.cli.exec.Run, } @@ -33,7 +38,7 @@ def help(): ) print(s) for cmd in sorted(_commands): - print(f" {cmd:20} - {get_command(cmd).get_synopsis()}") + print(f" {cmd:20} - {get_command(cmd).get_description()}") print("Enter 'unicore -h' for help on a particular command.") diff --git a/pyunicore/client.py b/pyunicore/client.py index ebafdec..a790424 100755 --- a/pyunicore/client.py +++ b/pyunicore/client.py @@ -23,7 +23,9 @@ import requests -import pyunicore.credentials +from pyunicore.credentials import Anonymous +from pyunicore.credentials import AuthenticationFailedException +from pyunicore.credentials import Credential _DEFAULT_CACHE_TIME = 5 # in seconds @@ -76,7 +78,7 @@ class Transport: def __init__( self, - credential: pyunicore.credentials.Credential, + credential: Credential, verify=False, use_security_sessions=True, timeout=120, @@ -209,7 +211,9 @@ class Resource: properties and some common methods. """ - def __init__(self, security, resource_url, cache_time=_DEFAULT_CACHE_TIME): + def __init__( + self, security: Credential | Transport, resource_url: str, cache_time=_DEFAULT_CACHE_TIME + ): """ Create a new Resource. Args: @@ -219,7 +223,7 @@ def __init__(self, security, resource_url, cache_time=_DEFAULT_CACHE_TIME): when getting properties """ super().__init__() - if isinstance(security, pyunicore.credentials.Credential): + if isinstance(security, Credential): self.transport = Transport(security) elif isinstance(security, Transport): self.transport = security._clone() @@ -272,7 +276,7 @@ class Registry(Resource): Will collect the BASE URLs of all registered sites """ - def __init__(self, security, url, cache_time=_DEFAULT_CACHE_TIME): + def __init__(self, security: Credential | Transport, url: str, cache_time=_DEFAULT_CACHE_TIME): super().__init__(security, url, cache_time) self.refresh() @@ -321,13 +325,13 @@ class Client(Resource): def __init__( self, - security, - site_url, + security: Credential | Transport, + site_url: str, check_authentication=True, cache_time=_DEFAULT_CACHE_TIME, ): super().__init__(security, site_url, cache_time) - if isinstance(self.transport.credential, pyunicore.credentials.Anonymous): + if isinstance(self.transport.credential, Anonymous): check_authentication = False self.check_authentication = check_authentication if self.check_authentication: @@ -336,9 +340,7 @@ def __init__( def assert_authentication(self): '''Asserts that the remote role is not "anonymous"''' if self.access_info()["role"]["selected"] == "anonymous": - raise pyunicore.credentials.AuthenticationFailedException( - "Failure to authenticate at %s" % self.resource_url - ) + raise AuthenticationFailedException("Failure to authenticate at %s" % self.resource_url) def access_info(self): """get authentication and authentication information about the current user""" @@ -394,8 +396,13 @@ def get_jobs(self, offset=0, num=None, tags=[]): urls = self.transport.get(url=self.links["jobs"], params=q_params)["jobs"] return [Job(self.transport, url) for url in urls] - def new_job(self, job_description, inputs=[], autostart=True): - """submit and start a job on the site, optionally uploading input data files""" + def new_job(self, job_description: dict, inputs=None, autostart: bool = True): + """Submit and start a job on the site, optionally uploading local input data files + The input files can be either a simple array of local file names, or a dictionary + with the destination names as keys and the local file names as values. + """ + if inputs is None: + inputs = [] if len(inputs) > 0 or job_description.get("haveClientStageIn") is True: job_description["haveClientStageIn"] = "true" with closing(self.transport.post(url=self.links["jobs"], json=job_description)) as resp: @@ -408,12 +415,15 @@ def new_job(self, job_description, inputs=[], autostart=True): if len(inputs) > 0: working_dir = job.working_dir for input_item in inputs: - working_dir.upload(input_item) - if autostart and job_description.get("haveClientStageIn", None) == "true": + if isinstance(inputs, dict): + working_dir.upload(inputs[input_item], destination=input_item) + else: + working_dir.upload(input_item) + if autostart: job.start() return job - def execute(self, cmd, login_node=None): + def execute(self, cmd: str, login_node=None): """run a (non-batch) command on the site, executed on a login node Args: cmd - the command to run @@ -456,8 +466,8 @@ class Application(Resource): def __init__( self, - security, - app_url, + security: Credential | Transport, + app_url: str, submit_url=None, cache_time=_DEFAULT_CACHE_TIME, ): @@ -507,11 +517,18 @@ def ordinal(self): return i i += 1 + def __repr__(self): + return self._name_ + + __str__ = __repr__ + class Job(Resource): """wrapper around UNICORE job""" - def __init__(self, security, job_url, cache_time=_DEFAULT_CACHE_TIME): + def __init__( + self, security: Credential | Transport, job_url: str, cache_time=_DEFAULT_CACHE_TIME + ): super().__init__(security, job_url, cache_time) @property @@ -572,10 +589,10 @@ def poll(self, state=JobStatus.SUCCESSFUL, timeout=0): raise TimeoutError("Timeout waiting for job to become %s" % state.value) def __repr__(self): - return "Job: {} submitted: {} running: {}".format( + return "Job: {} submitted: {} status: {}".format( self.resource_url, self.properties["submissionTime"], - self.is_running(), + self.status, ) __str__ = __repr__ @@ -588,7 +605,9 @@ class Allocation(Job): correct job ID, so the task is started in the allocation. """ - def __init__(self, security, job_url, cache_time=_DEFAULT_CACHE_TIME): + def __init__( + self, security: Credential | Transport, job_url: str, cache_time=_DEFAULT_CACHE_TIME + ): super().__init__(security, job_url, cache_time) def new_job(self, job_description, inputs=[], autostart=True): @@ -621,10 +640,10 @@ def wait_until_available(self, timeout=0): break def __repr__(self): - return "Allocation: {} submitted: {} running: {}".format( + return "Allocation: {} submitted: {} status: {}".format( self.resource_url, self.properties["submissionTime"], - self.is_running(), + self.status, ) __str__ = __repr__ @@ -633,7 +652,9 @@ def __repr__(self): class Compute(Resource): """wrapper around a UNICORE compute resource (a specific cluster with queues)""" - def __init__(self, security, resource_url, cache_time=_DEFAULT_CACHE_TIME): + def __init__( + self, security: Credential | Transport, resource_url: str, cache_time=_DEFAULT_CACHE_TIME + ): super().__init__(security, resource_url, cache_time) def __repr__(self): @@ -655,7 +676,9 @@ def get_applications(self): class Storage(Resource): """wrapper around a UNICORE Storage resource""" - def __init__(self, security, storage_url, cache_time=_DEFAULT_CACHE_TIME): + def __init__( + self, security: Credential | Transport, storage_url: str, cache_time=_DEFAULT_CACHE_TIME + ): super().__init__(security, storage_url, cache_time) def _to_file_url(self, path): @@ -682,7 +705,7 @@ def stat(self, path): ret = PathFile(self, path_url, path) return ret - def listdir(self, base="/"): + def listdir(self, base="/") -> dict: """get a list of files and directories in the given base directory""" ret = {} for path, meta in self.contents(base)["content"].items(): @@ -848,7 +871,7 @@ def __repr__(self): class Path(Resource): """common base for files and directories""" - def __init__(self, storage, path_url, name, cache_time=_DEFAULT_CACHE_TIME): + def __init__(self, storage: Storage, path_url: str, name: str, cache_time=_DEFAULT_CACHE_TIME): super().__init__(storage.transport, path_url, cache_time) self.name = name self.storage = storage @@ -878,7 +901,7 @@ def __repr__(self): class PathDir(Path): - def __init__(self, storage, path_url, name, cache_time=_DEFAULT_CACHE_TIME): + def __init__(self, storage: Storage, path_url: str, name: str, cache_time=_DEFAULT_CACHE_TIME): super().__init__(storage, path_url, name, cache_time) def isdir(self): @@ -891,7 +914,7 @@ def __repr__(self): class PathFile(Path): - def __init__(self, storage, path_url, name, cache_time=_DEFAULT_CACHE_TIME): + def __init__(self, storage: Storage, path_url: str, name: str, cache_time=_DEFAULT_CACHE_TIME): super().__init__(storage, path_url, name, cache_time) def download(self, file): @@ -974,7 +997,7 @@ def ordinal(self): class Transfer(Resource): """wrapper around a UNICORE server-to-server transfer""" - def __init__(self, security, tr_url, cache_time=_DEFAULT_CACHE_TIME): + def __init__(self, security: Credential, tr_url: Transport, cache_time=_DEFAULT_CACHE_TIME): super().__init__(security, tr_url, cache_time) @property @@ -1033,8 +1056,8 @@ class WorkflowService(Resource): def __init__( self, - security, - workflows_url, + security: Credential | Transport, + workflows_url: str, check_authentication=True, cache_time=_DEFAULT_CACHE_TIME, ): @@ -1050,9 +1073,7 @@ def access_info(self): def assert_authentication(self): '''Asserts that the remote role is not "anonymous"''' if self.access_info()["role"]["selected"] == "anonymous": - raise pyunicore.credentials.AuthenticationFailedException( - "Failure to authenticate at %s" % self.resource_url - ) + raise AuthenticationFailedException("Failure to authenticate at %s" % self.resource_url) def get_workflows(self, offset=0, num=None, tags=[]): """get the list of workflows. @@ -1092,7 +1113,9 @@ def ordinal(self): class Workflow(Resource): """wrapper around a UNICORE workflow""" - def __init__(self, security, wf_url, cache_time=_DEFAULT_CACHE_TIME): + def __init__( + self, security: Credential | Transport, wf_url: str, cache_time=_DEFAULT_CACHE_TIME + ): super().__init__(security, wf_url, cache_time) @property diff --git a/tests/integration/cli/jobs/date.u b/tests/integration/cli/jobs/date.u new file mode 100644 index 0000000..b4525d3 --- /dev/null +++ b/tests/integration/cli/jobs/date.u @@ -0,0 +1,3 @@ +{ + "ApplicationName": "Date" +} diff --git a/tests/integration/cli/test_base.py b/tests/integration/cli/test_base.py index 503f8cf..fbd9348 100644 --- a/tests/integration/cli/test_base.py +++ b/tests/integration/cli/test_base.py @@ -4,6 +4,7 @@ class TestBase(unittest.TestCase): + def test_base_setup(self): cmd = base.Base() cmd.config_file = "tests/integration/cli/preferences" diff --git a/tests/integration/cli/test_exec.py b/tests/integration/cli/test_exec.py new file mode 100644 index 0000000..90fbaf8 --- /dev/null +++ b/tests/integration/cli/test_exec.py @@ -0,0 +1,31 @@ +import unittest + +import pyunicore.cli.exec as exec + + +class TestExec(unittest.TestCase): + + def test_exec(self): + cmd = exec.Exec() + config_file = "tests/integration/cli/preferences" + ep = "https://localhost:8080/DEMO-SITE/rest/core" + args = ["-c", config_file, "-v", "--keep", "--server-url", ep, "date"] + cmd.run(args) + + def test_run_1(self): + cmd = exec.Run() + config_file = "tests/integration/cli/preferences" + ep = "https://localhost:8080/DEMO-SITE/rest/core" + jobfile = "tests/integration/cli/jobs/date.u" + args = ["-c", config_file, "-v", "--server-url", ep, jobfile] + cmd.run(args) + + def test_list_jobs(self): + cmd = exec.ListJobs() + config_file = "tests/integration/cli/preferences" + args = ["-c", config_file, "-v", "-l"] + cmd.run(args) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/integration/test_basic.py b/tests/integration/test_basic.py index d9d3656..fff53f3 100644 --- a/tests/integration/test_basic.py +++ b/tests/integration/test_basic.py @@ -50,6 +50,20 @@ def test_run_uploaded_script(self): self.assertTrue(len(stdout) > 0) print(stdout) + def test_run_uploaded_script_2(self): + print("*** test_run_uploaded_script_2") + client = self.get_client() + job_desc = {"Executable": "bash", "Arguments": ["myscript.sh"]} + in_file = os.getcwd() + "/tests/integration/files/script.sh" + job = client.new_job(job_desc, {"myscript.sh": in_file}) + job.poll() + exit_code = int(job.properties["exitCode"]) + self.assertEqual(0, exit_code) + work_dir = job.working_dir + stdout = work_dir.stat("/stdout").raw().read() + self.assertTrue(len(stdout) > 0) + print(stdout) + def test_alloc_and_run_date(self): print("*** test_alloc_and_run_date") client = self.get_client()