diff --git a/cicd/gitlab/env/test14 b/cicd/gitlab/env/test14
new file mode 100644
index 0000000000..2948d9fdfe
--- /dev/null
+++ b/cicd/gitlab/env/test14
@@ -0,0 +1,4 @@
+KUBECONTEXT=cmsweb-test14
+Environment=crab-dev-tw07
+REST_Instance=test14
+CRABClient_version=GH
diff --git a/cicd/gitlab/parseEnv.sh b/cicd/gitlab/parseEnv.sh
index f1fb49a316..50a155bab2 100755
--- a/cicd/gitlab/parseEnv.sh
+++ b/cicd/gitlab/parseEnv.sh
@@ -12,7 +12,7 @@ SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
 TAG="${1}"
 
 # validate tag
-REGEX_DEV_TAG='^pypi-(preprod|test2|test11|test12|test1)-.*'
+REGEX_DEV_TAG='^pypi-(preprod|test2|test11|test12|test1|test14)-.*'
 REGEX_RELEASE_TAG='^v3\.[0-9]{6}.*'
 if [[ $TAG =~ $REGEX_DEV_TAG ]]; then # Do not quote regexp variable here
     IFS='-' read -ra TMPSTR <<< "${TAG}"
diff --git a/scripts/AdjustSites.py b/scripts/AdjustSites.py
index 12869e33bc..cf7a6e35cb 100644
--- a/scripts/AdjustSites.py
+++ b/scripts/AdjustSites.py
@@ -1,4 +1,7 @@
-""" This script is called by dag_bootstrap_startup.sh when the job is (re)submitted and:
+"""
+This script is called by dag_bootstrap_startup.sh when the job is (re)submitted.
+Besides adjusting sites (blacklist/whitelist), it also does the following:
+ - It downloads the sandbox from S3 (if not already present).
 - It creates the webdir if necessary
 - It updates both the webdir ant the proxied version of it on the REST task db
 - For resubmission: adjust the exit codes of the PJ in the RunJobs.dag.nodes.log files and
@@ -14,6 +17,7 @@
 import time
 import glob
 import shutil
+import logging
 from urllib.parse import urlencode
 import traceback
 from datetime import datetime
@@ -23,8 +27,7 @@
 import htcondor
 
 from RESTInteractions import CRABRest
-from ServerUtilities import getProxiedWebDir, getColumn
-
+from ServerUtilities import getProxiedWebDir, getColumn, downloadFromS3
 
 def printLog(msg):
     """ Utility function to print the timestamp in the log. Can be replaced
@@ -32,6 +35,21 @@ def printLog(msg):
     """
     print("%s: %s" % (datetime.utcnow(), msg))
 
+def setupStreamLogger():
+    """
+    Set up a logger object with a stream handler. Needed by `downloadFromS3()`.
+
+    :returns: stream logger object
+    :rtype: logging.Logger
+    """
+    logHandler = logging.StreamHandler()
+    logFormatter = logging.Formatter(
+        "%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s")
+    logHandler.setFormatter(logFormatter)
+    logger = logging.getLogger('AdjustSites')  # hardcoded name
+    logger.addHandler(logHandler)
+    logger.setLevel(logging.DEBUG)
+    return logger
 
 def adjustPostScriptExitStatus(resubmitJobIds, filename):
     """
@@ -362,28 +380,24 @@ def setupLog():
     os.close(logfd)
 
 
-def checkTaskInfo(crabserver, ad):
+def checkTaskInfo(taskDict, ad):
     """
