diff --git a/src/python/RucioUtils.py b/src/python/RucioUtils.py index e51fc4c158..bfb9de9211 100644 --- a/src/python/RucioUtils.py +++ b/src/python/RucioUtils.py @@ -3,7 +3,7 @@ from TaskWorker.WorkerExceptions import TaskWorkerException from rucio.client import Client -from rucio.common.exception import RSENotFound +from rucio.common.exception import RSENotFound, RuleNotFound def getNativeRucioClient(config=None, logger=None): @@ -101,3 +101,26 @@ def getWritePFN(rucioClient=None, siteName='', lfn='', # pylint: disable=danger logger.info(f"Will use {pfn} as stageout location") return pfn + + +def getRuleQuota(rucioClient=None, ruleId=None): + """ return quota needed by this rule in Bytes """ + size = 0 + try: + rule = rucioClient.get_replication_rule(ruleId) + except RuleNotFound: + return 0 + files = rucioClient.list_files(scope=rule['scope'], name= rule['name']) + size = sum(file['bytes'] for file in files) + return size + + +def getTapeRecallUsage(rucioClient=None, account=None): + """ size of ongoing tape recalls for this account """ + activity = 'Analysis TapeRecall' + rucioAccount = account + rules = rucioClient.list_replication_rules( + filters={'account': rucioAccount, 'activity': activity}) + usage = sum(getRuleQuota(rucioClient, rule['id']) for rule in rules\ + if rule['state'] in ['REPLICATING', 'STUCK', 'SUSPENDED']) # in Bytes + return usage diff --git a/src/python/TaskWorker/Actions/DBSDataDiscovery.py b/src/python/TaskWorker/Actions/DBSDataDiscovery.py index b98dcefe03..cda04fc11f 100644 --- a/src/python/TaskWorker/Actions/DBSDataDiscovery.py +++ b/src/python/TaskWorker/Actions/DBSDataDiscovery.py @@ -12,7 +12,7 @@ from WMCore.Configuration import ConfigurationEx from WMCore import Lexicon -from RucioUtils import getNativeRucioClient +from RucioUtils import getNativeRucioClient, getTapeRecallUsage from ServerUtilities import MAX_LUMIS_IN_BLOCK, parseDBSInstance, isDatasetUserDataset from TaskWorker.WorkerExceptions import TaskWorkerException, TapeDatasetException @@ -523,29 +523,50 @@ def executeTapeRecallPolicy(self, inputDataset, inputBlocks, totalSizeBytes): :rtype: str """ dataTier = inputDataset.split('/')[3] + totalSizeTB = int(totalSizeBytes / 1e12) maxTierToBlockRecallSizeTB = getattr(self.config.TaskWorker, 'maxTierToBlockRecallSizeTB', 0) - maxTierToBlockRecallSize = maxTierToBlockRecallSizeTB * 1e12 maxAnyTierRecallSizeTB = getattr(self.config.TaskWorker, 'maxAnyTierRecallSizeTB', 0) - maxAnyTierRecallSize = maxAnyTierRecallSizeTB * 1e12 - if dataTier in getattr(self.config.TaskWorker, 'tiersToRecall', []) or totalSizeBytes < maxAnyTierRecallSize: + maxRecallPerUserTB = getattr(self.config.TaskWorker, 'maxRecallPerUserTB', 100e3) # defaul to 100PB for testing + + # how much this user is recalling already + self.logger.debug("Find how much user %s is recalling already", self.username) + usage = getTapeRecallUsage(self.rucioClient, self.username) # in Bytes + usageTB = int(usage / 1e12) + self.logger.debug("User is using %sTB.", usageTB) + # is there room for adding this recall ? + userHasQuota = (usageTB + totalSizeTB) < maxRecallPerUserTB + if not userHasQuota: + # prepare a message to send back + overQmsg = "\n Recall from tape is not possible at the moment. You need to wait until" + overQmsg += "\n enough of your ongoing tape recall are completed" + overQmsg += f"\n used so far: {usageTB}TB, this request: {totalSizeTB}TB, total: " + overQmsg += f"{usageTB + totalSizeTB}TB, max allowed: {maxRecallPerUserTB}TB" + if dataTier in getattr(self.config.TaskWorker, 'tiersToRecall', []) or totalSizeTB < maxAnyTierRecallSizeTB: msg = f"Task could not be submitted because not all blocks of dataset {inputDataset} are on DISK" + if not userHasQuota: + msg += overQmsg + raise TaskWorkerException(msg) msg += "\nWill request a full disk copy for you. See" msg += "\n https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRAB3FAQ#crab_submit_fails_with_Task_coul" elif inputBlocks: - if totalSizeBytes < maxTierToBlockRecallSize: + if totalSizeTB < maxTierToBlockRecallSizeTB: msg = "Task could not be submitted because blocks specified in 'Data.inputBlocks' are not on disk." + if not userHasQuota: + msg += overQmsg + raise TaskWorkerException(msg) msg += "\nWill request a disk copy for you. See" msg += "\n https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRAB3FAQ#crab_submit_fails_with_Task_coul" else: msg = "Some blocks are on TAPE only and will not be processed." - msg += f"\n'The 'Data.inputBlocks' size is larger than allowed size ({totalSizeBytes/1e12:.3f}TB/{maxTierToBlockRecallSizeTB}TB)"\ - "to issue automatically recall from TAPE." + msg += "\n'The 'Data.inputBlocks' size is larger than allowed size"\ + " ({totalSizeTB}TB/{maxTierToBlockRecallSizeTB}TB) "\ + "to issue automatically recall from TAPE." msg += "\nIf you need these blocks, contact Data Transfer team via https://its.cern.ch/jira/browse/CMSTRANSF" raise TaskWorkerException(msg) else: msg = "Some blocks are on TAPE only and will not be processed." msg += "\nThis dataset is too large for automatic recall from TAPE." - msg += "\nIf you can do with only a piece, use Data.inputBlocks configuration." + msg += "\nIf you can do with only a part of the dataset, use Data.inputBlocks configuration." msg += "\nIf you need the full dataset, contact Data Transfer team via https://its.cern.ch/jira/browse/CMSTRANSF" raise TaskWorkerException(msg) return msg @@ -557,13 +578,17 @@ def executeTapeRecallPolicy(self, inputDataset, inputBlocks, totalSizeBytes): # Usage: python3 DBSDataDiscovery.py dbs_instance dbsDataset [secondaryDataset] # where dbs_instance should be either prod/global or prod/phys03 # - # Example: /MuonEG/Run2016B-23Sep2016-v3/MINIAOD + # Example: python3 DBSDataDiscovery.py prod/global /MuonEG/Run2016B-23Sep2016-v3/MINIAOD ### import json from ServerUtilities import newX509env dbsInstance = sys.argv[1] dbsDataset = sys.argv[2] + blockList = [] + if '#' in dbsDataset: # accep a block name as input + blockList = [dbsDataset] + dbsDataset = dbsDataset.split('#')[0] dbsSecondaryDataset = sys.argv[3] if len(sys.argv) == 4 else None # pylint: disable=invalid-name DBSUrl = f"https://cmsweb.cern.ch/dbs/{dbsInstance}/DBSReader/" @@ -598,17 +623,20 @@ def executeTapeRecallPolicy(self, inputDataset, inputBlocks, totalSizeBytes): rucioClient = getNativeRucioClient(config=config, logger=logging.getLogger()) - fileset = DBSDataDiscovery(config=config, rucioClient=rucioClient) + discovery = DBSDataDiscovery(config=config, rucioClient=rucioClient) userConfig = {'partialdataset':False, - 'inputblocks':[] + 'inputblocks':blockList } - discoveryOutput = ( - fileset.execute(task={'tm_nonvalid_input_dataset': 'T', 'tm_use_parent': 0, 'user_proxy': 'None', - 'tm_input_dataset': dbsDataset, 'tm_secondary_input_dataset': dbsSecondaryDataset, - 'tm_taskname': 'pippo1', 'tm_username':config.Services.Rucio_account, - 'tm_split_algo' : 'automatic', 'tm_split_args' : {'runs':[], 'lumis':[]}, - 'tm_user_config': userConfig, - 'tm_dbs_url': DBSUrl}, tempDir='')) + discoveryOutput = discovery.execute(task={'tm_nonvalid_input_dataset': 'T', 'tm_use_parent': 0, + 'user_proxy': 'None','tm_input_dataset': dbsDataset, + 'tm_secondary_input_dataset': dbsSecondaryDataset, + 'tm_taskname': 'dummyTaskName', + 'tm_username':config.Services.Rucio_account, + 'tm_split_algo' : 'automatic', + 'tm_split_args' : {'runs':[], 'lumis':[]}, + 'tm_user_config': userConfig, 'tm_user_files': [], + 'tm_dbs_url': DBSUrl}, + tempDir='') # discoveryOutput.result is a WMCore fileset structure fileObjects = discoveryOutput.result.getFiles() fileDictionaries = [file.json() for file in fileObjects] diff --git a/src/python/TaskWorker/Actions/RucioActions.py b/src/python/TaskWorker/Actions/RucioActions.py index 7d1ee7152e..1b995f517e 100644 --- a/src/python/TaskWorker/Actions/RucioActions.py +++ b/src/python/TaskWorker/Actions/RucioActions.py @@ -59,7 +59,7 @@ def makeContainerFromBlockList(self, blockList=None, containerDid=None): raise TaskWorkerException(msg) from e self.logger.info("Rucio container %s:%s created with %d blocks", scope, containerName, len(blockList)) - def createOrReuseRucioRule(self, did=None, grouping=None, + def createOrReuseRucioRule(self, did=None, grouping=None, activity=None, rseExpression='', comment='', lifetime=0): """ if Rucio reports duplicate rule exception, reuse existing one """ # Some RSE_EXPR for testing @@ -67,14 +67,19 @@ def createOrReuseRucioRule(self, did=None, grouping=None, # rseExpression = 'T3_IT_Trieste' # for testing weight = 'ddm_quota' # only makes sense for rules which trigger replicas # weight = None # for testing - askApproval = False + if activity == "Analysis TapeRecall": + askApproval = True + account = self.username + else: + askApproval = False + account = self.rucioAccount # askApproval = True # for testing copies = 1 try: ruleIds = self.rucioClient.add_replication_rule( # N.B. returns a list dids=[did], copies=copies, rse_expression=rseExpression, grouping=grouping, weight=weight, lifetime=lifetime, - account=self.rucioAccount, activity='Analysis Input', + account=account, activity=activity, comment=comment, ask_approval=askApproval, asynchronous=True) ruleId = ruleIds[0] @@ -190,7 +195,7 @@ def recallData(self, dataToRecall=None, sizeToRecall=0, tapeLocations=None, msgH # prepare container to be recalled if isinstance(dataToRecall, str): - # recalling a full DBS dataset, simple the container already exists + # recalling a full DBS dataset. Simple: the container already exists myScope = 'cms' dbsDatasetName = dataToRecall containerDid = {'scope': myScope, 'name': dbsDatasetName} @@ -220,6 +225,7 @@ def recallData(self, dataToRecall=None, sizeToRecall=0, tapeLocations=None, msgH lifetime = (MAX_DAYS_FOR_TAPERECALL + 7) * 24 * 60 * 60 # in seconds ruleId = self.createOrReuseRucioRule(did=containerDid, grouping=grouping, rseExpression=rseExpression, + activity="Analysis TapeRecall", comment=comment, lifetime=lifetime) msg = f"Created Rucio rule ID: {ruleId}" self.logger.info(msg) @@ -271,6 +277,7 @@ def lockData(self, dataToLock=None): # create rule ruleId = self.createOrReuseRucioRule(did=containerDid, rseExpression='rse_type=DISK', + activity="Analysis Input", comment=comment, lifetime=TASKLIFETIME) msg = f"Created Rucio rule ID: {ruleId}" self.logger.info(msg)