Skip to content

Commit

Permalink
HTCONDOR-2688 Use v2 htcondor bindings in python scripts
Browse files Browse the repository at this point in the history
condor_ce_trace required major changes due to altered or removed
interfaces.
  • Loading branch information
JaimeFrey committed Nov 12, 2024
1 parent e65876b commit 1028529
Show file tree
Hide file tree
Showing 17 changed files with 58 additions and 131 deletions.
4 changes: 2 additions & 2 deletions contrib/bdii/htcondor-ce-provider
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ from datetime import datetime
import subprocess
from collections import defaultdict, namedtuple
import signal
import htcondor
import classad as ca
import htcondor2 as htcondor
import classad2 as ca

SERVICE_LDIF = """dn: GLUE2ServiceID={central_manager},GLUE2GroupID=resource,o=glue
GLUE2ServiceID: {central_manager}
Expand Down
2 changes: 1 addition & 1 deletion contrib/bosco/bosco-cluster-remote-hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys

try:
import classad
import classad2 as classad
except ImportError:
sys.exit("ERROR: Could not load HTCondor Python bindings. "
"Ensure the 'htcondor' and 'classad' are in PYTHONPATH")
Expand Down
2 changes: 1 addition & 1 deletion src/collector_to_agis
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import optparse

if 'CONDOR_CONFIG' not in os.environ:
os.environ['CONDOR_CONFIG'] = '/etc/condor-ce/condor_config'
import htcondor
import htcondor2 as htcondor

import htcondorce.agis_compat

Expand Down
2 changes: 1 addition & 1 deletion src/condor_ce_host_network_check
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import fnmatch
import optparse

os.environ.setdefault('CONDOR_CONFIG', '/etc/condor-ce/condor_config')
import htcondor
import htcondor2 as htcondor
from htcondorce.tools import to_str

from socket import AF_INET, AF_INET6, inet_ntop
Expand Down
4 changes: 2 additions & 2 deletions src/condor_ce_info_status
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import optparse

os.environ.setdefault("CONDOR_CONFIG", "/etc/condor-ce/condor_config")

import classad
import htcondor
import classad2 as classad
import htcondor2 as htcondor

import htcondorce.info_query

Expand Down
6 changes: 3 additions & 3 deletions src/condor_ce_jobmetrics
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def _newstat():
return {"Running": 0, "Idle": 0, "Held": 0, "Jobs": 0}

def process_one_schedd(ad):
import htcondor
import classad
import htcondor2 as htcondor
import classad2 as classad
ad = classad.ClassAd(ad)
schedd = htcondor.Schedd(ad)
status_map = {1: "Idle", 2: "Running", 5: "Held"}
Expand All @@ -52,7 +52,7 @@ def process_one_schedd(ad):
gpu_job_count = collections.defaultdict(_newstat)
print(f"Processing CE {ad.get('Name', 'Unknown')}.", file=sys.stderr)
try:
for job in schedd.xquery("RoutedJob is UNDEFINED", ["JobStatus", 'x509UserProxyVOName',
for job in schedd.query("RoutedJob is UNDEFINED", ["JobStatus", 'x509UserProxyVOName',
'x509UserProxyFirstFQAN', 'x509userproxysubject', 'RequestGPUs']):
dn = job.get("x509userproxysubject") or 'Unknown'
vo = job.get('x509UserProxyVOName') or 'Unknown'
Expand Down
2 changes: 1 addition & 1 deletion src/condor_ce_register
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import socket
import subprocess
import sys

import htcondor
import htcondor2 as htcondor

DEFAULT_COLLECTOR_PORT="9619"

Expand Down
2 changes: 1 addition & 1 deletion src/condor_ce_router_defaults
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import re
import pwd
import classad
import classad2 as classad
from collections import OrderedDict

JOB_ROUTER_CONFIG = r"""JOB_ROUTER_TRANSFORM_Env @=jrt
Expand Down
2 changes: 1 addition & 1 deletion src/condor_ce_scitoken_exchange
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import time

os.environ.setdefault("CONDOR_CONFIG", "/etc/condor-ce/condor_config")

import htcondor
import htcondor2 as htcondor
import htcondorce.tools as ce


Expand Down
145 changes: 36 additions & 109 deletions src/condor_ce_trace
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import traceback

os.environ.setdefault("CONDOR_CONFIG", "/etc/condor-ce/condor_config")