-    Function checks that given task is registered in the database with status SUBMITTED and with the
-    same clusterId and schedd name in the database as in the condor ads where it is currently running.
-    In case above condition is not met, script immediately terminates
+    Function checks that the given task is registered in the database with status
+    SUBMITTED and with the same clusterId and schedd name in the database as in
+    the condor ads where it is currently running.
+    In case the above condition is not met, the script terminates immediately.
+
+    :param taskDict: task info returned from the REST.
+    :type taskDict: dict
+    :param ad: the condor job ClassAd (key/value pairs)
+    :type ad: dict
     """
-    task = ad['CRAB_ReqName']
     clusterIdOnSchedd = ad['ClusterId']
-    data = {'subresource': 'search', 'workflow': task}
-
-    try:
-        dictresult, _, _ = crabserver.get(api='task', data=data)
-    except HTTPException as hte:
-        printLog(traceback.format_exc())
-        printLog(hte.headers)
-        printLog(hte.result)
-        sys.exit(2)
-    taskStatusOnDB = getColumn(dictresult, 'tm_task_status')
-    clusteridOnDB = getColumn(dictresult, 'clusterid')
-    scheddOnDB = getColumn(dictresult, 'tm_schedd')
+    taskStatusOnDB = getColumn(taskDict, 'tm_task_status')
+    clusteridOnDB = getColumn(taskDict, 'clusterid')
+    scheddOnDB = getColumn(taskDict, 'tm_schedd')
 
     scheddName = os.environ['schedd_name']
 
@@ -395,6 +409,44 @@ def checkTaskInfo(crabserver, ad):
         sys.exit(3)
 
 
+def getSandbox(taskDict, crabserver):
+    """
+    Get the user sandbox (sandbox.tar.gz) from S3. It will not re-download the
+    sandbox if the file already exists.
+
+    This function has a side effect: sandbox.tar.gz (and sandbox.tar.gz_tmp) are
+    created in the current directory.
+
+    :param taskDict: task info returned from the REST.
+    :type taskDict: dict
+    :param crabserver: CRABRest object to talk to the RESTCache API
+    :type crabserver: RESTInteractions.CRABRest
+    """
+    sandboxTarBall = 'sandbox.tar.gz'
+    sandboxTarBallTmp = sandboxTarBall + '_tmp'
+    if os.path.exists(sandboxTarBall):
+        printLog('sandbox.tar.gz already exists. Nothing to do.')
+        return
+
+    # init the logger required by downloadFromS3
+    logger = setupStreamLogger()
+
+    # get info
+    username = getColumn(taskDict, 'tm_username')
+    sandboxName = getColumn(taskDict, 'tm_user_sandbox')
+
+    # download
+    try:
+        downloadFromS3(crabserver=crabserver, objecttype='sandbox', username=username,
+                       tarballname=sandboxName, filepath=sandboxTarBallTmp, logger=logger)
+        shutil.move(sandboxTarBallTmp, sandboxTarBall)
+    except Exception as ex:
+        logger.exception("The CRAB server backend could not download the input sandbox with your code " + \
+                         "from S3.\nThis could be a temporary glitch; please try to submit a new task later " + \
+                         "(resubmit will not work) and contact the experts if the error persists.\nError reason: %s" % str(ex))
+        sys.exit(4)
+
+
 def main():
     """
     Need a doc string here.
     """
@@ -409,8 +461,7 @@ def main():
     with open(os.environ['_CONDOR_JOB_AD']) as fd:
         ad = classad.parseOne(fd)
-    printLog("Parsed ad: %s" % ad)
-
+    printLog("Parsed ad: %s\n" % ad)
     # instantiate a server object to talk with crabserver
     host = ad['CRAB_RestHost']
@@ -419,8 +470,24 @@ def main():
     dbInstance = ad['CRAB_DbInstance']
     cert = ...
     crabserver = CRABRest(host, cert, cert, retry=3, userAgent='CRABSchedd')
     crabserver.setDbInstance(dbInstance)
+    printLog("Sleeping 60 seconds to give TW time to update taskDB")
     time.sleep(60) # give TW time to update taskDB #8411
-    checkTaskInfo(crabserver, ad)
+
+    # get task info
+    task = ad['CRAB_ReqName']
+    data = {'subresource': 'search', 'workflow': task}
+    try:
+        dictresult, _, _ = crabserver.get(api='task', data=data)
+    except HTTPException as hte:
+        printLog(traceback.format_exc())
+        printLog(hte.headers)
+        printLog(hte.result)
+        sys.exit(2)
+
+    # check task status
+    checkTaskInfo(taskDict=dictresult, ad=ad)
+    # get sandbox
+    getSandbox(taskDict=dictresult, crabserver=crabserver)
 
     # is this the first time this script runs for this task ? (it runs at each resubmit as well !)
     if not os.path.exists('WEB_DIR'):
@@ -503,4 +570,3 @@ def main():
 
 if __name__ == '__main__':
     main()
-
diff --git a/scripts/dag_bootstrap_startup.sh b/scripts/dag_bootstrap_startup.sh
index 0868864753..abb93a86f8 100755
--- a/scripts/dag_bootstrap_startup.sh
+++ b/scripts/dag_bootstrap_startup.sh
@@ -149,7 +149,8 @@ cp $_CONDOR_JOB_AD ./_CONDOR_JOB_AD
 if [ -e AdjustSites.py ]; then
     export schedd_name=`condor_config_val schedd_name`
     echo "Execute AdjustSites.py ..."
-    python3 AdjustSites.py
+    # need unbuffered output, otherwise the log looks garbled.
+    PYTHONUNBUFFERED=1 python3 AdjustSites.py
     ret=$?
     if [ $ret -eq 1 ]; then
         echo "Error: AdjustSites.py failed to update the webdir." >&2
@@ -158,11 +159,15 @@ if [ -e AdjustSites.py ]; then
     elif [ $ret -eq 2 ]; then
         echo "Error: Cannot get data from REST Interface" >&2
         condor_qedit $CONDOR_ID DagmanHoldReason "'Cannot get data from REST Interface.'"
-        exit 1 
+        exit 1
     elif [ $ret -eq 3 ]; then
        echo "Error: this dagman does not match task information in TASKS DB" >&2
        condor_qedit $CONDOR_ID DagmanHoldReason "'This dagman does not match task information in TASKS DB'"
        exit 1
+    elif [ $ret -eq 4 ]; then
+       echo "Error: Failed to get user sandbox from S3." >&2
+       condor_qedit $CONDOR_ID DagmanHoldReason "'Failed to get user sandbox from S3.'"
+       exit 1
     fi
 else
     echo "Error: AdjustSites.py does not exist." >&2
diff --git a/src/python/CRABInterface/RESTCache.py b/src/python/CRABInterface/RESTCache.py
index 1660eaba0f..72fd084af0 100644
--- a/src/python/CRABInterface/RESTCache.py
+++ b/src/python/CRABInterface/RESTCache.py
@@ -13,7 +13,8 @@
 # CRABServer dependecies here
 from CRABInterface.RESTExtensions import authz_login_valid, authz_operator
-from CRABInterface.Regexps import RX_SUBRES_CACHE, RX_CACHE_OBJECTTYPE, RX_TASKNAME, RX_USERNAME, RX_TARBALLNAME
+from CRABInterface.Regexps import (RX_SUBRES_CACHE, RX_CACHE_OBJECTTYPE, RX_TASKNAME,
+                                   RX_USERNAME, RX_TARBALLNAME, RX_PRESIGNED_CLIENT_METHOD)
 from ServerUtilities import getUsernameFromTaskname, MeasureTime
 
@@ -99,9 +100,10 @@ def validate(self, apiobj, method, api, param, safe):
         validate_str('taskname', param, safe, RX_TASKNAME, optional=True)
         validate_str('username', param, safe, RX_USERNAME, optional=True)
         validate_str('tarballname', param, safe, RX_TARBALLNAME, optional=True)
+        validate_str('clientmethod', param, safe, RX_PRESIGNED_CLIENT_METHOD, optional=True)
 
     @restcall
-    def get(self, subresource, objecttype, taskname, username, tarballname): # pylint: disable=redefined-builtin
+    def get(self, subresource, objecttype, taskname, username, tarballname, clientmethod): # pylint: disable=redefined-builtin
         """
         :arg str subresource: the specific information to be accessed;
         """
@@ -165,6 +167,8 @@ def get(self, subresource, objecttype, taskname, username, tarballname): # pyli
                 authz_operator(username=ownerName, group='crab3', role='operator')
             if subresource == 'sandbox' and not username:
                 raise MissingParameter("username is missing")
+            if not clientmethod:
+                clientmethod = 'get_object'
             # returns a PreSignedUrl to download the file within the expiration time
             expiration = 60 * 60 # 1 hour default is good for retries and debugging
             if objecttype in ['debugfiles', 'clientlog', 'twlog']:
@@ -172,7 +176,7 @@ def get(self, subresource, objecttype, taskname, username, tarballname): # pyli
             try:
                 with MeasureTime(self.logger, modulename=__name__, label="get.download.generate_presigned_post") as _:
                     response = self.s3_client.generate_presigned_url(
-                        'get_object', Params={'Bucket': self.s3_bucket, 'Key': s3_objectKey},
+                        clientmethod, Params={'Bucket': self.s3_bucket, 'Key': s3_objectKey},
                         ExpiresIn=expiration)
                     preSignedUrl = response
             except ClientError as e:
diff --git a/src/python/CRABInterface/Regexps.py b/src/python/CRABInterface/Regexps.py
index 035628f99b..721a033a3e 100644
--- a/src/python/CRABInterface/Regexps.py
+++ b/src/python/CRABInterface/Regexps.py
@@ -93,6 +93,7 @@
 #subresources of Cache resource
 RX_SUBRES_CACHE = re.compile(r"^(upload|download|retrieve|list|used)$")
 RX_CACHE_OBJECTTYPE = re.compile(r"^(clientlog|twlog|sandbox|debugfiles|runtimefiles)$")
+RX_PRESIGNED_CLIENT_METHOD = re.compile(r"^(get_object|head_object)$")
 
 #worker workflow
 RX_WORKER_NAME = re.compile(r"^[A-Za-z0-9\-\._%]{1,100}$")
diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py
index 05797268b4..6a30c01b3c 100644
--- a/src/python/ServerUtilities.py
+++ b/src/python/ServerUtilities.py
@@ -56,6 +56,7 @@
     'test6': {'restHost': 'cmsweb-test6.cern.ch', 'dbInstance': 'dev'},
     'test11': {'restHost': 'cmsweb-test11.cern.ch', 'dbInstance': 'devtwo'},
     'test12': {'restHost': 'cmsweb-test12.cern.ch', 'dbInstance': 'devthree'},
+    'test14': {'restHost': 'cmsweb-test14.cern.ch', 'dbInstance': 'tseethon'},
     'stefanovm': {'restHost': 'stefanovm.cern.ch', 'dbInstance': 'dev'},
     'stefanovm2': {'restHost': 'stefanovm2.cern.ch', 'dbInstance': 'dev'},
     'other': {'restHost': None, 'dbInstance': None},
@@ -750,6 +751,45 @@ def downloadFromS3(crabserver=None, filepath=None, objecttype=None, taskname=Non
                                           tarballname=tarballname, logger=logger)
     downloadFromS3ViaPSU(filepath=filepath, preSignedUrl=preSignedUrl, logger=logger)
 
+def checkS3Object(crabserver=None, objecttype=None, username=None, tarballname=None,
+                  logger=None):
+    """
+    Check if a file exists in S3. Raises an exception if wget exits with a
+    non-zero code; usually stderr will show the HTTP response `404 Not Found`
+    when the file does not exist.
+    Note that a presigned URL generated for the GetObject API cannot be used
+    with the HeadObject API. An alternative is to fetch just a few bytes with
+    'head -c1000' instead, but that needs to be wrapped inside bash with
+    `set -o pipefail` to make it exit early, so that the exit code from wget
+    can propagate back to Popen properly.
+
+    :param crabserver: CRABRest object, points to CRAB Server to use
+    :type crabserver: RESTInteractions.CRABRest
+    :param objecttype: the kind of object to download: clientlog|twlog|sandbox|debugfiles|runtimefiles
+    :type objecttype: str
+    :param username: the username this sandbox belongs to, in case objecttype=sandbox
+    :type username: str
+    :param tarballname: for sandbox, taskname is not used but tarballname is needed
+    :type tarballname: str
+
+    :return: None, but raises an exception if wget exits with a non-zero code.
+    """
+    preSignedUrl = getDownloadUrlFromS3(crabserver=crabserver, objecttype=objecttype,
+                                        username=username, tarballname=tarballname,
+                                        clientmethod='head_object',
+                                        logger=logger)
+    downloadCommand = ''
+    if os.getenv('CRAB_useGoCurl'):
+        raise NotImplementedError('HEAD with gocurl is not implemented')
+    downloadCommand += ' wget -Sq -O /dev/null --method=HEAD'
+    downloadCommand += ' "%s"' % preSignedUrl
+
+    with subprocess.Popen(downloadCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as downloadProcess:
+        logger.debug("Will execute:\n%s", downloadCommand)
+        stdout, stderr = downloadProcess.communicate()
+        exitcode = downloadProcess.returncode
+        logger.debug('exitcode: %s\nstdout: %s', exitcode, stdout)
+
+    if exitcode != 0:
+        raise Exception('Download command %s failed. stderr is:\n%s' % (downloadCommand, stderr))
 
 def retrieveFromS3(crabserver=None, objecttype=None, taskname=None, username=None, tarballname=None, logger=None):
@@ -827,7 +867,8 @@ def uploadToS3(crabserver=None, filepath=None, objecttype=None, taskname=None,
 
 
 def getDownloadUrlFromS3(crabserver=None, objecttype=None, taskname=None,
-                         username=None, tarballname=None, logger=None):
+                         username=None, tarballname=None, clientmethod=None,
+                         logger=None):
     """
     obtains a PreSigned URL to access an existing object in S3
     :param crabserver: a RESTInteraction/CRABRest object : points to CRAB Server to use
