Skip to content

Commit

Permalink
The following changes support the removal of Beanstalk from Teuthology.
Browse files Browse the repository at this point in the history
In place of Beanstalk, we will now be using Paddles for queue management in Teuthology.
This PR has the corresponding changes for the paddles PR: https://github.com/ceph/paddles/pull/94/files.

The changes include:
 1. Removing all beanstalk related code
 2. Teuthology scheduler and dispatcher using Paddles queue for scheduling and dispatching jobs
 3. Adding support for Paddles queue management
 4. Additional functionality of being able to change the priority of Teuthology jobs in the queued state in the teuthology-queue command

Signed-off-by: Aishwarya Mathuria <[email protected]>
  • Loading branch information
amathuria committed May 27, 2021
1 parent 0c70841 commit 946dc36
Show file tree
Hide file tree
Showing 13 changed files with 380 additions and 214 deletions.
8 changes: 4 additions & 4 deletions scripts/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""
usage: teuthology-dispatcher --help
teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config COFNFIG --archive-dir DIR
teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --tube TUBE
teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE
Start a dispatcher for the specified tube. Grab jobs from a beanstalk
Start a dispatcher for the specified machine type. Grab jobs from a paddles
queue and run the teuthology tests they describe as subprocesses. The
subprocess invoked is a teuthology-dispatcher command run in supervisor
mode.
Expand All @@ -15,9 +15,9 @@
standard arguments:
-h, --help show this help message and exit
-v, --verbose be more verbose
-t, --tube TUBE which beanstalk tube to read jobs from
-l, --log-dir LOG_DIR path in which to store logs
-a DIR, --archive-dir DIR path to archive results in
--machine-type MACHINE_TYPE the machine type for the job
--supervisor run dispatcher in job supervisor mode
--bin-path BIN_PATH teuthology bin path
--job-config CONFIG file descriptor of job's config file
Expand Down
2 changes: 1 addition & 1 deletion scripts/kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
teuthology-kill [-p] -o OWNER -m MACHINE_TYPE -r RUN
Kill running teuthology jobs:
1. Removes any queued jobs from the beanstalk queue
1. Removes any queued jobs from the paddles queue
2. Kills any running jobs
3. Nukes any machines involved
Expand Down
18 changes: 11 additions & 7 deletions scripts/queue.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import docopt

import teuthology.config
import teuthology.beanstalk
import teuthology.paddles_queue

doc = """
usage: teuthology-queue -h
teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
teuthology-queue [-r] -m MACHINE_TYPE
teuthology-queue -m MACHINE_TYPE -D PATTERN
teuthology-queue -p SECONDS [-m MACHINE_TYPE]
teuthology-queue -s -m MACHINE_TYPE
teuthology-queue [-d|-f] -m MACHINE_TYPE [-P PRIORITY] -u USER
teuthology-queue [-r] -m MACHINE_TYPE -u USER
teuthology-queue -m MACHINE_TYPE -D PATTERN -u USER
teuthology-queue -p SECONDS -m MACHINE_TYPE -u USER
List Jobs in queue.
If -D is passed, then jobs with PATTERN in the job name are deleted from the
queue.
Arguments:
-m, --machine_type MACHINE_TYPE [default: multi]
-m, --machine_type MACHINE_TYPE
Which machine type queue to work on.
optional arguments:
Expand All @@ -28,9 +29,12 @@
-p, --pause SECONDS Pause queues for a number of seconds. A value of 0
will unpause. If -m is passed, pause that queue,
otherwise pause all queues.
-P, --priority PRIORITY
Change priority of queued jobs
-u, --user USER User who owns the jobs
"""


def main():
args = docopt.docopt(doc)
teuthology.beanstalk.main(args)
teuthology.paddles_queue.main(args)
2 changes: 1 addition & 1 deletion scripts/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
Queue backend name, use prefix '@'
to append job config to the given
file path as yaml.
[default: beanstalk]
[default: paddles]
-n <name>, --name <name> Name of suite run the job is part of
-d <desc>, --description <desc> Job description
-o <owner>, --owner <owner> Job owner
Expand Down
6 changes: 3 additions & 3 deletions scripts/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def main():