import classad
import htcondor
#import classad
#import htcondor
import classad2 as classad
import htcondor2 as htcondor
import htcondorce.tools as ce

G_DEBUG = False
Expand All @@ -39,14 +41,10 @@ def verify_matching_condor_versions():
'Please ensure that you only have one version of HTCondor installed.')


def run_ping(address, daemon='SCHEDD'):
def run_ping(collector_name, schedd_name):
print("Testing HTCondor-CE authorization...")

if not address.startswith("<"):
address = "<%s>" % address
if daemon == "SCHEDD":
args = ["condor_ce_ping", "-addr", str(address), "-verbose", "-type", daemon, "-debug", "WRITE"]
else:
args = ["condor_ce_ping", "-addr", str(address), "-verbose", "-type", daemon, "-debug", "READ"]
args = ["condor_ce_ping", "-pool", collector_name, "-name", schedd_name, "-verbose", "-debug", "WRITE"]

try:
rc, stdout, _ = ce.run_command(args)
Expand All @@ -60,13 +58,13 @@ def run_ping(address, daemon='SCHEDD'):

if rc < 0:
raise ce.CondorRunException("Failed to ping %s; condor_ping terminated with signal %d." \
% (address, -rc))
% (schedd_name, -rc))
elif rc > 0:
if re.search('Failed to connect', stdout):
raise ce.CondorRunException("Failed to ping %s: Please contact the site's system adminstrator to " \
"ensure that the CE you're trying to contact is functional." % address)
"ensure that the CE you're trying to contact is functional." % schedd_name)
else:
message = "Failed to ping %s; authorization check exited with code %d." % (address, rc)
message = "Failed to ping %s; authorization check exited with code %d." % (schedd_name, rc)
if not G_DEBUG:
message = message + " Re-run the command with '-d' for more verbose output."
raise ce.CondorRunException(message)
Expand All @@ -76,7 +74,9 @@ def run_ping(address, daemon='SCHEDD'):
raise ce.CondorUserException("User %s does not have permissions for %s. Please contact the CE's " \
"system administrator to ensure that your user is mapped properly " \
"in the site's authentication system."
% (unauthorized_user.group(1), address))
% (unauthorized_user.group(1), schedd_name))

print(f"Verified WRITE access for scheduler daemon {schedd_name}")

def parse_opts():

Expand Down Expand Up @@ -104,91 +104,18 @@ def parse_opts():
return opts, args


def check_authz(collector_ad, schedd_ad):
    """Check that the caller is authorized against the CE collector and schedd.

    The collector ad is pinged with READ and the schedd ad with WRITE, using
    ``htcondor.SecMan().ping``. A line is printed for each daemon that passes.

    Raises ``ce.CondorRunException`` for the first daemon whose ping raises
    ``RuntimeError``. With G_DEBUG set, the message also carries the output
    of a verbose ``condor_ping`` run against the daemon's address.
    """
    print("Testing HTCondor-CE authorization...")
    targets = (
        (collector_ad, 'READ', 'collector'),
        (schedd_ad, 'WRITE', 'scheduler'),
    )

    for daemon_ad, cmd, dtype in targets:
        addr = daemon_ad['MyAddress']
        try:
            ping_ad = htcondor.SecMan().ping(daemon_ad, cmd)
        except RuntimeError:
            # if python binding ping fails to authz, it raises an exception and doesn't tell us anything useful
            # actually run condor_ping for troubleshooting information
            msg = 'ERROR: %s access failed for %s daemon at %s.' % (cmd, dtype, addr)
            if G_DEBUG:
                ping_argv = ['condor_ping', '-addr', addr, '-verbose', cmd]
                _, stdout, _ = ce.run_command(ping_argv)
                msg += '\n' + stdout
            else:
                msg += " Re-run with '--debug' for more information."
            raise ce.CondorRunException(msg)

        # Only reached when the ping succeeded; the handler above always raises.
        print(f"Verified {cmd} access for {dtype} daemon at {addr}")
        if G_DEBUG:
            print("*"*5, "Ping response ClassAd", "*"*5, ping_ad)
            print("*"*20)