@@ -845,6 +886,8 @@ def getDownloadUrlFromS3(crabserver=None, objecttype=None, taskname=None,
         dataDict['username'] = username
     if tarballname:
         dataDict['tarballname'] = tarballname
+    if clientmethod:
+        dataDict['clientmethod'] = clientmethod
     data = encodeRequest(dataDict)
     try: # calls to restServer alway return a 3-ple ({'result':a-list}, HTTPcode, HTTPreason)
diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py
index c2d3a4fc5b..50a46f696c 100644
--- a/src/python/TaskWorker/Actions/DagmanCreator.py
+++ b/src/python/TaskWorker/Actions/DagmanCreator.py
@@ -17,7 +17,7 @@
 from ast import literal_eval
 
 from ServerUtilities import MAX_DISK_SPACE, MAX_IDLE_JOBS, MAX_POST_JOBS, TASKLIFETIME
-from ServerUtilities import getLock, downloadFromS3
+from ServerUtilities import getLock, downloadFromS3, checkS3Object, uploadToS3
 
 import TaskWorker.DataObjects.Result
 from TaskWorker.Actions.TaskAction import TaskAction
@@ -541,8 +541,8 @@ def makeJobSubmit(self, task):
             info['additional_environment_options'] += ' CRAB_TASKMANAGER_TARBALL=local'
         else:
             raise TaskWorkerException("Cannot find TaskManagerRun.tar.gz inside the cwd: %s" % os.getcwd())
-        if os.path.exists("sandbox.tar.gz"):
-            info['additional_input_file'] += ", sandbox.tar.gz"
+
+        info['additional_input_file'] += ", sandbox.tar.gz"  # will be available at dag bootstrap time
         info['additional_input_file'] += ", run_and_lumis.tar.gz"
         info['additional_input_file'] += ", input_files.tar.gz"
         info['additional_input_file'] += ", submit_env.sh"
@@ -727,6 +727,16 @@ def prepareLocal(self, dagSpecs, info, kw, inputFiles, subdags):
         finally:
             tf.close()
 
+        # also upload InputFiles.tar.gz to S3
+        # Wa: as of 2024 I am still not sure whether we need to poll the uploaded file
+        # until it becomes available (see #6706). If that is still the case, use the
+        # implementation from the old DryRunUploader action:
+        # https://github.com/dmwm/CRABServer/blob/9b4679d14bb19ccc7373d56c20631eea34d80a69/src/python/TaskWorker/Actions/DryRunUploader.py#L61-L75
+        task = kw['task']['tm_taskname']
+        uploadToS3(crabserver=self.crabserver, filepath='InputFiles.tar.gz',
+                   objecttype='runtimefiles', taskname=task,
+                   logger=self.logger)
+
 
     def createSubdag(self, splitterResult, **kwargs):
         startjobid = kwargs.get('startjobid', 0)
@@ -1204,11 +1214,12 @@ def executeInternal(self, *args, **kw):
         sandboxName = kw['task']['tm_user_sandbox']
         dbgFilesName = kw['task']['tm_debug_files']
         try:
-            downloadFromS3(crabserver=self.crabserver, objecttype='sandbox', username=username,
-                           tarballname=sandboxName, filepath=sandboxTarBall, logger=self.logger)
+            self.logger.debug(f"Checking if sandbox file is available: {sandboxName}")
+            checkS3Object(crabserver=self.crabserver, objecttype='sandbox',
+                          username=username, tarballname=sandboxName, logger=self.logger)
             kw['task']['tm_user_sandbox'] = sandboxTarBall
         except Exception as ex:
-            raise TaskWorkerException("The CRAB server backend could not download the input sandbox with your code " + \
+            raise TaskWorkerException("The CRAB server backend could not find the input sandbox with your code " + \
                 "from S3.\nThis could be a temporary glitch; please try to submit a new task later " + \
                 "(resubmit will not work) and contact the experts if the error persists.\nError reason: %s" % str(ex)) from ex
         try:
@@ -1233,8 +1244,6 @@ def executeInternal(self, *args, **kw):
 
         self.extractMonitorFiles(inputFiles, **kw)
 
-        if kw['task'].get('tm_user_sandbox') == 'sandbox.tar.gz':
-            inputFiles.append('sandbox.tar.gz')
         if os.path.exists("CMSRunAnalysis.tar.gz"):
             inputFiles.append("CMSRunAnalysis.tar.gz")
         if os.path.exists("TaskManagerRun.tar.gz"):
diff --git a/src/python/TaskWorker/Actions/DryRun.py b/src/python/TaskWorker/Actions/DryRun.py
new file mode 100644
index 0000000000..4d98956718
--- /dev/null
+++ b/src/python/TaskWorker/Actions/DryRun.py
@@ -0,0 +1,27 @@
+"""
+DryRun action: mark a dry-run task as UPLOADED in the task database.
+"""
+from urllib.parse import urlencode
+
+from TaskWorker.DataObjects.Result import Result
+from TaskWorker.Actions.TaskAction import TaskAction
+from TaskWorker.WorkerExceptions import TaskWorkerException
+
+
+class DryRun(TaskAction):
+    """
+    Mark the task as UPLOADED in the task DB (necessary for crab submit --dryrun); the runtime files themselves are uploaded to S3 by DagmanCreator.
+    """
+    def executeInternal(self, *args, **kw):
+        task = kw['task']
+        update = {'workflow': task['tm_taskname'], 'subresource': 'state', 'status': 'UPLOADED'}
+        self.logger.debug('Updating task status: %s', str(update))
+        self.crabserver.post(api='workflowdb', data=urlencode(update))
+        return Result(task=task, result=args[0])
+
+    def execute(self, *args, **kw):
+        try:
+            return self.executeInternal(*args, **kw)
+        except Exception as e:
+            msg = "Failed to run DryRun action for %s; '%s'" % (kw['task']['tm_taskname'], str(e))
+            raise TaskWorkerException(msg) from e
diff --git a/src/python/TaskWorker/Actions/DryRunUploader.py b/src/python/TaskWorker/Actions/DryRunUploader.py
deleted file mode 100644
index 16698729f7..0000000000
--- a/src/python/TaskWorker/Actions/DryRunUploader.py
+++ /dev/null
@@ -1,140 +0,0 @@
-"""
-Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.)
-""" -import os -import json -import tarfile -import time - -import sys -if sys.version_info >= (3, 0): - from urllib.parse import urlencode # pylint: disable=no-name-in-module -if sys.version_info < (3, 0): - from urllib import urlencode - -from WMCore.DataStructs.LumiList import LumiList - -from TaskWorker.DataObjects.Result import Result -from TaskWorker.Actions.TaskAction import TaskAction -from TaskWorker.WorkerExceptions import TaskWorkerException -from ServerUtilities import uploadToS3, downloadFromS3 - -class DryRunUploader(TaskAction): - """ - Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.) - """ - - def packSandbox(self, inputFiles): - dryRunSandbox = tarfile.open('dry-run-sandbox.tar.gz', 'w:gz') - for f in inputFiles: - self.logger.debug('Adding %s to dry run tarball', f) - dryRunSandbox.add(f, recursive=True) - - dryRunSandbox.close() - - def executeInternal(self, *args, **kw): - inputFiles = args[0][2] - splitterResult = args[0][3][0] - - cwd = os.getcwd() - try: - os.chdir(kw['tempDir']) - splittingSummary = SplittingSummary(kw['task']['tm_split_algo']) - for jobgroup in splitterResult: - jobs = jobgroup.getJobs() - splittingSummary.addJobs(jobs) - splittingSummary.dump('splitting-summary.json') - inputFiles.append('splitting-summary.json') - - self.packSandbox(inputFiles) - - self.logger.info('Uploading dry run tarball to the user file cache') - t0 = time.time() - uploadToS3(crabserver=self.crabserver, filepath='dry-run-sandbox.tar.gz', - objecttype='runtimefiles', taskname=kw['task']['tm_taskname'], logger=self.logger) - os.remove('dry-run-sandbox.tar.gz') - self.logger.info('Uploaded dry run tarball to the user file cache') - # wait until tarball is available, S3 may take a few seconds for this (ref. issue #6706 ) - t1 = time.time() - lt1 = time.strftime("%H:%M:%S", time.localtime(t1)) - uploadTime = t1-t0 - self.logger.debug('runtimefiles upload took %s secs and completed at %s', uploadTime, lt1) - self.logger.debug('check if tarball is available') - tarballOK = False - while not tarballOK: - try: - self.logger.debug('download tarball to /dev/null') - downloadFromS3(crabserver=self.crabserver, filepath='/dev/null', objecttype='runtimefiles', - taskname=kw['task']['tm_taskname'], logger=self.logger) - self.logger.debug('OK, it worked') - tarballOK = True - except Exception as e: - self.logger.debug('runtimefiles tarball not ready yet') - self.logger.debug('Exception was raised: %s', e) - self.logger.debug('Sleep 5 sec') - time.sleep(5) - update = {'workflow': kw['task']['tm_taskname'], 'subresource': 'state', 'status': 'UPLOADED'} - self.logger.debug('Updating task status: %s', str(update)) - self.crabserver.post(api='workflowdb', data=urlencode(update)) - - finally: - os.chdir(cwd) - - return Result(task=kw['task'], result=args[0]) - - def execute(self, *args, **kw): - try: - return self.executeInternal(*args, **kw) - except Exception as e: - msg = "Failed to upload dry run tarball for %s; '%s'" % (kw['task']['tm_taskname'], str(e)) - raise TaskWorkerException(msg) - -class SplittingSummary(object): - """ - Class which calculates some summary data about the splitting results. 
- """ - - def __init__(self, algo): - self.algo = algo - self.lumisPerJob = [] - self.eventsPerJob = [] - self.filesPerJob = [] - - def addJobs(self, jobs): - if self.algo == 'FileBased': - self.lumisPerJob += [sum([x.get('lumiCount', 0) for x in job['input_files']]) for job in jobs] - self.eventsPerJob += [sum([x['events'] for x in job['input_files']]) for job in jobs] - self.filesPerJob += [len(job['input_files']) for job in jobs] - elif self.algo == 'EventBased': - self.lumisPerJob += [job['mask']['LastLumi'] - job['mask']['FirstLumi'] for job in jobs] - self.eventsPerJob += [job['mask']['LastEvent'] - job['mask']['FirstEvent'] for job in jobs] - else: - for job in jobs: - avgEventsPerLumi = sum([f['avgEvtsPerLumi'] for f in job['input_files']])/float(len(job['input_files'])) - lumis = LumiList(compactList=job['mask']['runAndLumis']) - self.lumisPerJob.append(len(lumis.getLumis())) - self.eventsPerJob.append(avgEventsPerLumi * self.lumisPerJob[-1]) - - def dump(self, outname): - """ - Save splitting summary to a json file. - """ - - summary = {'algo': self.algo, - 'total_jobs': len(self.lumisPerJob), - 'total_lumis': sum(self.lumisPerJob), - 'total_events': sum(self.eventsPerJob), - 'max_lumis': max(self.lumisPerJob), - 'max_events': max(self.eventsPerJob), - 'avg_lumis': sum(self.lumisPerJob)/float(len(self.lumisPerJob)), - 'avg_events': sum(self.eventsPerJob)/float(len(self.eventsPerJob)), - 'min_lumis': min(self.lumisPerJob), - 'min_events': min(self.eventsPerJob)} - if len(self.filesPerJob) > 0: - summary.update({'total_files': sum(self.filesPerJob), - 'max_files': max(self.filesPerJob), - 'avg_files': sum(self.filesPerJob)/float(len(self.filesPerJob)), - 'min_files': min(self.filesPerJob)}) - - with open(outname, 'w') as f: - json.dump(summary, f) diff --git a/src/python/TaskWorker/Actions/Handler.py b/src/python/TaskWorker/Actions/Handler.py index 6f30a42ac7..8837d7ab74 100644 --- a/src/python/TaskWorker/Actions/Handler.py +++ b/src/python/TaskWorker/Actions/Handler.py @@ -18,7 +18,7 @@ from TaskWorker.Actions.MyProxyLogon import MyProxyLogon from TaskWorker.Actions.DagmanCreator import DagmanCreator from TaskWorker.Actions.StageoutCheck import StageoutCheck -from TaskWorker.Actions.DryRunUploader import DryRunUploader +from TaskWorker.Actions.DryRun import DryRun from TaskWorker.Actions.MakeFakeFileSet import MakeFakeFileSet from TaskWorker.Actions.DagmanSubmitter import DagmanSubmitter from TaskWorker.Actions.DBSDataDiscovery import DBSDataDiscovery @@ -199,7 +199,7 @@ def handleNewTask(resthost, dbInstance, config, task, procnum, *args, **kwargs): handler.addWork(Splitter(config=config, crabserver=crabserver, procnum=procnum)) handler.addWork(DagmanCreator(config=config, crabserver=crabserver, procnum=procnum, rucioClient=rucioClient)) if task['tm_dry_run'] == 'T': - handler.addWork(DryRunUploader(config=config, crabserver=crabserver, procnum=procnum)) + handler.addWork(DryRun(config=config, crabserver=crabserver, procnum=procnum)) else: handler.addWork(DagmanSubmitter(config=config, crabserver=crabserver, procnum=procnum))