From be34af23de8479978ae338f8e9c06aa46f5c3b81 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Wed, 4 May 2022 19:37:40 +0530 Subject: [PATCH] teuthology/queue: Single command for queue operations Makes the same teuthology-queue commands work regardless of the queue backend, Paddles or Beanstalk. Signed-off-by: Aishwarya Mathuria --- scripts/dispatcher.py | 1 - scripts/paddles_queue.py | 45 ------------ scripts/queue.py | 15 +++- teuthology/dispatcher/__init__.py | 7 +- teuthology/dispatcher/supervisor.py | 7 +- teuthology/kill.py | 2 - teuthology/queue/__init__.py | 106 ---------------------------- teuthology/queue/beanstalk.py | 16 ++--- teuthology/queue/paddles.py | 49 ++++++------- teuthology/queue/util.py | 101 ++++++++++++++++++++++++++ teuthology/report.py | 20 +++--- teuthology/schedule.py | 4 +- teuthology/test/test_dispatcher.py | 3 +- 13 files changed, 165 insertions(+), 211 deletions(-) delete mode 100644 scripts/paddles_queue.py create mode 100644 teuthology/queue/util.py diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index fffdb634b8..63779f1246 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -26,7 +26,6 @@ --queue-backend BACKEND choose between paddles and beanstalk """ -import docopt import sys import teuthology.dispatcher.supervisor diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py deleted file mode 100644 index 8487fd938e..0000000000 --- a/scripts/paddles_queue.py +++ /dev/null @@ -1,45 +0,0 @@ -import docopt - -import teuthology.config -import teuthology.queue.paddles_queue -doc = """ -usage: teuthology-paddles-queue -h - teuthology-paddles-queue -s -m MACHINE_TYPE - teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] - teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER - teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -u -m MACHINE_TYPE -U USER - -List Jobs in queue. -If -D is passed, then jobs with PATTERN in the job name are deleted from the -queue. - -Arguments: - -m, --machine_type MACHINE_TYPE - Which machine type queue to work on. - -optional arguments: - -h, --help Show this help message and exit - -D, --delete PATTERN Delete Jobs with PATTERN in their name - -d, --description Show job descriptions - -r, --runs Only show run names - -f, --full Print the entire job config. Use with caution. - -s, --status Prints the status of the queue - -t, --time SECONDS Pause queues for a number of seconds. - If -m is passed, pause that queue, - otherwise pause all queues. - -p, --pause Pause queue - -u, --unpause Unpause queue - -P, --priority PRIORITY - Change priority of queued jobs - -U, --user USER User who owns the jobs - -R, --run-name RUN_NAME - Used to change priority of all jobs in the run. -""" - - -def main(): - args = docopt.docopt(doc) - teuthology.paddles_queue.main(args) diff --git a/scripts/queue.py b/scripts/queue.py index 2c466a7be9..1d9112c22e 100644 --- a/scripts/queue.py +++ b/scripts/queue.py @@ -1,15 +1,16 @@ import docopt -import teuthology.config import teuthology.queue.beanstalk import teuthology.queue.paddles +from teuthology.config import config doc = """ usage: teuthology-queue -h teuthology-queue [-s|-d|-f] -m MACHINE_TYPE teuthology-queue [-r] -m MACHINE_TYPE teuthology-queue -m MACHINE_TYPE -D PATTERN - teuthology-queue -p SECONDS [-m MACHINE_TYPE] + teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER] + teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] List Jobs in queue. If -D is passed, then jobs with PATTERN in the job name are deleted from the @@ -29,9 +30,17 @@ -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 will unpause. If -m is passed, pause that queue, otherwise pause all queues. + -P, --priority PRIORITY + Change priority of queued jobs (only in Paddles queues) + -U, --user USER User who owns the jobs + -R, --run-name RUN_NAME + Used to change priority of all jobs in the run. """ def main(): args = docopt.docopt(doc) - teuthology.queue.main(args) + if config.backend == 'beanstalk': + teuthology.queue.beanstalk.main(args) + else: + teuthology.queue.paddles.main(args) diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 47e90797b3..3ec1153fdf 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -84,9 +84,7 @@ def main(args): "There is already a teuthology-dispatcher process running:" f" {procs}" ) - verbose = args["--verbose"] machine_type = args["--machine-type"] - log_dir = args["--log-dir"] archive_dir = args["--archive-dir"] exit_on_empty_queue = args["--exit-on-empty-queue"] backend = args['--queue-backend'] @@ -111,6 +109,8 @@ def main(args): if backend == 'beanstalk': connection = beanstalk.connect() beanstalk.watch_tube(connection, machine_type) + elif backend == 'paddles': + report.create_machine_type_queue(machine_type) result_proc = None @@ -152,6 +152,9 @@ def main(args): else: job = report.get_queued_job(machine_type) if job is None: + if exit_on_empty_queue and not job_procs: + log.info("Queue is empty and no supervisor processes running; exiting!") + break continue job = clean_config(job) report.try_push_job_info(job, dict(status='running')) diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index 55512dcf23..5dd0e4e4ca 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -82,9 +82,8 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): if teuth_config.results_server: try: report.try_delete_jobs(job_config['name'], job_config['job_id']) - except Exception as e: - log.warning("Unable to delete job %s, exception occurred: %s", - job_config['job_id'], e) + except Exception: + log.exception("Unable to delete job %s", job_config['job_id']) job_archive = os.path.join(archive_dir, safe_archive) args = [ os.path.join(teuth_bin_path, 'teuthology-results'), @@ -142,7 +141,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): '--archive', job_config['archive_path'], '--name', job_config['name'], ]) - if 'description' in job_config: + if job_config.get('description') is not None: arg.extend(['--description', job_config['description']]) job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml') arg.extend(['--', job_archive]) diff --git a/teuthology/kill.py b/teuthology/kill.py index 8ce4ddad5a..2e464d6713 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -14,9 +14,7 @@ from teuthology.queue import beanstalk from teuthology import report from teuthology.lock import ops as lock_ops -from teuthology import beanstalk from teuthology.config import config -from teuthology import misc log = logging.getLogger(__name__) diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py index 2a0b6ff363..e69de29bb2 100644 --- a/teuthology/queue/__init__.py +++ b/teuthology/queue/__init__.py @@ -1,106 +0,0 @@ -import logging -import pprint -import sys -from collections import OrderedDict - -from teuthology import report -from teuthology.config import config - -log = logging.getLogger(__name__) - -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - report.try_delete_jobs(job_name, job_id) - - -def main(args): - if config.backend == 'paddles': - paddles.main(args) - else: - beanstalk.main(args) \ No newline at end of file diff --git a/teuthology/queue/beanstalk.py b/teuthology/queue/beanstalk.py index 90b1cbd6d3..c668e4f6bc 100644 --- a/teuthology/queue/beanstalk.py +++ b/teuthology/queue/beanstalk.py @@ -1,12 +1,10 @@ import beanstalkc import yaml import logging -import pprint -import sys -from collections import OrderedDict from teuthology.config import config -from teuthology import report +from teuthology.queue import util + log = logging.getLogger(__name__) @@ -47,7 +45,7 @@ def callback(jobs_dict) # Try to figure out a sane timeout based on how many jobs are in the queue timeout = job_count / 2000.0 * 60 for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") + util.print_progress(i, job_count, "Loading") job = connection.reserve(timeout=timeout) if job is None or job.body is None: continue @@ -57,7 +55,7 @@ def callback(jobs_dict) if pattern is not None and pattern not in job_name: continue processor.add_job(job_id, job_config, job) - end_progress() + util.end_progress() processor.complete() @@ -105,13 +103,13 @@ def main(args): pause_tube(connection, machine_type, pause_duration) elif delete: walk_jobs(connection, machine_type, - JobDeleter(delete)) + util.JobDeleter(delete)) elif runs: walk_jobs(connection, machine_type, - RunPrinter()) + util.RunPrinter()) else: walk_jobs(connection, machine_type, - JobPrinter(show_desc=show_desc, full=full)) + util.JobPrinter(show_desc=show_desc, full=full)) except KeyboardInterrupt: log.info("Interrupted.") finally: diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py index f2ea8b84c8..489d638e2f 100644 --- a/teuthology/queue/paddles.py +++ b/teuthology/queue/paddles.py @@ -1,11 +1,7 @@ import logging -import pprint -import sys -from collections import OrderedDict from teuthology import report -from teuthology.dispatcher import pause_queue - +from teuthology.queue import util log = logging.getLogger(__name__) @@ -14,19 +10,17 @@ def stats_queue(machine_type): stats = report.get_queue_stats(machine_type) if stats['paused'] is None: log.info("%s queue is currently running with %s jobs queued", - stats['name'], - stats['count']) + stats['queue'], + stats['queued_jobs']) else: log.info("%s queue is paused with %s jobs queued", - stats['name'], - stats['count']) + stats['queue'], + stats['queued_jobs']) -def update_priority(machine_type, priority, user, run_name=None): +def update_priority(machine_type, priority, run_name=None): if run_name is not None: - jobs = report.get_user_jobs_queue(machine_type, user, run_name) - else: - jobs = report.get_user_jobs_queue(machine_type, user) + jobs = report.get_jobs_by_run(machine_type, run_name) for job in jobs: job['priority'] = priority report.try_push_job_info(job) @@ -34,55 +28,54 @@ def update_priority(machine_type, priority, user, run_name=None): def walk_jobs(machine_type, processor, user): log.info("Checking paddles queue...") - job_count = report.get_queue_stats(machine_type)['count'] + job_count = report.get_queue_stats(machine_type)['queued_jobs'] jobs = report.get_user_jobs_queue(machine_type, user) if job_count == 0: - log.info('No jobs in queue') + log.info('No jobs in Paddles queue') return for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") + util.print_progress(i, job_count, "Loading") job = jobs[i-1] if job is None: continue job_id = job['job_id'] processor.add_job(job_id, job) - end_progress() + util.end_progress() processor.complete() def main(args): machine_type = args['--machine_type'] - #user = args['--user'] - #run_name = args['--run_name'] - #priority = args['--priority'] + user = args['--user'] + run_name = args['--run-name'] status = args['--status'] delete = args['--delete'] runs = args['--runs'] show_desc = args['--description'] full = args['--full'] pause_duration = args['--pause'] - #unpause = args['--unpause'] - #pause_duration = args['--time'] + priority = args['--priority'] try: if status: stats_queue(machine_type) if pause_duration: - pause_queue(machine_type, pause, user, pause_duration) - #else: - #pause_queue(machine_type, pause, user) + if not user: + log.info('Please enter user to pause Paddles queue') + return + report.pause_queue(machine_type, user, pause_duration) elif priority: update_priority(machine_type, priority, run_name) elif delete: walk_jobs(machine_type, - JobDeleter(delete), user) + util.JobDeleter(delete), user) elif runs: walk_jobs(machine_type, - RunPrinter(), user) + util.RunPrinter(), user) else: walk_jobs(machine_type, - JobPrinter(show_desc=show_desc, full=full), + util.JobPrinter(show_desc=show_desc, full=full), user) except KeyboardInterrupt: log.info("Interrupted.") diff --git a/teuthology/queue/util.py b/teuthology/queue/util.py new file mode 100644 index 0000000000..2a7642e726 --- /dev/null +++ b/teuthology/queue/util.py @@ -0,0 +1,101 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report + +log = logging.getLogger(__name__) + + +def print_progress(index, total, message=None): + msg = "{m} ".format(m=message) if message else '' + sys.stderr.write("{msg}{i}/{total}\r".format( + msg=msg, i=index, total=total)) + sys.stderr.flush() + + +def end_progress(): + sys.stderr.write('\n') + sys.stderr.flush() + + +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False, full=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + self.full = full + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] + job_priority = job_config['priority'] + job_name = job_config['name'] + job_desc = job_config['description'] + print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( + i=job_index, + pri=job_priority, + job_id=job_id, + job_name=job_name, + )) + if self.full: + pprint.pprint(job_config) + elif job_desc and self.show_desc: + for desc in job_desc.split(): + print('\t {}'.format(desc)) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print(run) + + +class JobDeleter(JobProcessor): + def __init__(self, pattern): + self.pattern = pattern + super(JobDeleter, self).__init__() + + def add_job(self, job_id, job_config, job_obj=None): + job_name = job_config['name'] + if self.pattern in job_name: + super(JobDeleter, self).add_job(job_id, job_config, job_obj) + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print('Deleting {job_name}/{job_id}'.format( + job_id=job_id, + job_name=job_name, + )) + report.try_delete_jobs(job_name, job_id) diff --git a/teuthology/report.py b/teuthology/report.py index 5cb4e854cb..079b561f44 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -388,7 +388,7 @@ def last_run(self): def get_top_job(self, queue): - uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri, + uri = "{base}/queue/pop_queue?queue={queue}".format(base=self.base_uri, queue=queue) inc = random.uniform(0, 1) with safe_while( @@ -545,13 +545,14 @@ def create_queue(self, queue): response.raise_for_status() - def update_queue(self, queue, paused, paused_by, pause_duration=None): + def update_queue(self, queue, paused_by, pause_duration=None): uri = "{base}/queue/".format( base=self.base_uri ) + if pause_duration is not None: pause_duration = int(pause_duration) - queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by, + queue_info = {'queue': queue, 'paused_by': paused_by, 'pause_duration': pause_duration} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} @@ -594,9 +595,6 @@ def queue_stats(self, queue): response = self.session.post(uri, data=queue_json, headers=headers) if response.status_code == 200: - self.log.info("Successfully retrieved stats for queue {queue}".format( - queue=queue, - )) return response.json() else: msg = response.text @@ -668,12 +666,18 @@ def get_user_jobs_queue(queue, user, run_name=None): return return reporter.queued_jobs(queue, user, run_name) +def get_jobs_by_run(queue, run_name): + reporter = ResultsReporter() + if not reporter.base_uri: + return + return reporter.queued_jobs(queue, None, run_name) + -def pause_queue(queue, paused, paused_by, pause_duration=None): +def pause_queue(queue, paused_by, pause_duration=None): reporter = ResultsReporter() if not reporter.base_uri: return - reporter.update_queue(queue, paused, paused_by, pause_duration) + reporter.update_queue(queue, paused_by, pause_duration) def is_queue_paused(queue): diff --git a/teuthology/schedule.py b/teuthology/schedule.py index 81dd4d548f..3a370e86db 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -1,7 +1,7 @@ import os import yaml -import teuthology.beanstalk +import teuthology.queue.beanstalk from teuthology.misc import get_user, merge_configs from teuthology import report @@ -115,7 +115,7 @@ def beanstalk_schedule_job(job_config, backend, num=1): """ num = int(num) tube = job_config.pop('tube') - beanstalk = teuthology.beanstalk.connect() + beanstalk = teuthology.queue.beanstalk.connect() beanstalk.use(tube) queue = report.create_machine_type_queue(job_config['machine_type']) job_config['queue'] = queue diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py index 9a6d0ff564..16c61618a0 100644 --- a/teuthology/test/test_dispatcher.py +++ b/teuthology/test/test_dispatcher.py @@ -66,7 +66,8 @@ def test_dispatcher_main(self, m_fetch_teuthology, m_fetch_qa_suite, '--machine-type': 'test_queue', '--supervisor': False, '--verbose': False, - '--queue-backend': 'paddles' + '--queue-backend': 'paddles', + '--exit-on-empty-queue': False } m = mock.MagicMock()