def job_proxy_info(proxy_path):
    """Return a dict of x509 ClassAd attributes describing the proxy at *proxy_path*.

    Each value comes from a separate ``voms-proxy-info -file <proxy_path>``
    call. The subject and time-left queries are mandatory: a non-zero exit
    code from either raises ``ce.CondorUserException``. The VO and FQAN
    queries are optional and their attributes are only set on exit code 0.
    """
    def _query(flag):
        # Run one voms-proxy-info query; stderr is not used.
        rc, out, _ = ce.run_command(["voms-proxy-info", "-file", proxy_path, flag])
        return rc, out

    subj_rc, subject = _query("-subject")
    if subj_rc != 0:
        raise ce.CondorUserException("Cannot parse proxy at %s." % proxy_path)
    subject = subject.strip()
    attrs = {
        'x509userproxysubject': subject,
        # Starts as the bare subject; the FQAN list is appended below if available.
        'x509UserProxyFQAN': subject,
    }

    time_rc, time_out = _query("-timeleft")
    if time_rc != 0:
        raise ce.CondorUserException("Cannot parse proxy at %s." % proxy_path)
    seconds_left = int(time_out)
    if seconds_left <= 0:
        print("WARNING: available proxy appears to be expired")
    attrs['x509UserProxyExpiration'] = int(time.time()) + seconds_left

    vo_rc, vo_out = _query("-vo")
    if vo_rc == 0:
        attrs['x509UserProxyVOName'] = vo_out.strip()

    fqan_rc, fqan_out = _query("-fqan")
    if fqan_rc == 0:
        fqans = [line.strip() for line in fqan_out.strip().split('\n')]
        attrs['x509UserProxyFirstFQAN'] = fqans[0]
        attrs['x509UserProxyFQAN'] += "," + ",".join(fqans)

    return attrs


def set_classad_value_type(value):
    """Convert a string attribute value to the Python type it spells.

    ``value`` is a string. The result is:

    * ``True`` / ``False`` for 'true' / 'false' in any letter case,
    * a ``float`` for an optionally signed decimal such as '1.5' or '-1.5',
    * an ``int`` for an optionally signed run of digits such as '42' or '-5',
    * the unchanged string for anything else.

    A leading sign is accepted so that negative numbers become numbers
    rather than falling through as strings.
    """
    lowered = value.lower()
    if lowered == 'true':
        return True
    if lowered == 'false':
        return False
    # The float pattern is tried first; the int pattern has no '.' so the
    # two never overlap.
    if re.match(r'[+-]?\d+\.\d+$', value):
        return float(value)
    if re.match(r'[+-]?\d+$', value):
        return int(value)

    return value


def setup_user_creds():
"""Return a dict of token/X.509 attributes that are necessary for remote submission to a schedd
"""
results = {}
try:
results['SciTokensFile'] = ce.bearer_token_path()
results['scitokens_file'] = ce.bearer_token_path()
except OSError:
pass

try:
proxy = ce.x509_user_proxy_path()
results['x509userproxy'] = proxy
results.update(job_proxy_info(proxy))
except OSError:
pass

Expand All @@ -200,24 +127,24 @@ def setup_user_creds():

def check_job_submit(job_info, schedd_ad, setup_creds=True):

job_ad = classad.ClassAd()
job_ad["Cmd"] = '/usr/bin/env'
job_ad["Args"] = ''
job_ad["TransferExecutable"] = False
job_ad['Out'] = job_info['stdout_file']
job_ad['Err'] = job_info['stderr_file']
job_ad['Log'] = job_info['log_file']
job_ad['LeaveJobInQueue'] = classad.ExprTree("( StageOutFinish > 0 ) =!= true")
sub = {
"universe": "vanilla",
"executable": "/usr/bin/env",
"transfer_executable": "false",
"output": job_info['stdout_file'],
"error": job_info['stderr_file'],
"log": job_info['log_file'],
"leave_in_queue": "( StageOutFinish > 0 ) =!= true",
}
for attr in job_info['attribute']:
key, value = attr.split('=', 1)
# Accept submit format '+AttributeName'
job_ad[key.lstrip('+').strip()] = set_classad_value_type(value.strip())
sub.update({key, value})

if setup_creds:
job_ad.update(setup_user_creds())
sub.update(setup_user_creds())

if G_DEBUG:
print(f"Job ad, pre-submit:\n{job_ad}")
print(f"Job submit description:\n{sub}")