def parse_args():
parser = argparse.ArgumentParser(description="""
Grab jobs from a beanstalk queue and run the teuthology tests they
Grab jobs from a paddles queue and run the teuthology tests they
describe. One job is run at a time.
""")
parser.add_argument(
Expand All @@ -29,8 +29,8 @@ def parse_args():
required=True,
)
parser.add_argument(
'-t', '--tube',
help='which beanstalk tube to read jobs from',
'-m', '--machine-type',
help='which machine type the jobs will run on',
required=True,
)

Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
'orchestra': [
# For apache-libcloud when using python < 2.7.9
'backports.ssl_match_hostname',
'beanstalkc3 >= 0.4.0',
'httplib2',
'ndg-httpsclient', # for requests, urllib3
'pyasn1', # for requests, urllib3
Expand Down
40 changes: 19 additions & 21 deletions teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import subprocess
import sys
import yaml
import json

from datetime import datetime

from teuthology import setup_log_file, install_except_hook
from teuthology import beanstalk
from teuthology import report
from teuthology.config import config as teuth_config
from teuthology.exceptions import SkipJob
Expand Down Expand Up @@ -54,33 +54,39 @@ def load_config(archive_dir=None):
else:
teuth_config.archive_base = archive_dir

def clean_config(config):
result = {}
for key in config:
if config[key] is not None:
result[key] = config[key]
return result

def main(args):
# run dispatcher in job supervisor mode if --supervisor passed
if args["--supervisor"]:
return supervisor.main(args)

verbose = args["--verbose"]
tube = args["--tube"]
machine_type = args["--machine-type"]
log_dir = args["--log-dir"]
archive_dir = args["--archive-dir"]

if archive_dir is None:
archive_dir = teuth_config.archive_base

if machine_type is None and teuth_config.machine_type is None:
return
# setup logging for disoatcher in {log_dir}
loglevel = logging.INFO
if verbose:
loglevel = logging.DEBUG
log.setLevel(loglevel)
log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}")
log_file_path = os.path.join(log_dir, f"dispatcher.{machine_type}.{os.getpid()}")
setup_log_file(log_file_path)
install_except_hook()

load_config(archive_dir=archive_dir)

connection = beanstalk.connect()
beanstalk.watch_tube(connection, tube)
result_proc = None

if teuth_config.teuthology_path is None:
Expand All @@ -103,18 +109,16 @@ def main(args):

load_config()

job = connection.reserve(timeout=60)
job = report.get_queued_job(machine_type)
if job is None:
continue

# bury the job so it won't be re-run if it fails
job.bury()
job_id = job.jid
log.info('Reserved job %d', job_id)
log.info('Config is: %s', job.body)
job_config = yaml.safe_load(job.body)
job_config['job_id'] = str(job_id)

job = clean_config(job)
report.try_push_job_info(job, dict(status='running'))
job_id = job.get('job_id')
log.info('Reserved job %s', job_id)
log.info('Config is: %s', job)
job_config = job

if job_config.get('stop_worker'):
keep_running = False

Expand Down Expand Up @@ -164,12 +168,6 @@ def main(args):
status='fail',
failure_reason=error_message))

# This try/except block is to keep the worker from dying when
# beanstalkc throws a SocketError
try:
job.delete()
except Exception:
log.exception("Saw exception while trying to delete job")


def lock_machines(job_config):
Expand Down
2 changes: 1 addition & 1 deletion teuthology/dispatcher/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
'--archive', job_config['archive_path'],
'--name', job_config['name'],
])
if job_config['description'] is not None:
if 'description' in job_config:
arg.extend(['--description', job_config['description']])
job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml')
arg.extend(['--', job_archive])
Expand Down
38 changes: 0 additions & 38 deletions teuthology/kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import getpass


from teuthology import beanstalk
from teuthology import report
from teuthology.config import config
from teuthology import misc
Expand Down Expand Up @@ -59,7 +58,6 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None,
"you must also pass --machine-type")

if not preserve_queue:
remove_beanstalk_jobs(run_name, machine_type)
remove_paddles_jobs(run_name)
kill_processes(run_name, run_info.get('pids'))
if owner is not None:
Expand Down Expand Up @@ -101,7 +99,6 @@ def find_run_info(serializer, run_name):
if not os.path.isdir(job_dir):
continue
job_num += 1
beanstalk.print_progress(job_num, job_total, 'Reading Job: ')
job_info = serializer.job_info(run_name, job_id, simple=True)
for key in job_info.keys():
if key in run_info_fields and key not in run_info:
Expand All @@ -120,41 +117,6 @@ def remove_paddles_jobs(run_name):
report.try_delete_jobs(run_name, job_ids)


def remove_beanstalk_jobs(run_name, tube_name):
qhost = config.queue_host
qport = config.queue_port
if qhost is None or qport is None:
raise RuntimeError(
'Beanstalk queue information not found in {conf_path}'.format(
conf_path=config.yaml_path))
log.info("Checking Beanstalk Queue...")
beanstalk_conn = beanstalk.connect()
real_tube_name = beanstalk.watch_tube(beanstalk_conn, tube_name)

curjobs = beanstalk_conn.stats_tube(real_tube_name)['current-jobs-ready']
if curjobs != 0:
x = 1
while x != curjobs:
x += 1
job = beanstalk_conn.reserve(timeout=20)
if job is None:
continue
job_config = yaml.safe_load(job.body)
if run_name == job_config['name']:
job_id = job.stats()['id']
msg = "Deleting job from queue. ID: " + \
"{id} Name: {name} Desc: {desc}".format(
id=str(job_id),
name=job_config['name'],
desc=job_config['description'],
)
log.info(msg)
job.delete()
else:
print("No jobs in Beanstalk Queue")
beanstalk_conn.close()


def kill_processes(run_name, pids=None):
if pids:
to_kill = set(pids).intersection(psutil.pids())
Expand Down
Loading

0 comments on commit 946dc36

Please sign in to comment.