Skip to content

Commit

Permalink
add per-user tape recall policy. Fix #8354 (#8544)
Browse files Browse the repository at this point in the history
* add per-user tape recall policy. Fix #8354

* bump HTC bindings to 23.9.0a3 (#8547)

* fix message

* Update src/python/TaskWorker/Actions/DBSDataDiscovery.py

Co-authored-by: Thanayut Seethongchuen <[email protected]>

* Update src/python/RucioUtils.py

thanks !!

Co-authored-by: Thanayut Seethongchuen <[email protected]>

* activity is TapeRecall, not Tape Recall !

---------

Co-authored-by: Thanayut Seethongchuen <[email protected]>
  • Loading branch information
belforte and novicecpp authored Jul 16, 2024
1 parent 1e31da5 commit 31592fa
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 23 deletions.
25 changes: 24 additions & 1 deletion src/python/RucioUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from TaskWorker.WorkerExceptions import TaskWorkerException
from rucio.client import Client
from rucio.common.exception import RSENotFound
from rucio.common.exception import RSENotFound, RuleNotFound


def getNativeRucioClient(config=None, logger=None):
Expand Down Expand Up @@ -101,3 +101,26 @@ def getWritePFN(rucioClient=None, siteName='', lfn='', # pylint: disable=danger
logger.info(f"Will use {pfn} as stageout location")

return pfn


def getRuleQuota(rucioClient=None, ruleId=None):
    """
    Return the quota needed by a replication rule, in Bytes.

    :param rucioClient: client object providing get_replication_rule() and list_files()
    :param ruleId: id of the replication rule to look up
    :return: sum of the 'bytes' field over all files listed for the rule's
        scope/name; 0 when the client raises RuleNotFound for this id
    """
    try:
        rule = rucioClient.get_replication_rule(ruleId)
    except RuleNotFound:
        # no such rule: nothing to account for
        return 0
    files = rucioClient.list_files(scope=rule['scope'], name=rule['name'])
    return sum(file['bytes'] for file in files)


def getTapeRecallUsage(rucioClient=None, account=None):
    """
    Size (in Bytes) of the ongoing tape recalls for this account.

    Lists the account's replication rules with activity 'Analysis TapeRecall'
    and adds up getRuleQuota() for those whose state is REPLICATING, STUCK
    or SUSPENDED.

    :param rucioClient: client object providing list_replication_rules()
    :param account: account name used to filter the rules
    :return: total size in Bytes
    """
    ongoingStates = ['REPLICATING', 'STUCK', 'SUSPENDED']
    ruleFilters = {'account': account, 'activity': 'Analysis TapeRecall'}
    totalBytes = 0
    for rule in rucioClient.list_replication_rules(filters=ruleFilters):
        if rule['state'] not in ongoingStates:
            # rules in any other state are not counted
            continue
        totalBytes += getRuleQuota(rucioClient, rule['id'])  # in Bytes
    return totalBytes
64 changes: 46 additions & 18 deletions src/python/TaskWorker/Actions/DBSDataDiscovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from WMCore.Configuration import ConfigurationEx
from WMCore import Lexicon

from RucioUtils import getNativeRucioClient
from RucioUtils import getNativeRucioClient, getTapeRecallUsage

from ServerUtilities import MAX_LUMIS_IN_BLOCK, parseDBSInstance, isDatasetUserDataset
from TaskWorker.WorkerExceptions import TaskWorkerException, TapeDatasetException
Expand Down Expand Up @@ -523,29 +523,50 @@ def executeTapeRecallPolicy(self, inputDataset, inputBlocks, totalSizeBytes):
:rtype: str
"""
dataTier = inputDataset.split('/')[3]
totalSizeTB = int(totalSizeBytes / 1e12)
maxTierToBlockRecallSizeTB = getattr(self.config.TaskWorker, 'maxTierToBlockRecallSizeTB', 0)
maxTierToBlockRecallSize = maxTierToBlockRecallSizeTB * 1e12
maxAnyTierRecallSizeTB = getattr(self.config.TaskWorker, 'maxAnyTierRecallSizeTB', 0)
maxAnyTierRecallSize = maxAnyTierRecallSizeTB * 1e12
if dataTier in getattr(self.config.TaskWorker, 'tiersToRecall', []) or totalSizeBytes < maxAnyTierRecallSize:
maxRecallPerUserTB = getattr(self.config.TaskWorker, 'maxRecallPerUserTB', 100e3) # defaul to 100PB for testing

# how much this user is recalling already
self.logger.debug("Find how much user %s is recalling already", self.username)
usage = getTapeRecallUsage(self.rucioClient, self.username) # in Bytes
usageTB = int(usage / 1e12)
self.logger.debug("User is using %sTB.", usageTB)
# is there room for adding this recall ?
userHasQuota = (usageTB + totalSizeTB) < maxRecallPerUserTB
if not userHasQuota:
# prepare a message to send back
overQmsg = "\n Recall from tape is not possible at the moment. You need to wait until"
overQmsg += "\n enough of your ongoing tape recall are completed"
overQmsg += f"\n used so far: {usageTB}TB, this request: {totalSizeTB}TB, total: "
overQmsg += f"{usageTB + totalSizeTB}TB, max allowed: {maxRecallPerUserTB}TB"
if dataTier in getattr(self.config.TaskWorker, 'tiersToRecall', []) or totalSizeTB < maxAnyTierRecallSizeTB:
msg = f"Task could not be submitted because not all blocks of dataset {inputDataset} are on DISK"
if not userHasQuota:
msg += overQmsg
raise TaskWorkerException(msg)
msg += "\nWill request a full disk copy for you. See"
msg += "\n https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRAB3FAQ#crab_submit_fails_with_Task_coul"
elif inputBlocks:
if totalSizeBytes < maxTierToBlockRecallSize:
if totalSizeTB < maxTierToBlockRecallSizeTB:
msg = "Task could not be submitted because blocks specified in 'Data.inputBlocks' are not on disk."
if not userHasQuota:
msg += overQmsg
raise TaskWorkerException(msg)
msg += "\nWill request a disk copy for you. See"
msg += "\n https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRAB3FAQ#crab_submit_fails_with_Task_coul"
else:
msg = "Some blocks are on TAPE only and will not be processed."
msg += f"\n'The 'Data.inputBlocks' size is larger than allowed size ({totalSizeBytes/1e12:.3f}TB/{maxTierToBlockRecallSizeTB}TB)"\
"to issue automatically recall from TAPE."
msg += "\n'The 'Data.inputBlocks' size is larger than allowed size"\
" ({totalSizeTB}TB/{maxTierToBlockRecallSizeTB}TB) "\
"to issue automatically recall from TAPE."
msg += "\nIf you need these blocks, contact Data Transfer team via https://its.cern.ch/jira/browse/CMSTRANSF"
raise TaskWorkerException(msg)
else:
msg = "Some blocks are on TAPE only and will not be processed."
msg += "\nThis dataset is too large for automatic recall from TAPE."
msg += "\nIf you can do with only a piece, use Data.inputBlocks configuration."
msg += "\nIf you can do with only a part of the dataset, use Data.inputBlocks configuration."
msg += "\nIf you need the full dataset, contact Data Transfer team via https://its.cern.ch/jira/browse/CMSTRANSF"
raise TaskWorkerException(msg)
return msg
Expand All @@ -557,13 +578,17 @@ def executeTapeRecallPolicy(self, inputDataset, inputBlocks, totalSizeBytes):
# Usage: python3 DBSDataDiscovery.py dbs_instance dbsDataset [secondaryDataset]
# where dbs_instance should be either prod/global or prod/phys03
#
# Example: /MuonEG/Run2016B-23Sep2016-v3/MINIAOD
# Example: python3 DBSDataDiscovery.py prod/global /MuonEG/Run2016B-23Sep2016-v3/MINIAOD
###
import json
from ServerUtilities import newX509env

dbsInstance = sys.argv[1]
dbsDataset = sys.argv[2]
blockList = []
if '#' in dbsDataset: # accep a block name as input
blockList = [dbsDataset]
dbsDataset = dbsDataset.split('#')[0]
dbsSecondaryDataset = sys.argv[3] if len(sys.argv) == 4 else None # pylint: disable=invalid-name
DBSUrl = f"https://cmsweb.cern.ch/dbs/{dbsInstance}/DBSReader/"

Expand Down Expand Up @@ -598,17 +623,20 @@ def executeTapeRecallPolicy(self, inputDataset, inputBlocks, totalSizeBytes):

rucioClient = getNativeRucioClient(config=config, logger=logging.getLogger())

fileset = DBSDataDiscovery(config=config, rucioClient=rucioClient)
discovery = DBSDataDiscovery(config=config, rucioClient=rucioClient)
userConfig = {'partialdataset':False,
'inputblocks':[]
'inputblocks':blockList
}
discoveryOutput = (
fileset.execute(task={'tm_nonvalid_input_dataset': 'T', 'tm_use_parent': 0, 'user_proxy': 'None',
'tm_input_dataset': dbsDataset, 'tm_secondary_input_dataset': dbsSecondaryDataset,
'tm_taskname': 'pippo1', 'tm_username':config.Services.Rucio_account,
'tm_split_algo' : 'automatic', 'tm_split_args' : {'runs':[], 'lumis':[]},
'tm_user_config': userConfig,
'tm_dbs_url': DBSUrl}, tempDir=''))
discoveryOutput = discovery.execute(task={'tm_nonvalid_input_dataset': 'T', 'tm_use_parent': 0,
'user_proxy': 'None','tm_input_dataset': dbsDataset,
'tm_secondary_input_dataset': dbsSecondaryDataset,
'tm_taskname': 'dummyTaskName',
'tm_username':config.Services.Rucio_account,
'tm_split_algo' : 'automatic',
'tm_split_args' : {'runs':[], 'lumis':[]},
'tm_user_config': userConfig, 'tm_user_files': [],
'tm_dbs_url': DBSUrl},
tempDir='')
# discoveryOutput.result is a WMCore fileset structure
fileObjects = discoveryOutput.result.getFiles()
fileDictionaries = [file.json() for file in fileObjects]
Expand Down
15 changes: 11 additions & 4 deletions src/python/TaskWorker/Actions/RucioActions.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,27 @@ def makeContainerFromBlockList(self, blockList=None, containerDid=None):
raise TaskWorkerException(msg) from e
self.logger.info("Rucio container %s:%s created with %d blocks", scope, containerName, len(blockList))

def createOrReuseRucioRule(self, did=None, grouping=None,
def createOrReuseRucioRule(self, did=None, grouping=None, activity=None,
rseExpression='', comment='', lifetime=0):
""" if Rucio reports duplicate rule exception, reuse existing one """
# Some RSE_EXPR for testing
# rseExpression = 'ddm_quota>0&(tier=1|tier=2)&rse_type=DISK'
# rseExpression = 'T3_IT_Trieste' # for testing
weight = 'ddm_quota' # only makes sense for rules which trigger replicas
# weight = None # for testing
askApproval = False
if activity == "Analysis TapeRecall":
askApproval = True
account = self.username
else:
askApproval = False
account = self.rucioAccount
# askApproval = True # for testing
copies = 1
try:
ruleIds = self.rucioClient.add_replication_rule( # N.B. returns a list
dids=[did], copies=copies, rse_expression=rseExpression,
grouping=grouping, weight=weight, lifetime=lifetime,
account=self.rucioAccount, activity='Analysis Input',
account=account, activity=activity,
comment=comment,
ask_approval=askApproval, asynchronous=True)
ruleId = ruleIds[0]
Expand Down Expand Up @@ -190,7 +195,7 @@ def recallData(self, dataToRecall=None, sizeToRecall=0, tapeLocations=None, msgH

# prepare container to be recalled
if isinstance(dataToRecall, str):
# recalling a full DBS dataset, simple the container already exists
# recalling a full DBS dataset. Simple: the container already exists
myScope = 'cms'
dbsDatasetName = dataToRecall
containerDid = {'scope': myScope, 'name': dbsDatasetName}
Expand Down Expand Up @@ -220,6 +225,7 @@ def recallData(self, dataToRecall=None, sizeToRecall=0, tapeLocations=None, msgH
lifetime = (MAX_DAYS_FOR_TAPERECALL + 7) * 24 * 60 * 60 # in seconds
ruleId = self.createOrReuseRucioRule(did=containerDid, grouping=grouping,
rseExpression=rseExpression,
activity="Analysis TapeRecall",
comment=comment, lifetime=lifetime)
msg = f"Created Rucio rule ID: {ruleId}"
self.logger.info(msg)
Expand Down Expand Up @@ -271,6 +277,7 @@ def lockData(self, dataToLock=None):

# create rule
ruleId = self.createOrReuseRucioRule(did=containerDid, rseExpression='rse_type=DISK',
activity="Analysis Input",
comment=comment, lifetime=TASKLIFETIME)
msg = f"Created Rucio rule ID: {ruleId}"
self.logger.info(msg)
Expand Down

0 comments on commit 31592fa

Please sign in to comment.