try:
schedd = htcondor.Schedd(schedd_ad)
Expand All @@ -226,18 +153,20 @@ def check_job_submit(job_info, schedd_ad, setup_creds=True):
print(f"Submitting job to schedd {schedd_ad['MyAddress']}")
ad_results = []
try:
cluster = schedd.submit(job_ad, 1, True, ad_results)
result = schedd.submit(htcondor.Submit(sub), 1, True)
except RuntimeError as exc:
raise ce.CondorRunException("- Failed to submit job to %s due to the following error:\n%s" \
% (schedd_ad['Machine'], exc))

cluster = result.cluster()
print(f"- Successful submission; cluster ID {cluster}")

print(f"Resulting job ad: {ad_results[0]}")
if G_DEBUG:
print(f"Resulting job ad: {result.clusterad()}")

print(f"Spooling cluster {cluster} files to schedd {schedd_ad['MyAddress']}")
try:
schedd.spool(ad_results)
schedd.spool(result)
except RuntimeError as exc:
raise ce.CondorRunException(f"- Failed to spool files to {schedd_ad['Machine']} due to the following error:\n{exc}")

Expand Down Expand Up @@ -321,19 +250,17 @@ def main():

try:
coll = htcondor.Collector(collector_hostname)
coll_ad = coll.locate(htcondor.DaemonTypes.Collector)
except IOError:
schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd, job_info['schedd_name'])
except Exception:
raise ce.CondorRunException("ERROR: Could not contact CE collector at '%s'. " % collector_hostname + \
"Verify that the Collector daemon is up with `condor_ce_status -any`.")

try:
schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd, job_info['schedd_name'])
except ValueError:
raise ce.CondorRunException('ERROR: Could not find CE schedd at %s.\n' % job_info['schedd_name'] + \
if schedd_ad is None:
raise ce.CondorRunException('ERROR: Could not find CE schedd %s.\n' % job_info['schedd_name'] + \
'Verify that the Scheduler daemon is up with `condor_ce_status -any`.')

os.environ.setdefault('_condor_SEC_CLIENT_AUTHENTICATION_METHODS', 'SCITOKENS,SSL,FS')
check_authz(coll_ad, schedd_ad)
run_ping(collector_hostname, job_info['schedd_name'])
try:
job_info.update(ce.generate_job_files())
check_job_submit(job_info, schedd_ad, setup_creds=not opts.skip_scitokens)
Expand Down
2 changes: 1 addition & 1 deletion src/condor_ce_view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ os.environ.setdefault('CONDOR_CONFIG', '/etc/condor-ce/condor_config')
import gunicorn.app.base
import gunicorn.glogging

import htcondor
import htcondor2 as htcondor
import htcondorce.web

ALIVE_HEARTBEAT = 60
Expand Down
2 changes: 1 addition & 1 deletion src/gratia_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

os.environ['CONDOR_CONFIG'] = '/etc/condor-ce/condor_config'

import htcondor
import htcondor2 as htcondor

GRATIA_DIR = htcondor.param['PER_JOB_HISTORY_DIR']
HISTORY_FILES = [os.path.join(GRATIA_DIR, x) for x in os.listdir(GRATIA_DIR)]
Expand Down
4 changes: 2 additions & 2 deletions src/htcondorce/info_query.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import classad
import htcondor
import classad2 as classad
import htcondor2 as htcondor

import logging
_logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion src/htcondorce/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def check_token_path(path, suffix=''):
# Punt on this one until we address HTCONDOR-634

try:
# 2. BEARER_TOKEN_PATH containing the path to the token
# 2. BEARER_TOKEN_FILE containing the path to the token
path = check_token_path(os.environ['BEARER_TOKEN_FILE'])
except (KeyError, FileNotFoundError):
try:
Expand Down
2 changes: 1 addition & 1 deletion src/htcondorce/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from jinja2 import Environment, FileSystemLoader, select_autoescape

import classad
import classad2 as classad
htcondor = None

import htcondorce.rrd
Expand Down
2 changes: 1 addition & 1 deletion src/htcondorce/web_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import socket
import traceback

import classad
import classad2 as classad
htcondor = None

def check_htcondor():
Expand Down
4 changes: 2 additions & 2 deletions src/verify_ce_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def find_malformed_entries(entries_config):

# Verify that the HTCondor Python bindings are in the PYTHONPATH
try:
import classad
import htcondor
import classad2 as classad
import htcondor2 as htcondor
except ImportError:
error("Could not load HTCondor Python bindings. "
+ "Please ensure that the 'htcondor' and 'classad' are in your PYTHONPATH")
Expand Down

0 comments on commit 1028529

Please sign in to comment.