From b68442233f4b8e84c45ef745287f55f69aa726f4 Mon Sep 17 00:00:00 2001 From: dachengx Date: Sat, 14 Sep 2024 23:07:21 -0500 Subject: [PATCH] Move `Shell` to utilix, move all RSE related codes to `config.py` --- bin/outsource | 14 +- log_digest/log_digest.pl | 6 +- outsource/config.py | 231 ++++++---- outsource/outsource.py | 626 ++++++++++---------------- outsource/shell.py | 91 ---- outsource/workflow/combine-wrapper.sh | 19 +- outsource/workflow/combine.py | 42 +- outsource/workflow/pegasus.conf | 15 +- outsource/workflow/process-wrapper.sh | 30 +- outsource/workflow/process.py | 183 ++++---- outsource/workflow/upload.py | 12 +- 11 files changed, 539 insertions(+), 730 deletions(-) delete mode 100644 outsource/shell.py diff --git a/bin/outsource b/bin/outsource index 3ca4b2f..175c461 100644 --- a/bin/outsource +++ b/bin/outsource @@ -4,14 +4,14 @@ import argparse import os import pymongo import numpy as np +from utilix import uconfig, xent_collection from utilix.io import load_runlist -from utilix import xent_collection from utilix.rundb import cmt_local_valid_range import straxen import cutax from outsource.outsource import Outsource -from outsource.config import uconfig, DETECTOR_DTYPES +from outsource.config import DETECTOR_DTYPES coll = xent_collection() @@ -45,9 +45,7 @@ def data_find( if number_to is not None: max_run_number = min(number_to, max_run_number) - hashes = {key: val["hash"] for key, val in st.provided_dtypes().items()} - - # setup queries for different detectors + # Setup queries for different detectors basic_queries = [] basic_queries_w_raw = [] basic_queries_wo_to_process = [] @@ -57,7 +55,7 @@ def data_find( print(f"Skipping {det} data") continue - # check if gain model is valid + # Check if gain model is valid if det == "tpc": gain_model = "pmt_000_gain_xenonnt" straxen_opt = "gain_model" @@ -101,7 +99,7 @@ def data_find( "host": "rucio-catalogue", "type": dtype, "status": "transferred", - "did": {"$regex": hashes[dtype]}, + "did": {"$regex": st.key_for("0", dtype).lineage_hash}, } } } @@ -288,7 +286,7 @@ def main(): if args.run and args.runlist: raise RuntimeError("Cannot pass both --run and --runlist. 
Please choose one.") - # subset of runs to consider during data find + # Subset of runs to consider during data find _runlist = None if args.run: diff --git a/log_digest/log_digest.pl b/log_digest/log_digest.pl index 0841d84..59f6213 100755 --- a/log_digest/log_digest.pl +++ b/log_digest/log_digest.pl @@ -10,7 +10,7 @@ my @known_error_patterns = ( qr/(\bERROR\b.+)/, ); -# help message +# Help message sub display_help { print "Usage: $0 [options] \n"; print "Options:\n"; @@ -30,11 +30,11 @@ sub display_help { my $args_string = join(" ", @ARGV); print "\nDigesting: $args_string\n\n"; -# initalization of hashes +# Initalization of hashes my %errors = (); my %headers = (); my %known_error_stats = (); -# nuisance variables +# Nuisance variables my $first_line = 1; my $current_file = "0"; diff --git a/outsource/config.py b/outsource/config.py index 8a64fed..fb792f5 100644 --- a/outsource/config.py +++ b/outsource/config.py @@ -1,17 +1,14 @@ import os import time -from utilix.config import Config -from utilix import DB, xent_collection +from utilix import DB, uconfig, xent_collection import admix -uconfig = Config() - base_dir = os.path.abspath(os.path.dirname(__file__)) -# these are the developer decided dependencies of data_type +# These are the developer decided dependencies of data_type DEPENDS_ON = { "records": ["raw_records"], "peaklets": ["raw_records"], @@ -27,7 +24,7 @@ "led_calibration": ["raw_records"], } -# these are datetypes to look for in RunDB +# These are datetypes to look for in RunDB ACTUALLY_STORED = { "event_info_double": [ "peak_basics", @@ -54,7 +51,7 @@ "led_calibration": ["led_calibration"], } -# do a query to see if these data types are present +# Do a query to see if these data types are present DETECTOR_DTYPES = { "tpc": { "raw": "raw_records", @@ -91,85 +88,74 @@ "muon_veto": {"raw": "raw_records_mv", "to_process": ["events_mv"], "possible": ["events_mv"]}, } -# these modes have particular data_type we care about +PER_CHUNK_DTYPES = ["records", "peaklets", "hitlets_nv", "afterpulses", "led_calibration"] +NEED_RAW_DATA_DTYPES = [ + "peaklets", + "peak_basics_he", + "hitlets_nv", + "events_mv", + "afterpulses", + "led_calibration", +] + +# LED calibration modes have particular data_type we care about LED_MODES = { "tpc_pmtap": ["afterpulses"], "tpc_commissioning_pmtap": ["afterpulses"], "tpc_pmtgain": ["led_calibration"], } -LED_DTYPES = [] -for mode, dtypes in LED_MODES.items(): - for d in dtypes: - if d not in LED_DTYPES: - LED_DTYPES.append(d) +# LED calibration particular data_type we care about +LED_DTYPES = list(set().union(*LED_MODES.values())) db = DB() coll = xent_collection() -def get_hashes(st): - return {key: val["hash"] for key, val in st.provided_dtypes().items()} - - class RunConfig: - """Object that gets passed to outsource for each run/workflow. + """The configuration of how a run will be processed. - This base class has essentially the same info as a dictionary passed - as input + The class will focus on the RSE and instruction to the outsource + submitter. """ - _force = False - _standalone_download = False - _chunks_per_job = uconfig.getint("Outsource", "chunks_per_job") - - def __init__(self, **kwargs): - # default job priority - workflows will be given priority + # Data availability to site selection map. 
+ # desired_sites mean condor will try to run the job on those sites + rse_site_map = { + "UC_OSG_USERDISK": {"expr": 'GLIDEIN_Country == "US"'}, + "UC_DALI_USERDISK": {"expr": 'GLIDEIN_Country == "US"'}, + "UC_MIDWAY_USERDISK": {"expr": 'GLIDEIN_Country == "US"'}, + "CCIN2P3_USERDISK": {"desired_sites": "CCIN2P3", "expr": 'GLIDEIN_Site == "CCIN2P3"'}, + "CNAF_TAPE_USERDISK": {}, + "CNAF_USERDISK": {}, + "LNGS_USERDISK": {}, + "NIKHEF2_USERDISK": {"desired_sites": "NIKHEF", "expr": 'GLIDEIN_Site == "NIKHEF"'}, + "NIKHEF_USERDISK": {"desired_sites": "NIKHEF", "expr": 'GLIDEIN_Site == "NIKHEF"'}, + "SURFSARA_USERDISK": {"desired_sites": "SURFsara", "expr": 'GLIDEIN_Site == "SURFsara"'}, + "WEIZMANN_USERDISK": {"desired_sites": "Weizmann", "expr": 'GLIDEIN_Site == "Weizmann"'}, + "SDSC_USERDISK": {"expr": 'GLIDEIN_ResourceName == "SDSC-Expanse"'}, + "SDSC_NSDF_USERDISK": {"expr": 'GLIDEIN_Country == "US"'}, + } + + chunks_per_job = uconfig.getint("Outsource", "chunks_per_job") + + def __init__(self, context, run_id, force=False, standalone_download=False): + self.context = context + self.run_id = run_id + self.force = force + self.standalone_download = standalone_download + + # Default job priority - workflows will be given priority # in the order they were submitted. - self._priority = 2250000000 - int(time.time()) - assert self._priority > 0 - - for key, val in kwargs.items(): - setattr(self, "_" + key, val) - - @property - def priority(self): - return self._priority - - @property - def chunks_per_job(self): - return self._chunks_per_job - - @property - def force(self): - return self._force - - @property - def standalone_download(self): - return self._standalone_download - - @property - def input_location(self): - return self._input_location - - @property - def output_location(self): - return self._output_location - + self.priority = 2250000000 - int(time.time()) + assert self.priority > 0 -class DBConfig(RunConfig): - """Uses RunDB to build _dbcfgs info.""" + self.run_data = db.get_data(self.run_id) + self.set_requirements_base() - def __init__(self, number, st, **kwargs): - self.number = number - self.run_data = db.get_data(self.number) - - self.context = st - self.hashes = get_hashes(st) - - # get the detectors and start time of this run + # Get the detectors and start time of this run cursor = coll.find_one( - {"number": self.number}, {"detectors": 1, "start": 1, "_id": 0, "mode": 1} + {"number": self.run_id}, {"detectors": 1, "start": 1, "_id": 0, "mode": 1} ) self.detectors = cursor["detectors"] self.start = cursor["start"] @@ -178,46 +164,96 @@ def __init__(self, number, st, **kwargs): self.detectors, list ), f"Detectors needs to be a list, not a {type(self.detectors)}" - super().__init__(**kwargs) - # get the data_type that need to be processed + # Get the data_type that need to be processed self.needs_processed = self.get_needs_processed() - # determine which rse the input data is on - self.rses = self.get_dependencies_rses() + # Determine which rse the input data is on + self.dependencies_rses = self.get_dependencies_rses() + + def set_requirements_base(self): + requirements_base = "HAS_SINGULARITY && HAS_CVMFS_xenon_opensciencegrid_org" + requirements_base += " && PORT_2880 && PORT_8000 && PORT_27017" + requirements_base += ' && (Microarch >= "x86_64-v3")' + requirements_base_us = requirements_base + ' && GLIDEIN_Country == "US"' + if uconfig.getboolean("Outsource", "us_only", fallback=False): + requirements_base = requirements_base_us + + # hs06_test_run limits the run_id to a 
set of compute nodes + # at UChicago with a known HS06 factor + if uconfig.getboolean("Outsource", "hs06_test_run", fallback=False): + requirements_base += ( + ' && GLIDEIN_ResourceName == "MWT2" && regexp("uct2-c4[1-7]", Machine)' + ) + # this_site_only limits the run_id to a set of compute nodes at UChicago for testing + this_site_only = uconfig.get("Outsource", "this_site_only", fallback="") + if this_site_only: + requirements_base += f' && GLIDEIN_ResourceName == "{this_site_only}"' + self.requirements_base = requirements_base + self.requirements_base_us = requirements_base_us + + @property + def _exclude_sites(self): + """Exclude sites from the user _dbcfgs file.""" + + if not uconfig.has_option("Outsource", "exclude_sites"): + return "" + + sites = uconfig.get_list("Outsource", "exclude_sites") + exprs = [] + for site in sites: + exprs.append(f'GLIDEIN_Site =!= "{site}"') + return " && ".join(exprs) + + def get_requirements(self, rses): + # Determine the job requirements based on the data locations + sites_expression, desired_sites = self._determine_target_sites(rses) + requirements = self.requirements_base if len(rses) > 0 else self.requirements_base_us + if sites_expression: + requirements += f" && ({sites_expression})" + # us nodes + requirements_us = self.requirements_base_us + # Add excluded nodes + if self._exclude_sites: + requirements += f" && ({self._exclude_sites})" + requirements_us += f" && ({self._exclude_sites})" + return requirements, requirements_us def depends_on(self, dtype): return DEPENDS_ON[dtype] + def key_for(self, dtype): + return self.context.key_for(f"{self.run_id:06d}", dtype) + def get_needs_processed(self): """Returns the list of data_type we need to process.""" - # do we need to process? read from xenon_config + # Do we need to process? 
read from xenon_config requested_dtypes = uconfig.get_list("Outsource", "dtypes") if self.mode in LED_MODES: - # if we are using LED data, only process those dtypes - # for this context, see if we have that data yet + # If we are using LED data, only process those dtypes + # For this context, see if we have that data yet requested_dtypes = [ dtype for dtype in requested_dtypes if dtype in LED_MODES[self.mode] ] else: - # if we are not, don't process those dtypes + # If we are not, don't process those dtypes requested_dtypes = list(set(requested_dtypes) - set(LED_DTYPES)) - # get all possible dtypes we can process for this run + # Get all possible dtypes we can process for this run possible_dtypes = [] for detector in self.detectors: possible_dtypes.extend(DETECTOR_DTYPES[detector]["possible"]) - # modify requested_dtypes to only consider the possible ones + # Modify requested_dtypes to only consider the possible ones requested_dtypes = [dtype for dtype in requested_dtypes if dtype in possible_dtypes] ret = [] for category in requested_dtypes: dtypes_already_processed = [] for dtype in ACTUALLY_STORED[category]: - hash = self.hashes[dtype] - rses = db.get_rses(self.number, dtype, hash) - # if this data is not on any rse, reprocess it, or we are asking for a rerun + hash = self.context.key_for(f"{self.run_id:06d}", dtype).lineage_hash + rses = db.get_rses(self.run_id, dtype, hash) + # If this data is not on any rse, reprocess it, or we are asking for a rerun dtypes_already_processed.append(len(rses) > 0) if not all(dtypes_already_processed) or self.force: ret.append(category) @@ -233,24 +269,24 @@ def get_dependencies_rses(self): input_dtypes = self.depends_on(dtype) _rses_tmp = [] for input_dtype in input_dtypes: - hash = self.hashes[input_dtype] - _rses_tmp.extend(db.get_rses(self.number, input_dtype, hash)) + hash = self.context.key_for(f"{self.run_id:06d}", input_dtype).lineage_hash + _rses_tmp.extend(db.get_rses(self.run_id, input_dtype, hash)) rses[dtype] = list(set(_rses_tmp)) return rses def nchunks(self, dtype): - # get the dtype this one depends on + # Get the dtype this one depends on dtype = self.depends_on(dtype)[0] - hash = self.hashes[dtype] - did = f"xnt_{self.number:06d}:{dtype}-{hash}" + hash = self.context.key_for(f"{self.run_id:06d}", dtype).lineage_hash + did = f"xnt_{self.run_id:06d}:{dtype}-{hash}" files = admix.rucio.list_files(did) - # subtract 1 for metadata + # Subtract 1 for metadata return len(files) - 1 def _raw_data_exists(self, raw_type="raw_records"): """Returns a boolean for whether or not raw data exists in rucio and is accessible.""" - # it's faster to just go through RunDB + # It's faster to just go through RunDB for data in self.run_data: if ( @@ -262,3 +298,24 @@ def _raw_data_exists(self, raw_type="raw_records"): ): return True return False + + def _determine_target_sites(self, rses): + """Given a list of RSEs, limit the runs for sites for those + locations.""" + + exprs = [] + sites = [] + for rse in rses: + if rse in self.rse_site_map: + if "expr" in self.rse_site_map[rse]: + exprs.append(self.rse_site_map[rse]["expr"]) + if "desired_sites" in self.rse_site_map[rse]: + sites.append(self.rse_site_map[rse]["desired_sites"]) + + # make sure we do not request XENON1T sites we do not need + if len(sites) == 0: + sites.append("NONE") + + final_expr = " || ".join(exprs) + desired_sites = ",".join(sites) + return final_expr, desired_sites diff --git a/outsource/outsource.py b/outsource/outsource.py index 170632a..a08e72d 100644 --- a/outsource/outsource.py 
+++ b/outsource/outsource.py @@ -1,21 +1,18 @@ #!/usr/bin/env python3 import os +import sys import getpass import time import shutil from datetime import datetime import numpy as np from tqdm import tqdm -from utilix.rundb import DB +from utilix import DB, uconfig +from utilix.x509 import _validate_x509_proxy from utilix.config import setup_logger, set_logging_level import cutax -from outsource.config import uconfig, base_dir, DBConfig -from outsource.shell import Shell - - -# Pegasus environment from Pegasus.api import ( Operation, Namespace, @@ -31,6 +28,8 @@ ReplicaCatalog, ) +from outsource.config import base_dir, RunConfig, PER_CHUNK_DTYPES, NEED_RAW_DATA_DTYPES + IMAGE_PREFIX = "/cvmfs/singularity.opensciencegrid.org/xenonnt/base-environment:" COMBINE_WRAPPER = "combine-wrapper.sh" PROCESS_WRAPPER = "process-wrapper.sh" @@ -41,39 +40,11 @@ EVENTS_MEMORY = uconfig.getint("Outsource", "events_memory") EVENTS_DISK = uconfig.getint("Outsource", "events_disk") -PER_CHUNK_DTYPES = ["records", "peaklets", "hitlets_nv", "afterpulses", "led_calibration"] -NEED_RAW_DATA_DTYPES = [ - "peaklets", - "peak_basics_he", - "hitlets_nv", - "events_mv", - "afterpulses", - "led_calibration", -] - db = DB() class Outsource: - # Data availability to site selection map. - # desired_sites mean condor will try to run the job on those sites - _rse_site_map = { - "UC_OSG_USERDISK": {"expr": 'GLIDEIN_Country == "US"'}, - "UC_DALI_USERDISK": {"expr": 'GLIDEIN_Country == "US"'}, - "UC_MIDWAY_USERDISK": {"expr": 'GLIDEIN_Country == "US"'}, - "CCIN2P3_USERDISK": {"desired_sites": "CCIN2P3", "expr": 'GLIDEIN_Site == "CCIN2P3"'}, - "CNAF_TAPE_USERDISK": {}, - "CNAF_USERDISK": {}, - "LNGS_USERDISK": {}, - "NIKHEF2_USERDISK": {"desired_sites": "NIKHEF", "expr": 'GLIDEIN_Site == "NIKHEF"'}, - "NIKHEF_USERDISK": {"desired_sites": "NIKHEF", "expr": 'GLIDEIN_Site == "NIKHEF"'}, - "SURFSARA_USERDISK": {"desired_sites": "SURFsara", "expr": 'GLIDEIN_Site == "SURFsara"'}, - "WEIZMANN_USERDISK": {"desired_sites": "Weizmann", "expr": 'GLIDEIN_Site == "Weizmann"'}, - "SDSC_USERDISK": {"expr": 'GLIDEIN_ResourceName == "SDSC-Expanse"'}, - "SDSC_NSDF_USERDISK": {"expr": 'GLIDEIN_Country == "US"'}, - } - - # transformation map (high level name -> script) + # Transformation map (high level name -> script) _transformations_map = { "combine": COMBINE_WRAPPER, "download": PROCESS_WRAPPER, @@ -90,7 +61,7 @@ class Outsource: "led": PROCESS_WRAPPER, } - # jobs details for a given datatype + # Jobs details for a given datatype # disk is in KB, memory in MB job_kwargs = { "combine": dict(name="combine", memory=COMBINE_MEMORY, disk=COMBINE_DISK), @@ -109,8 +80,6 @@ class Outsource: "led_calibration": dict(name="led", memory=PEAKLETS_MEMORY, disk=PEAKLETS_DISK), } - _x509_proxy = os.getenv("X509_USER_PROXY") - def __init__( self, runlist, @@ -123,42 +92,46 @@ def __init__( force=False, debug=True, ): - """Creates a new Outsource object. + self.logger = setup_logger( + "outsource", uconfig.get("Outsource", "logging_level", fallback="WARNING") + ) + # Reduce the logging of request and urllib3 + set_logging_level( + "urllib3", uconfig.get("Outsource", "db_logging_level", fallback="WARNING") + ) - Specifying a list of DBConfig objects required. 
- """ + # Assume that if the image is not a full path, it is a name + if not os.path.exists(image): + self.image_tag = image + self.singularity_image = f"{IMAGE_PREFIX}{image}" + else: + self.image_tag = image.split(":")[-1] + self.singularity_image = image + + # Check if the environment used to run this script is consistent with the container + if self.image_tag not in sys.executable: + raise EnvironmentError( + f"The current environment's python: {sys.executable} " + f"is not consistent with the aimed container: {self.image_tag}. " + "Please use the following command to activate the correct environment: \n" + f"source /cvmfs/xenon.opensciencegrid.org/releases/nT/{self.image_tag}/setup.sh" + ) if not isinstance(runlist, list): raise RuntimeError("Outsource expects a list of DBConfigs to run") - self._runlist = runlist - # setup context + # Setup context self.context_name = context_name self.xedocs_version = xedocs_version self.context = getattr(cutax.contexts, context_name)(xedocs_version=self.xedocs_version) - # Load from xenon_config self.debug = debug - # Assume that if the image is not a full path, it is a name - if not os.path.exists(image): - self.image_tag = image - self.singularity_image = f"{IMAGE_PREFIX}{image}" - else: - self.image_tag = image.split(":")[-1] - self.singularity_image = image self.force = force self.upload_to_rucio = upload_to_rucio self.update_db = update_db - self.logger = setup_logger( - "outsource", uconfig.get("Outsource", "logging_level", fallback="WARNING") - ) - # Reduce the logging of request and urllib3 - set_logging_level( - "urllib3", uconfig.get("Outsource", "db_logging_level", fallback="WARNING") - ) - + # Load from xenon_config self.work_dir = uconfig.get("Outsource", "work_dir") # User can provide a name for the workflow, otherwise it will be the current time @@ -170,6 +143,42 @@ def __init__( self.outputs_dir = os.path.join(self.workflow_dir, "outputs") self.scratch_dir = os.path.join(self.workflow_dir, "scratch") + @property + def workflow(self): + return os.path.join(self.generated_dir, "workflow.yml") + + @property + def runlist(self): + return os.path.join(self.generated_dir, "runlist.txt") + + def _job(self, name, run_on_submit_node=False, cores=1, memory=1_700, disk=1_000_000): + """Wrapper for a Pegasus job, also sets resource requirement profiles. + + Memory in unit of MB, and disk in unit of MB. + """ + job = Job(name) + + if run_on_submit_node: + job.add_selector_profile(execution_site="local") + # No other attributes on a local job + return job + + job.add_profiles(Namespace.CONDOR, "request_cpus", cores) + + # Increase memory/disk if the first attempt fails + memory = ( + "ifthenelse(isundefined(DAGNodeRetry) || " + f"DAGNodeRetry == 0, {memory}, (DAGNodeRetry + 1)*{memory})" + ) + disk_str = ( + "ifthenelse(isundefined(DAGNodeRetry) || " + f"DAGNodeRetry == 0, {disk}, (DAGNodeRetry + 1)*{disk})" + ) + job.add_profiles(Namespace.CONDOR, "request_disk", disk_str) + job.add_profiles(Namespace.CONDOR, "request_memory", memory) + + return job + def _setup_workflow_id(self, workflow_id): """Set up the workflow ID.""" # Determine a unique id for the workflow. If none passed, looks at the runlist. @@ -194,55 +203,115 @@ def _setup_workflow_id(self, workflow_id): ) self.workflow_id = "-".join(workflow_id) - @property - def x509_proxy(self): - assert self._x509_proxy, "Please provide a valid X509_USER_PROXY environment variable." 
- return self._x509_proxy + def _generate_sc(self): + sc = SiteCatalog() - @property - def workflow(self): - return os.path.join(self.generated_dir, "workflow.yml") + # local site - this is the submit host + local = Site("local") + scratch_dir = Directory(Directory.SHARED_SCRATCH, path=f"{self.scratch_dir}") + scratch_dir.add_file_servers(FileServer(f"file:///{self.scratch_dir}", Operation.ALL)) + storage_dir = Directory(Directory.LOCAL_STORAGE, path=self.outputs_dir) + storage_dir.add_file_servers(FileServer(f"file:///{self.outputs_dir}", Operation.ALL)) + local.add_directories(scratch_dir, storage_dir) - @property - def runlist(self): - return os.path.join(self.generated_dir, "runlist.txt") + local.add_profiles(Namespace.ENV, HOME=os.environ["HOME"]) + local.add_profiles(Namespace.ENV, GLOBUS_LOCATION="") + local.add_profiles( + Namespace.ENV, + PATH="/cvmfs/xenon.opensciencegrid.org/releases/nT/development/anaconda/envs/XENONnT_development/bin:/cvmfs/xenon.opensciencegrid.org/releases/nT/development/anaconda/condabin:/usr/bin:/bin", # noqa + ) + local.add_profiles( + Namespace.ENV, + LD_LIBRARY_PATH="/cvmfs/xenon.opensciencegrid.org/releases/nT/development/anaconda/envs/XENONnT_development/lib64:/cvmfs/xenon.opensciencegrid.org/releases/nT/development/anaconda/envs/XENONnT_development/lib", # noqa + ) + local.add_profiles(Namespace.ENV, PEGASUS_SUBMITTING_USER=os.environ["USER"]) + local.add_profiles(Namespace.ENV, X509_USER_PROXY=os.environ["X509_USER_PROXY"]) + # local.add_profiles(Namespace.ENV, RUCIO_LOGGING_FORMAT="%(asctime)s %(levelname)s %(message)s") # noqa + if not self.debug: + local.add_profiles(Namespace.ENV, RUCIO_ACCOUNT="production") + # Improve python logging / suppress depreciation warnings (from gfal2 for example) + local.add_profiles(Namespace.ENV, PYTHONUNBUFFERED="1") + local.add_profiles(Namespace.ENV, PYTHONWARNINGS="ignore::DeprecationWarning") - def submit(self, force=False): - """Main interface to submitting a new workflow.""" + # staging site + staging = Site("staging") + scratch_dir = Directory( + Directory.SHARED_SCRATCH, + path=f"/ospool/uc-shared/project/xenon/wf-scratch/{getpass.getuser()}", + ) + scratch_dir.add_file_servers( + FileServer( + f"osdf:///ospool/uc-shared/project/xenon/wf-scratch/{getpass.getuser()}", + Operation.ALL, + ) + ) + staging.add_directories(scratch_dir) - # does workflow already exist? - if os.path.exists(self.workflow_dir): - if force: - self.logger.warning( - f"Overwriting workflow at {self.workflow_dir}. CTRL+C now to stop." 
- ) - time.sleep(10) - shutil.rmtree(self.workflow_dir) - else: - raise RuntimeError(f"Workflow already exists at {self.workflow_dir}.") + # staging site - davs + staging_davs = Site("staging-davs") + scratch_dir = Directory( + Directory.SHARED_SCRATCH, path=f"/xenon/scratch/{getpass.getuser()}" + ) + scratch_dir.add_file_servers( + FileServer( + f"gsidavs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/{getpass.getuser()}", + Operation.ALL, + ) + ) + staging_davs.add_directories(scratch_dir) - # work dirs - os.makedirs(self.generated_dir, 0o755, exist_ok=True) - os.makedirs(self.runs_dir, 0o755, exist_ok=True) - os.makedirs(self.outputs_dir, 0o755, exist_ok=True) + # output on davs + output_dir = Directory(Directory.LOCAL_STORAGE, path=f"/xenon/output/{getpass.getuser()}") + output_dir.add_file_servers( + FileServer( + f"gsidavs://xenon-gridftp.grid.uchicago.edu:2880/xenon/output/{getpass.getuser()}", + Operation.ALL, + ) + ) + staging_davs.add_directories(output_dir) - # ensure we have a proxy with enough time left - self._validate_x509_proxy() + # condorpool + condorpool = Site("condorpool") + condorpool.add_profiles(Namespace.PEGASUS, style="condor") + condorpool.add_profiles(Namespace.CONDOR, universe="vanilla") + # We need the x509 proxy for Rucio transfers + condorpool.add_profiles( + Namespace.CONDOR, key="x509userproxy", value=os.environ["X509_USER_PROXY"] + ) + condorpool.add_profiles( + Namespace.CONDOR, key="+SingularityImage", value=f'"{self.singularity_image}"' + ) - # generate the workflow - wf = self._generate_workflow() + # Ignore the site settings - the container will set all this up inside + condorpool.add_profiles(Namespace.ENV, OSG_LOCATION="") + condorpool.add_profiles(Namespace.ENV, GLOBUS_LOCATION="") + condorpool.add_profiles(Namespace.ENV, PYTHONPATH="") + condorpool.add_profiles(Namespace.ENV, PERL5LIB="") + condorpool.add_profiles(Namespace.ENV, LD_LIBRARY_PATH="") - if len(wf.jobs): - # submit the workflow - self._plan_and_submit(wf) + condorpool.add_profiles(Namespace.ENV, PEGASUS_SUBMITTING_USER=os.environ["USER"]) + condorpool.add_profiles( + Namespace.ENV, RUCIO_LOGGING_FORMAT="%(asctime)s %(levelname)s %(message)s" + ) + if not self.debug: + condorpool.add_profiles(Namespace.ENV, RUCIO_ACCOUNT="production") - if self.debug: - wf.graph( - output=os.path.join(self.generated_dir, "workflow_graph.dot"), label="xform-id" - ) - wf.graph( - output=os.path.join(self.generated_dir, "workflow_graph.svg"), label="xform-id" - ) + # Improve python logging / suppress depreciation warnings (from gfal2 for example) + condorpool.add_profiles(Namespace.ENV, PYTHONUNBUFFERED="1") + condorpool.add_profiles(Namespace.ENV, PYTHONWARNINGS="ignore::DeprecationWarning") + + sc.add_sites( + local, + staging_davs, + condorpool, + ) + return sc + + def _generate_tc(self): + return TransformationCatalog() + + def _generate_rc(self): + return ReplicaCatalog() def _generate_workflow(self): """Use the Pegasus API to build an abstract graph of the workflow.""" @@ -254,7 +323,7 @@ def _generate_workflow(self): tc = self._generate_tc() rc = self._generate_rc() - # add executables to the wf-level transformation catalog + # Add executables to the wf-level transformation catalog for job_type, script in self._transformations_map.items(): t = Transformation( job_type, @@ -270,7 +339,7 @@ def _generate_workflow(self): combinepy = File("combine.py") rc.add_replica("local", "combine.py", f"file://{base_dir}/workflow/combine.py") - # add common data files to the replica catalog + # Add common 
data files to the replica catalog xenon_config = File(".xenon_config") rc.add_replica("local", ".xenon_config", f"file://{uconfig.config_path}") @@ -295,96 +364,74 @@ def _generate_workflow(self): # runs iterator = self._runlist if len(self._runlist) == 1 else tqdm(self._runlist) - # keep track of what runs we submit, useful for bookkeeping + # Keep track of what runs we submit, useful for bookkeeping runlist = [] - for run in iterator: - dbcfg = DBConfig(run, self.context, force=self.force) + for run_id in iterator: + dbcfg = RunConfig(self.context, run_id, force=self.force) - # check if this run needs to be processed + # Check if this run_id needs to be processed if len(dbcfg.needs_processed) > 0: - self.logger.debug(f"Adding run {dbcfg.number:06d} to the workflow") + self.logger.debug(f"Adding run_id {dbcfg.run_id:06d} to the workflow") else: self.logger.debug( - f"Run {dbcfg.number:06d} is already processed with context {self.context_name}" + f"Run {dbcfg.run_id:06d} is already processed with context {self.context_name}" ) continue - requirements_base = "HAS_SINGULARITY && HAS_CVMFS_xenon_opensciencegrid_org" - requirements_base += " && PORT_2880 && PORT_8000 && PORT_27017" - requirements_base += ' && (Microarch >= "x86_64-v3")' - requirements_base_us = requirements_base + ' && GLIDEIN_Country == "US"' - if uconfig.getboolean("Outsource", "us_only", fallback=False): - requirements_base = requirements_base_us - - # hs06_test_run limits the run to a set of compute nodes - # at UChicago with a known HS06 factor - if uconfig.getboolean("Outsource", "hs06_test_run", fallback=False): - requirements_base += ( - ' && GLIDEIN_ResourceName == "MWT2" && regexp("uct2-c4[1-7]", Machine)' - ) - # this_site_only limits the run to a set of compute nodes at UChicago for testing - this_site_only = uconfig.get("Outsource", "this_site_only", fallback="") - if this_site_only: - requirements_base += f' && GLIDEIN_ResourceName == "{this_site_only}"' - - # will have combine jobs for all the PER_CHUNK_DTYPES we passed + # Will have combine jobs for all the PER_CHUNK_DTYPES we passed combine_jobs = {} - # get dtypes to process + # Get dtypes to process for dtype_i, dtype in enumerate(dbcfg.needs_processed): - # these dtypes need raw data + # These dtypes need raw data if dtype in NEED_RAW_DATA_DTYPES: - # check that raw data exist for this run + # Check that raw data exist for this run_id if not all( [dbcfg._raw_data_exists(raw_type=d) for d in dbcfg.depends_on(dtype)] ): self.logger.warning( - f"Doesn't have raw data for {dtype} of run {run}, skipping" + f"Doesn't have raw data for {dtype} of run_id {run_id}, skipping" ) continue - self.logger.debug(f"Adding {dbcfg.number:06d}-{dtype}") - if dbcfg.number not in runlist: - runlist.append(dbcfg.number) - rses = dbcfg.rses[dtype] + self.logger.debug(f"Adding {dbcfg.key_for(dtype)}") + if dbcfg.run_id not in runlist: + runlist.append(dbcfg.run_id) + rses = dbcfg.dependencies_rses[dtype] if len(rses) == 0: if dtype == "raw_records": raise RuntimeError( - f"Unable to find a raw records location for {dbcfg.number:06d}" + f"Unable to find a raw records location for {dbcfg.run_id:06d}" ) else: self.logger.warning( - f"No data found for {dbcfg.number:06d}-{dtype}... " - f"hopefully those will be created by the workflow" + f"No data found as the dependency of {dbcfg.key_for(dtype)}. 
" + f"Hopefully those will be created by the workflow" ) - # determine the job requirements based on the data locations rses_specified = uconfig.get("Outsource", "raw_records_rse").split(",") - # for standalone downloads, only target us + # For standalone downloads, only target us if dbcfg.standalone_download: rses = rses_specified - # For low level data, we only want to run on sites + # For low level data, we only want to run_id on sites # that we specified for raw_records_rse if dtype in NEED_RAW_DATA_DTYPES: rses = list(set(rses) & set(rses_specified)) assert len(rses) > 0, ( - f"No sites found for {dbcfg.number:06d}-{dtype}, " + f"No sites found for {dbcfg.key_for(dtype)}, " "since no intersection between the available rses " f"{rses} and the specified raw_records_rses {rses_specified}" ) - sites_expression, desired_sites = self._determine_target_sites(rses) - # general compute jobs - requirements = requirements_base if len(rses) > 0 else requirements_base_us - if sites_expression: - requirements += f" && ({sites_expression})" - # us nodes - requirements_us = requirements_base_us - # add excluded nodes - if self._exclude_sites: - requirements += f" && ({self._exclude_sites})" - requirements_us += f" && ({self._exclude_sites})" + sites_expression, desired_sites = dbcfg._determine_target_sites(rses) + self.logger.debug(f"Site expression from RSEs list: {sites_expression}") + self.logger.debug( + "XENON_DESIRED_Sites from RSEs list " + f"(mostly used for European sites): {desired_sites}" + ) + + requirements, requirements_us = dbcfg.get_requirements(rses) if dtype in PER_CHUNK_DTYPES: # Set up the combine job first - @@ -398,13 +445,13 @@ def _generate_workflow(self): # priority is given in the order they were submitted combine_job.add_profiles(Namespace.CONDOR, "priority", f"{dbcfg.priority}") combine_job.add_inputs(combinepy, xenon_config, cutax_tarball, token) - combine_output_tar_name = f"{dbcfg.number:06d}-{dtype}-combined.tar.gz" + combine_output_tar_name = f"{dbcfg.key_for(dtype)}-combined.tar.gz" combine_output_tar = File(combine_output_tar_name) combine_job.add_outputs( combine_output_tar, stage_out=(not self.upload_to_rucio) ) combine_job.add_args( - f"{dbcfg.number:06d}", + dbcfg.run_id, dtype, self.context_name, self.xedocs_version, @@ -416,7 +463,7 @@ def _generate_workflow(self): wf.add_jobs(combine_job) combine_jobs[dtype] = (combine_job, combine_output_tar) - # add jobs, one for each input file + # Add jobs, one for each input file n_chunks = dbcfg.nchunks(dtype) chunk_list = np.arange(n_chunks) @@ -431,11 +478,11 @@ def _generate_workflow(self): self.logger.debug(f" ... 
adding job for chunk files: {chunk_str}") - # standalone download is a special case where we download data + # standalone_download is a special case where we download data # from rucio first, which is useful for testing and when using # dedicated clusters with storage if dbcfg.standalone_download: - data_tar = File(f"{dbcfg.number:06d}-{dtype}-data-{job_i:04d}.tar.gz") + data_tar = File(f"{dbcfg.key_for(dtype)}-data-{job_i:04d}.tar.gz") download_job = self._job( "download", disk=self.job_kwargs["download"]["disk"] ) @@ -446,7 +493,7 @@ def _generate_workflow(self): Namespace.CONDOR, "priority", f"{dbcfg.priority}" ) download_job.add_args( - f"{dbcfg.number:06d}", + dbcfg.run_id, self.context_name, self.xedocs_version, dtype, @@ -461,10 +508,8 @@ def _generate_workflow(self): wf.add_jobs(download_job) # output files - job_output_tar = File( - f"{dbcfg.number:06d}-{dtype}-output-{job_i:04d}.tar.gz" - ) - # do we already have a local copy? + job_output_tar = File(f"{dbcfg.key_for(dtype)}-output-{job_i:04d}.tar.gz") + # Do we already have a local copy? job_output_tar_local_path = os.path.join( self.outputs_dir, f"{job_output_tar}" ) @@ -478,8 +523,8 @@ def _generate_workflow(self): # Add job job = self._job(**self.job_kwargs[dtype]) - if desired_sites and len(desired_sites) > 0: - # give a hint to glideinWMS for the sites + if desired_sites: + # Give a hint to glideinWMS for the sites # we want(mostly useful for XENONVO in Europe) job.add_profiles( Namespace.CONDOR, "+XENON_DESIRED_Sites", f'"{desired_sites}"' @@ -489,7 +534,7 @@ def _generate_workflow(self): # job.add_profiles(Namespace.CONDOR, 'periodic_remove', periodic_remove) job.add_args( - f"{dbcfg.number:06d}", + dbcfg.run_id, self.context_name, self.xedocs_version, dtype, @@ -504,13 +549,13 @@ def _generate_workflow(self): job.add_outputs(job_output_tar, stage_out=(not self.upload_to_rucio)) wf.add_jobs(job) - # all strax jobs depend on the pre-flight or a download job, + # All strax jobs depend on the pre-flight or a download job, # but pre-flight jobs have been outdated so it is not necessary. if dbcfg.standalone_download: job.add_inputs(data_tar) wf.add_dependency(job, parents=[download_job]) - # update combine job + # Update combine job combine_job.add_inputs(job_output_tar) wf.add_dependency(job, children=[combine_job]) @@ -522,9 +567,9 @@ def _generate_workflow(self): if len(parent_combines): wf.add_dependency(job, parents=parent_combines) else: - # high level data.. we do it all on one job + # High level data.. 
we do it all on one job # output files - job_output_tar = File(f"{dbcfg.number:06d}-{dtype}-output.tar.gz") + job_output_tar = File(f"{dbcfg.key_for(dtype)}-output.tar.gz") # Add job job = self._job(**self.job_kwargs[dtype], cores=2) @@ -535,7 +580,7 @@ def _generate_workflow(self): # Note that any changes to this argument list, # also means process-wrapper.sh has to be updated job.add_args( - f"{dbcfg.number:06d}", + dbcfg.run_id, self.context_name, self.xedocs_version, dtype, @@ -546,12 +591,11 @@ def _generate_workflow(self): ) job.add_inputs(processpy, xenon_config, token, cutax_tarball) - job.add_outputs( - job_output_tar, stage_out=True - ) # as long as we are giving outputs + # As long as we are giving outputs + job.add_outputs(job_output_tar, stage_out=True) wf.add_jobs(job) - # if there are multiple levels to the workflow, + # If there are multiple levels to the workflow, # need to have current process-wrapper.sh depend on previous combine-wrapper.sh for d in dbcfg.depends_on(dtype): @@ -566,7 +610,7 @@ def _generate_workflow(self): wf.add_site_catalog(sc) wf.write(file=self.workflow) - # save the runlist + # Save the runlist np.savetxt(self.runlist, runlist, fmt="%0d") return wf @@ -577,221 +621,47 @@ def _plan_and_submit(self, wf): wf.plan( conf=f"{base_dir}/workflow/pegasus.conf", submit=not self.debug, + cleanup="none", sites=["condorpool"], + verbose=3 if self.debug else 0, staging_sites={"condorpool": "staging-davs"}, output_sites=["staging-davs"], dir=os.path.dirname(self.runs_dir), relative_dir=os.path.basename(self.runs_dir), ) - self.logger.info(f"Worfklow written to \n\n\t{self.runs_dir}\n\n") - - def _validate_x509_proxy(self, min_valid_hours=20): - """Ensure $X509_USER_PROXY exists and has enough time left. - - This is necessary only if you are going to use Rucio. - """ - x509_user_proxy = os.getenv("X509_USER_PROXY") - assert x509_user_proxy, "Please provide a valid X509_USER_PROXY environment variable." - - self.logger.debug("Verifying that the X509_USER_PROXY proxy has enough lifetime") - shell = Shell(f"grid-proxy-info -timeleft -file {x509_user_proxy}") - shell.run() - valid_hours = int(shell.get_outerr()) / 60 / 60 - if valid_hours < min_valid_hours: - raise RuntimeError( - f"User proxy is only valid for {valid_hours} hours. " - f"Minimum required is {min_valid_hours} hours." - ) - - def _job(self, name, run_on_submit_node=False, cores=1, memory=1_700, disk=1_000_000): - """Wrapper for a Pegasus job, also sets resource requirement profiles. - - Memory in unit of MB, and disk in unit of MB. 
- """ - job = Job(name) - - if run_on_submit_node: - job.add_selector_profile(execution_site="local") - # no other attributes on a local job - return job - - job.add_profiles(Namespace.CONDOR, "request_cpus", cores) - - # increase memory/disk if the first attempt fails - memory = ( - "ifthenelse(isundefined(DAGNodeRetry) || " - f"DAGNodeRetry == 0, {memory}, (DAGNodeRetry + 1)*{memory})" - ) - disk_str = ( - "ifthenelse(isundefined(DAGNodeRetry) || " - f"DAGNodeRetry == 0, {disk}, (DAGNodeRetry + 1)*{disk})" - ) - job.add_profiles(Namespace.CONDOR, "request_disk", disk_str) - job.add_profiles(Namespace.CONDOR, "request_memory", memory) - - return job - - def _data_find_chunks(self, rc, rucio_dataset): - """ - Look up which chunk files are in the dataset - return a dict where the keys are the - chunks, and the values a dict of locations - """ - - self.logger.debug("Querying Rucio for files in the data set " + rucio_dataset) - result = rc.ListFiles(rucio_dataset) - chunks_files = [f["name"] for f in result if "json" not in f["name"]] - return chunks_files - - def _determine_target_sites(self, rses): - """Given a list of RSEs, limit the runs for sites for those - locations.""" - - exprs = [] - sites = [] - for rse in rses: - if rse in self._rse_site_map: - if "expr" in self._rse_site_map[rse]: - exprs.append(self._rse_site_map[rse]["expr"]) - if "desired_sites" in self._rse_site_map[rse]: - sites.append(self._rse_site_map[rse]["desired_sites"]) - - # make sure we do not request XENON1T sites we do not need - if len(sites) == 0: - sites.append("NONE") - - final_expr = " || ".join(exprs) - desired_sites = ",".join(sites) - self.logger.debug(f"Site expression from RSEs list: {final_expr}") - self.logger.debug( - f"XENON_DESIRED_Sites from RSEs list (mostly used for European sites): {desired_sites}" - ) - return final_expr, desired_sites - - @property - def _exclude_sites(self): - """Exclude sites from the user _dbcfgs file.""" - - if not uconfig.has_option("Outsource", "exclude_sites"): - return "" + def submit(self, force=False): + """Main interface to submitting a new workflow.""" - sites = uconfig.get_list("Outsource", "exclude_sites") - exprs = [] - for site in sites: - exprs.append(f'GLIDEIN_Site =!= "{site}"') - return " && ".join(exprs) + # Does workflow already exist? + if os.path.exists(self.workflow_dir): + if force: + self.logger.warning( + f"Overwriting workflow at {self.workflow_dir}. CTRL+C now to stop." 
+ ) + time.sleep(10) + shutil.rmtree(self.workflow_dir) + else: + raise RuntimeError(f"Workflow already exists at {self.workflow_dir}.") - def _generate_sc(self): - sc = SiteCatalog() + # Ensure we have a proxy with enough time left + _validate_x509_proxy() - # local site - this is the submit host - local = Site("local") - scratch_dir = Directory(Directory.SHARED_SCRATCH, path=f"{self.scratch_dir}") - scratch_dir.add_file_servers(FileServer(f"file:///{self.scratch_dir}", Operation.ALL)) - storage_dir = Directory(Directory.LOCAL_STORAGE, path=self.outputs_dir) - storage_dir.add_file_servers(FileServer(f"file:///{self.outputs_dir}", Operation.ALL)) - local.add_directories(scratch_dir, storage_dir) + os.makedirs(self.generated_dir, 0o755, exist_ok=True) + os.makedirs(self.runs_dir, 0o755, exist_ok=True) + os.makedirs(self.outputs_dir, 0o755, exist_ok=True) - local.add_profiles(Namespace.ENV, HOME=os.environ["HOME"]) - local.add_profiles(Namespace.ENV, GLOBUS_LOCATION="") - local.add_profiles( - Namespace.ENV, - PATH="/cvmfs/xenon.opensciencegrid.org/releases/nT/development/anaconda/envs/XENONnT_development/bin:/cvmfs/xenon.opensciencegrid.org/releases/nT/development/anaconda/condabin:/usr/bin:/bin", # noqa - ) - local.add_profiles( - Namespace.ENV, - LD_LIBRARY_PATH="/cvmfs/xenon.opensciencegrid.org/releases/nT/development/anaconda/envs/XENONnT_development/lib64:/cvmfs/xenon.opensciencegrid.org/releases/nT/development/anaconda/envs/XENONnT_development/lib", # noqa - ) - local.add_profiles(Namespace.ENV, PEGASUS_SUBMITTING_USER=os.environ["USER"]) - local.add_profiles(Namespace.ENV, X509_USER_PROXY=self.x509_proxy) - # local.add_profiles(Namespace.ENV, RUCIO_LOGGING_FORMAT="%(asctime)s %(levelname)s %(message)s") # noqa - if not self.debug: - local.add_profiles(Namespace.ENV, RUCIO_ACCOUNT="production") - # improve python logging / suppress depreciation warnings (from gfal2 for example) - local.add_profiles(Namespace.ENV, PYTHONUNBUFFERED="1") - local.add_profiles(Namespace.ENV, PYTHONWARNINGS="ignore::DeprecationWarning") + # Generate the workflow + wf = self._generate_workflow() - # staging site - staging = Site("staging") - scratch_dir = Directory( - Directory.SHARED_SCRATCH, - path="/ospool/uc-shared/project/xenon/wf-scratch/{}".format(getpass.getuser()), - ) - scratch_dir.add_file_servers( - FileServer( - "osdf:///ospool/uc-shared/project/xenon/wf-scratch/{}".format(getpass.getuser()), - Operation.ALL, - ) - ) - staging.add_directories(scratch_dir) + if len(wf.jobs): + # Submit the workflow + self._plan_and_submit(wf) - # staging site - davs - staging_davs = Site("staging-davs") - scratch_dir = Directory( - Directory.SHARED_SCRATCH, path="/xenon/scratch/{}".format(getpass.getuser()) - ) - scratch_dir.add_file_servers( - FileServer( - "gsidavs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/{}".format( - getpass.getuser() - ), - Operation.ALL, + if self.debug: + wf.graph( + output=os.path.join(self.generated_dir, "workflow_graph.dot"), label="xform-id" ) - ) - staging_davs.add_directories(scratch_dir) - - # output on davs - output_dir = Directory( - Directory.LOCAL_STORAGE, path="/xenon/output/{}".format(getpass.getuser()) - ) - output_dir.add_file_servers( - FileServer( - "gsidavs://xenon-gridftp.grid.uchicago.edu:2880/xenon/output/{}".format( - getpass.getuser() - ), - Operation.ALL, + wf.graph( + output=os.path.join(self.generated_dir, "workflow_graph.svg"), label="xform-id" ) - ) - staging_davs.add_directories(output_dir) - - # condorpool - condorpool = Site("condorpool") - 
condorpool.add_profiles(Namespace.PEGASUS, style="condor") - condorpool.add_profiles(Namespace.CONDOR, universe="vanilla") - # we need the x509 proxy for Rucio transfers - condorpool.add_profiles(Namespace.CONDOR, key="x509userproxy", value=self.x509_proxy) - condorpool.add_profiles( - Namespace.CONDOR, key="+SingularityImage", value=f'"{self.singularity_image}"' - ) - - # ignore the site settings - the container will set all this up inside - condorpool.add_profiles(Namespace.ENV, OSG_LOCATION="") - condorpool.add_profiles(Namespace.ENV, GLOBUS_LOCATION="") - condorpool.add_profiles(Namespace.ENV, PYTHONPATH="") - condorpool.add_profiles(Namespace.ENV, PERL5LIB="") - condorpool.add_profiles(Namespace.ENV, LD_LIBRARY_PATH="") - - condorpool.add_profiles(Namespace.ENV, PEGASUS_SUBMITTING_USER=os.environ["USER"]) - condorpool.add_profiles( - Namespace.ENV, RUCIO_LOGGING_FORMAT="%(asctime)s %(levelname)s %(message)s" - ) - if not self.debug: - condorpool.add_profiles(Namespace.ENV, RUCIO_ACCOUNT="production") - - # improve python logging / suppress depreciation warnings (from gfal2 for example) - condorpool.add_profiles(Namespace.ENV, PYTHONUNBUFFERED="1") - condorpool.add_profiles(Namespace.ENV, PYTHONWARNINGS="ignore::DeprecationWarning") - - sc.add_sites( - local, - staging_davs, - # output, - condorpool, - ) - return sc - - def _generate_tc(self): - return TransformationCatalog() - - def _generate_rc(self): - return ReplicaCatalog() diff --git a/outsource/shell.py b/outsource/shell.py deleted file mode 100644 index 8780eaa..0000000 --- a/outsource/shell.py +++ /dev/null @@ -1,91 +0,0 @@ -import subprocess -import os -import threading -import tempfile -import time - - -class Shell(object): - """Provides a shell callout with buffered stdout/stderr, error handling and - timeout.""" - - def __init__(self, cmd, timeout_secs=1 * 60 * 60, log_cmd=False, log_outerr=False): - self._cmd = cmd - self._timeout_secs = timeout_secs - self._log_cmd = log_cmd - self._log_outerr = log_outerr - self._process = None - self._out_file = None - self._outerr = "" - self._duration = 0.0 - - def run(self): - def target(): - - self._process = subprocess.Popen( - self._cmd, - shell=True, - stdout=self._out_file, - stderr=subprocess.STDOUT, - preexec_fn=os.setpgrp, - ) - self._process.communicate() - - if self._log_cmd: - print(self._cmd) - - # temp file for the stdout/stderr - self._out_file = tempfile.TemporaryFile(prefix="outsource-", suffix=".out") - - ts_start = time.time() - - thread = threading.Thread(target=target) - thread.start() - - thread.join(self._timeout_secs) - if thread.is_alive(): - # do our best to kill the whole process group - try: - kill_cmd = "kill -TERM -%d" % (os.getpgid(self._process.pid)) - kp = subprocess.Popen(kill_cmd, shell=True) - kp.communicate() - self._process.terminate() - except Exception: - pass - thread.join() - # log the output - self._out_file.seek(0) - stdout = self._out_file.read().decode("utf-8").strip() - if self._log_outerr and len(stdout) > 0: - print(stdout) - self._out_file.close() - raise RuntimeError( - "Command timed out after %d seconds: %s" % (self._timeout_secs, self._cmd) - ) - - self._duration = time.time() - ts_start - - # log the output - self._out_file.seek(0) - self._outerr = self._out_file.read().decode("utf-8").strip() - if self._log_outerr and len(self._outerr) > 0: - print(self._outerr) - self._out_file.close() - - if self._process.returncode != 0: - raise RuntimeError( - "Command exited with non-zero exit code (%d): %s\n%s" - % 
(self._process.returncode, self._cmd, self._outerr) - ) - - def get_outerr(self): - """Returns the combined stdout and stderr from the command.""" - return self._outerr - - def get_exit_code(self): - """Returns the exit code from the process.""" - return self._process.returncode - - def get_duration(self): - """Returns the timing of the command (seconds)""" - return self._duration diff --git a/outsource/workflow/combine-wrapper.sh b/outsource/workflow/combine-wrapper.sh index 38f44e9..0cdcd25 100755 --- a/outsource/workflow/combine-wrapper.sh +++ b/outsource/workflow/combine-wrapper.sh @@ -2,7 +2,7 @@ set -e -runid=$1 +run_id=$1 dtype=$2 context=$3 xedocs_version=$4 @@ -23,7 +23,7 @@ if [ "X$upload_to_rucio" = "Xtrue" ]; then combine_extra_args="$combine_extra_args --upload-to-rucio" fi -# the rest of the arguments are the inputs +# The rest of the arguments are the inputs START=$(date +%s) for TAR in `ls *.tar.gz`; do tar -xzf $TAR @@ -38,7 +38,7 @@ ls -l data echo echo -#echo "Total amount of data before combine: "`du -s --si .` +echo "Total amount of data before combine: "`du -s --si . | cut -f1` echo echo @@ -49,18 +49,17 @@ if [ "X$upload_to_rucio" = "Xtrue" ]; then export RUCIO_ACCOUNT=production fi -echo "--- Installing cutax ---" +echo "Installing cutax:" mkdir cutax tar -xzf cutax.tar.gz -C cutax --strip-components=1 -pip install ./cutax --user --no-deps -qq +# Install in a very quiet mode by -qq +pip install ./cutax --user --no-deps --qq python -c "import cutax; print(cutax.__file__)" -chmod +x combine.py +# Combine the data +time python combine.py ${run_id} ${dtype} --context ${context} --xedocs_version ${xedocs_version} --input data ${combine_extra_args} -# combine the data -time ./combine.py ${runid} ${dtype} --input data --context ${context} --xedocs_version ${xedocs_version} ${combine_extra_args} - -# check data dir again +# Check data dir again echo "data dir:" ls -l data diff --git a/outsource/workflow/combine.py b/outsource/workflow/combine.py index 6e297cf..cf0f637 100755 --- a/outsource/workflow/combine.py +++ b/outsource/workflow/combine.py @@ -19,20 +19,16 @@ admix.clients._init_clients() -def get_hashes(st): - return {dt: item["hash"] for dt, item in st.provided_dtypes().items()} - - def merge( runid_str, # run number padded with 0s dtype, # data type 'level' e.g. 
records, peaklets st, # strax context path, # path where the data is stored ): - # get the storage path, since will need to reset later + # Get the storage path, since will need to reset later _storage_paths = [storage.path for storage in st.storage] - # initialize plugin needed for processing + # Initialize plugin needed for processing plugin = st._get_plugins((dtype,), runid_str)[dtype] st._set_plugin_config(plugin, runid_str, tolerant=False) plugin.setup() @@ -44,19 +40,19 @@ def merge( continue key = strax.DataKey(runid_str, keystring, plugin.lineage) saver = st.storage[0].saver(key, plugin.metadata(runid_str, keystring)) - # monkey patch the saver + # Monkey patch the saver tmpname = os.path.split(saver.tempdirname)[1] dirname = os.path.split(saver.dirname)[1] saver.tempdirname = os.path.join(path, tmpname) saver.dirname = os.path.join(path, dirname) saver.is_forked = True - # merge the jsons + # Merge the jsons saver.close() - # change the storage frontend to use the merged data + # Change the storage frontend to use the merged data st.storage[0] = strax.DataDirectory(path) - # rechunk the data if we can + # Rechunk the data if we can for keystring in plugin.provides: if keystring not in to_merge: continue @@ -78,7 +74,7 @@ def merge( dest = os.path.join(st.storage[1].path, f"{key}") shutil.copytree(src, dest) - # reset in case we need to merge more data + # Reset in case we need to merge more data st.storage = [strax.DataDirectory(path) for path in _storage_paths] @@ -93,7 +89,7 @@ def check_chunk_n(directory): if n_chunks != 0: n_metadata_chunks = len(metadata["chunks"]) - # check that the number of chunks in storage + # Check that the number of chunks in storage # is less than or equal to the number of chunks in metadata assert n_chunks == n_metadata_chunks or n_chunks == n_metadata_chunks - 1, ( "For directory %s, \ @@ -106,7 +102,7 @@ def check_chunk_n(directory): compressor = metadata["compressor"] dtype = eval(metadata["dtype"]) - # check that the chunk length is agreed with promise in metadata + # Check that the chunk length is agreed with promise in metadata for i in range(n_chunks): chunk = strax.load_file(files[i], compressor=compressor, dtype=dtype) if metadata["chunks"][i]["n"] != len(chunk): @@ -115,14 +111,14 @@ def check_chunk_n(directory): f"but metadata says {metadata['chunks'][i]['n']}" ) - # check that the last chunk is empty + # Check that the last chunk is empty if n_chunks == n_metadata_chunks - 1: assert ( metadata["chunks"][n_chunks]["n"] == 0 ), "Empty chunk has non-zero length in metadata!" 
else: - # check that the number of chunks in metadata is 1 + # Check that the number of chunks in metadata is 1 assert ( len(metadata["chunks"]) == 1 ), "There are %s chunks in storage, but metadata says %s" % ( @@ -157,14 +153,14 @@ def main(): final_path = "finished_data" - # get context + # Get context st = getattr(cutax.contexts, args.context)(xedocs_version=args.xedocs_version) st.storage = [ strax.DataDirectory("./"), strax.DataDirectory(final_path), # where we are copying data to ] - # check what data is in the output folder + # Check what data is in the output folder dtypes = [d.split("-")[1] for d in os.listdir(path)] if any([d in dtypes for d in ["lone_hits", "pulse_counts", "veto_regions"]]): @@ -178,7 +174,7 @@ def main(): else: plugin_levels = ["peaklets"] - # merge + # Merge for dtype in plugin_levels: print(f"Merging {dtype} level") merge(runid_str, dtype, st, path) @@ -186,17 +182,17 @@ def main(): # print(f"Current contents of {final_path}:") # print(os.listdir(final_path)) - # now upload the merged metadata - # setup the rucio client(s) + # Now upload the merged metadata + # Setup the rucio client(s) if not args.upload_to_rucio: print("Ignoring rucio upload. Exiting") return - # need to patch the storage one last time + # Need to patch the storage one last time st.storage = [strax.DataDirectory(final_path)] for this_dir in os.listdir(final_path): - # prepare list of dicts to be uploaded + # Prepare list of dicts to be uploaded _run, keystring, straxhash = this_dir.split("-") # We don't want to upload records to rucio @@ -207,7 +203,7 @@ def main(): dataset_did = admix.utils.make_did(runid, keystring, straxhash) scope, dset_name = dataset_did.split(":") - # based on the dtype and the utilix config, where should this data go? + # Based on the dtype and the utilix config, where should this data go? if keystring in ["records", "pulse_counts", "veto_regions"]: rse = uconfig.get("Outsource", "records_rse") elif keystring in ["peaklets", "lone_hits", "merged_s2s", "hitlets_nv"]: diff --git a/outsource/workflow/pegasus.conf b/outsource/workflow/pegasus.conf index c89f089..22d19bb 100644 --- a/outsource/workflow/pegasus.conf +++ b/outsource/workflow/pegasus.conf @@ -3,22 +3,21 @@ pegasus.metrics.app = XENON pegasus.data.configuration = nonsharedfs -# provide a full kickstart record, including the environment. even for -# successful jobs -#pegasus.gridstart.arguments = -f +# Provide a full kickstart record, including the environment. even for successful jobs +# pegasus.gridstart.arguments = -f # pegasus.mode = development -# give jobs a total of 1+{retry} tries +# Give jobs a total of 1+{retry} tries dagman.retry = 2 -# make sure we do start too many jobs at the same time +# Make sure we do start too many jobs at the same time dagman.maxidle = 5000 -# total number of jobs cap -#dagman.maxjobs = 300 +# Total number of jobs cap +# dagman.maxjobs = 300 -# transfer parallelism +# Transfer parallelism pegasus.transfer.threads = 1 # Help Pegasus developers by sharing performance data (optional) diff --git a/outsource/workflow/process-wrapper.sh b/outsource/workflow/process-wrapper.sh index ca7fbdf..1a51599 100755 --- a/outsource/workflow/process-wrapper.sh +++ b/outsource/workflow/process-wrapper.sh @@ -15,8 +15,7 @@ chunks=${args[@]:8} echo $@ -echo "Chunks: $chunks" -start_dir=$PWD +echo "Processing chunks: $chunks" extraflags="" @@ -36,7 +35,7 @@ fi . /opt/XENONnT/setup.sh -# sleep random amount of time to spread out e.g. 
API calls and downloads +# Sleep random amount of time to spread out e.g. API calls and downloads sleep $(( RANDOM % 20 + 1 ))s @@ -51,17 +50,17 @@ if [ "X$upload_to_rucio" = "Xtrue" ]; then export RUCIO_ACCOUNT=production fi -echo "Start dir is $start_dir. Here's whats inside:" +echo "Current dir is $PWD. Here's whats inside:" ls -lah unset http_proxy export HOME=$PWD export XENON_CONFIG=$PWD/.xenon_config -# do we still neeed these? -export XDG_CACHE_HOME=${start_dir}/.cache -export XDG_CONFIG_HOME=${start_dir}/.config +# Do we still neeed these? +export XDG_CACHE_HOME=$PWD/.cache +export XDG_CONFIG_HOME=$PWD/.config -echo "--- RUCIO/X509 Stuff ---" +echo "RUCIO/X509 Stuff:" env | grep X509 env | grep RUCIO @@ -70,21 +69,21 @@ rucio whoami echo if [ "X${standalone_download}" = "Xno-download" ]; then - # we are given a tarball from the previous download job + # We are given a tarball from the previous download job echo "Untaring input data..." tar -xzf *-data*.tar.gz fi -echo "--- Installing cutax ---" +echo "Installing cutax:" mkdir cutax tar -xzf cutax.tar.gz -C cutax --strip-components=1 pip install ./cutax --user --no-deps -qq python -c "import cutax; print(cutax.__file__)" -# see if we have any input tarballs -echo "--- Checking if we have any input tarballs ---" +# See if we have any input tarballs +echo "Checking if we have any input tarballs:" runid_pad=`printf %06d $run_id` if [ -f ./$runid_pad*.tar.gz ]; then mkdir data @@ -96,7 +95,7 @@ if [ -f ./$runid_pad*.tar.gz ]; then fi echo -echo "--- Check RunDB API ---" +echo "Check RunDB API:" echo "Pinging xenon-runsdb.grid.uchicago.edu" ping -c 5 xenon-runsdb.grid.uchicago.edu echo @@ -118,12 +117,11 @@ then chunkarg="--chunks ${chunks}" fi -chmod +x process.py -./process.py ${run_id} --output ${output_dtype} --context ${context} --xedocs_version ${xedocs_version} ${extraflags} ${chunkarg} +time python process.py ${run_id} --context ${context} --xedocs_version ${xedocs_version} --output ${output_dtype} ${extraflags} ${chunkarg} if [[ $? -ne 0 ]]; then - echo "exiting with status 25" + echo "Exiting with status 25" exit 25 fi diff --git a/outsource/workflow/process.py b/outsource/workflow/process.py index 736e9b6..4b468c5 100755 --- a/outsource/workflow/process.py +++ b/outsource/workflow/process.py @@ -21,18 +21,12 @@ import cutax straxen.Events.save_when = strax.SaveWhen.TARGET -print("We have forced events to save always.") -print("Initiing clients...") admix.clients._init_clients() -print("Clients initiated") - -print("Initiing DB...") db = DB() -print("DB initiated") -# these dtypes we need to rechunk, so don't upload to rucio here! -rechunk_dtypes = [ +# These dtypes we need to rechunk, so don't upload to rucio here! 
+RECHUNK_DTYPES = [ "pulse_counts", "veto_regions", "peaklets", @@ -42,8 +36,8 @@ "led_calibration", ] -# these dtypes will not be uploaded to rucio, and will be removed after processing -ignore_dtypes = [ +# These dtypes will not be uploaded to rucio, and will be removed after processing +IGNORE_DTYPES = [ "records", "records_nv", "lone_raw_records_nv", @@ -56,8 +50,8 @@ "lone_hites", # added to avoid duplicating upload/staging ] -# these dtypes should always be made at the same time: -buddy_dtypes = [ +# These dtypes should always be made at the same time: +BUDDY_DTYPES = [ ("veto_regions_nv", "event_positions_nv"), ( "event_info_double", @@ -75,7 +69,7 @@ ] # These are the dtypes we want to make first if any of them is in to-process list -priority_rank = [ +PRIORITY_RANK = [ "peaklet_classification", "merged_s2s", "peaks", @@ -104,25 +98,14 @@ def get_bottom_dtypes(dtype): return ("peaklets", "lone_hits") -def get_hashes(st): - """Get the hashes for all the data_type in this context.""" - return {dt: item["hash"] for dt, item in st.provided_dtypes().items()} - - -def find_data_to_download(runid, target, st): +def find_data_to_download(st, runid, target): runid_str = f"{runid:06d}" - hashes = get_hashes(st) bottoms = get_bottom_dtypes(target) - - for bottom in bottoms: - if bottom not in hashes: - raise ValueError(f"The dtype {bottom} is not in this context!") - - bottom_hashes = tuple([hashes[b] for b in bottoms]) + bottom_hashes = tuple([st.key_for(runid_str, b).lineage_hash for b in bottoms]) to_download = [] - # all data entries from the RunDB for certain runid + # All data entries from the RunDB for certain runid data = db.get_data(runid, host="rucio-catalogue") def find_data(_target): @@ -132,32 +115,32 @@ def find_data(_target): to download is. Returns a list of tuples (dtype, hash) that need to be downloaded. 
""" - # check if we have the data already + # Check if we have the data already if all([(d, h) in to_download for d, h in zip(bottoms, bottom_hashes)]): return - # initialize plugin needed for processing + # Initialize plugin needed for processing _plugin = st._get_plugins((_target,), runid_str)[_target] st._set_plugin_config(_plugin, runid_str, tolerant=False) - # download all the required data_type to produce this output file + # Download all the required data_type to produce this output file for in_dtype in _plugin.depends_on: - # get hash for this dtype - hash = hashes.get(in_dtype) + # Get hash for this dtype + hash = st.key_for(runid_str, in_dtype).lineage_hash rses = [d["location"] for d in data if (d["type"] == in_dtype and hash in d["did"])] - # for checking if local path exists - # here st.storage[0] is the local storage like ./data + # For checking if local path exists + # Here st.storage[0] is the local storage like ./data local_path = os.path.join(st.storage[0].path, f"{runid:06d}-{in_dtype}-{hash}") if len(rses) == 0 and not os.path.exists(local_path): - # need to download data to make ths one + # Need to download data to make ths one find_data(in_dtype) else: info = (in_dtype, hash) if info not in to_download: to_download.append(info) - # fills the to_download list + # Fills the to_download list find_data(target) return to_download @@ -166,22 +149,22 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for runid_str = f"{runid:06d}" t0 = time.time() - # initialize plugin needed for processing this output type + # Initialize plugin needed for processing this output type plugin = st._get_plugins((out_dtype,), runid_str)[out_dtype] st._set_plugin_config(plugin, runid_str, tolerant=False) plugin.setup() - # now move on to processing - # if we didn't pass any chunks, we process the whole thing -- + # Now move on to processing + # If we didn't pass any chunks, we process the whole thing -- # otherwise just do the chunks we listed if chunks is None: - # check if we need to save anything, if not, skip this plugin + # Check if we need to save anything, if not, skip this plugin if plugin.save_when[out_dtype] == strax.SaveWhen.NEVER: print("This plugin is not saving anything. 
Skipping.") return print("Chunks is none -- processing whole thing!") - # then we just process the whole thing + # Then we just process the whole thing for keystring in plugin.provides: print(f"Making {keystring}") # We want to be more tolerant on cuts_basic, because sometimes it is ill-defined @@ -213,9 +196,9 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for print(e) print("--------------------------") - # process chunk-by-chunk + # Process chunk-by-chunk else: - # setup savers + # Setup savers savers = dict() for keystring in plugin.provides: print(f"Making {keystring}") @@ -224,7 +207,7 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for saver.is_forked = True savers[keystring] = saver - # setup a few more variables + # Setup a few more variables in_dtype = plugin.depends_on[0] input_metadata = st.get_metadata(runid_str, in_dtype) input_key = strax.DataKey(runid_str, in_dtype, input_metadata["lineage"]) @@ -249,7 +232,7 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for ) for chunk in chunks: - # read in the input data for this chunk + # Read in the input data for this chunk chunk_info = None for chunk_md in input_metadata["chunks"]: if chunk_md["chunk_i"] == int(chunk): @@ -264,22 +247,22 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for time_range=None, chunk_construction_kwargs=chunk_kwargs, ) - # process this chunk + # Process this chunk output_data = plugin.do_compute(chunk_i=chunk, **{in_dtype: in_data}) if isinstance(output_data, dict): - # save the output -- you have to loop because there could be > 1 output dtypes + # Save the output -- you have to loop because there could be > 1 output dtypes for keystring, strax_chunk in output_data.items(): savers[keystring].save(strax_chunk, chunk_i=int(chunk)) elif isinstance(output_data, strax.Chunk): - # save the output -- you have to loop because there could be > 1 output dtypes + # Save the output -- you have to loop because there could be > 1 output dtypes savers[keystring].save(output_data, chunk_i=int(chunk)) else: raise TypeError("Unknown datatype %s for output" % (type(output_data))) if close_savers: for dtype, saver in savers.items(): - # copy the metadata to a tmp directory + # Copy the metadata to a tmp directory tmpdir = os.path.join(tmp_path, os.path.split(saver.tempdirname)[1]) os.makedirs(tmpdir, exist_ok=True) for file in os.listdir(saver.tempdirname): @@ -303,7 +286,7 @@ def check_chunk_n(directory): if n_chunks != 0: n_metadata_chunks = len(metadata["chunks"]) - # check that the number of chunks in storage + # Check that the number of chunks in storage # is less than or equal to the number of chunks in metadata assert n_chunks == n_metadata_chunks or n_chunks == n_metadata_chunks - 1, ( "For directory %s, \ @@ -316,7 +299,7 @@ def check_chunk_n(directory): compressor = metadata["compressor"] dtype = eval(metadata["dtype"]) - # check that the chunk length is agreed with promise in metadata + # Check that the chunk length is agreed with promise in metadata for i in range(n_chunks): chunk = strax.load_file(files[i], compressor=compressor, dtype=dtype) if metadata["chunks"][i]["n"] != len(chunk): @@ -325,14 +308,14 @@ def check_chunk_n(directory): f"but metadata says {metadata['chunks'][i]['n']}" ) - # check that the last chunk is empty + # Check that the last chunk is empty if n_chunks == n_metadata_chunks - 1: assert ( metadata["chunks"][n_chunks]["n"] == 0 ), "Empty chunk has non-zero 
length in metadata!" else: - # check that the number of chunks in metadata is 1 + # Check that the number of chunks in metadata is 1 assert ( len(metadata["chunks"]) == 1 ), "There are %s chunks in storage, but metadata says %s" % ( @@ -345,9 +328,9 @@ def check_chunk_n(directory): def main(): parser = argparse.ArgumentParser(description="(Re)Processing With Outsource") parser.add_argument("dataset", help="Run number", type=int) - parser.add_argument("--output", help="desired strax(en) output") parser.add_argument("--context", help="name of context") parser.add_argument("--xedocs_version", help="xedocs global version") + parser.add_argument("--output", help="desired strax(en) output") parser.add_argument("--chunks", nargs="*", help="chunk ids to download", type=int) parser.add_argument("--upload-to-rucio", action="store_true", dest="upload_to_rucio") parser.add_argument("--update-db", action="store_true", dest="update_db") @@ -356,14 +339,14 @@ def main(): args = parser.parse_args() - # directory where we will be putting everything + # Directory where we will be putting everything data_dir = "./data" - # make sure this is empty + # Make sure this is empty # if os.path.exists(data_dir): # rmtree(data_dir) - # get context + # Get context st = getattr(cutax.contexts, args.context)(xedocs_version=args.xedocs_version) # st.storage = [ # strax.DataDirectory(data_dir), @@ -376,8 +359,8 @@ def main(): straxen.storage.RucioRemoteFrontend(download_heavy=True), ] - # add local frontend if we can - # this is a temporary hack + # Add local frontend if we can + # This is a temporary hack try: st.storage.append(straxen.storage.RucioLocalFrontend()) except KeyError: @@ -390,35 +373,35 @@ def main(): out_dtype = args.output # eg. ypically for tpc: peaklets/event_info print("Getting to-download list...") - to_download = find_data_to_download(runid, out_dtype, st) + to_download = find_data_to_download(st, runid, out_dtype) print("Got to-download list!") - # see if we have rucio local frontend - # if we do, it's probably more efficient to download data through the rucio frontend + # See if we have rucio local frontend + # If we do, it's probably more efficient to download data through the rucio frontend - for buddies in buddy_dtypes: + for buddies in BUDDY_DTYPES: if out_dtype in buddies: for other_dtype in buddies: if other_dtype == out_dtype: continue - to_download.extend(find_data_to_download(runid, other_dtype, st)) - # remove duplicates + to_download.extend(find_data_to_download(st, runid, other_dtype)) + # Remove duplicates to_download = list(set(to_download)) - # initialize plugin needed for processing this output type + # Initialize plugin needed for processing this output type plugin = st._get_plugins((out_dtype,), runid_str)[out_dtype] st._set_plugin_config(plugin, runid_str, tolerant=False) plugin.setup() - # figure out what plugins we need to process/initialize + # Figure out what plugins we need to process/initialize to_process = [args.output] - for buddies in buddy_dtypes: + for buddies in BUDDY_DTYPES: if args.output in buddies: to_process = list(buddies) - # remove duplicates + # Remove duplicates to_process = list(set(to_process)) - # keep track of the data we can download now -- will be important for the upload step later + # Keep track of the data we can download now -- will be important for the upload step later available_dtypes = st.available_for_run(runid_str) available_dtypes = available_dtypes[available_dtypes.is_stored].target.values.tolist() @@ -426,12 +409,12 @@ def main(): 
intermediates = missing.copy() to_process = list(intermediates) + to_process - # now we need to figure out what intermediate data we need to make + # Now we need to figure out what intermediate data we need to make while len(intermediates) > 0: new_intermediates = [] for _dtype in intermediates: _plugin = st._get_plugins((_dtype,), runid_str)[_dtype] - # adding missing dependencies to to-process list + # Adding missing dependencies to to-process list for dependency in _plugin.depends_on: if dependency not in available_dtypes: if dependency not in to_process: @@ -439,7 +422,7 @@ def main(): new_intermediates.append(dependency) intermediates = new_intermediates - # remove any raw data + # Remove any raw data to_process = [dtype for dtype in to_process if dtype not in admix.utils.RAW_DTYPES] missing = [d for d in to_process if d != args.output] @@ -454,14 +437,14 @@ def main(): if args.download_only: sys.exit(0) - # If to-process has anything in priority_rank, we process them first - if len(set(priority_rank) & set(to_process)) > 0: - # remove any prioritized dtypes that are not in to_process - filtered_priority_rank = [dtype for dtype in priority_rank if dtype in to_process] - # remove the priority_rank dtypes from to_process, as low priority data_type which we don't + # If to-process has anything in PRIORITY_RANK, we process them first + if len(set(PRIORITY_RANK) & set(to_process)) > 0: + # Remove any prioritized dtypes that are not in to_process + filtered_priority_rank = [dtype for dtype in PRIORITY_RANK if dtype in to_process] + # Remove the PRIORITY_RANK dtypes from to_process, as low priority data_type which we don't # rigorously care their order to_process_low_priority = [dt for dt in to_process if dt not in filtered_priority_rank] - # sort the priority by their dependencies + # Sort the priority by their dependencies to_process = filtered_priority_rank + to_process_low_priority print(f"To process: {', '.join(to_process)}") @@ -473,7 +456,7 @@ def main(): print("Done processing. 
Now check if we should upload to rucio") - # now we move the tmpfiles back to main directory, if needed + # Now we move the tmpfiles back to main directory, if needed # this is for cases where we went from raw_records-->records-->peaklets in one go if os.path.exists(_tmp_path): for dtype_path_thing in os.listdir(_tmp_path): @@ -485,14 +468,14 @@ def main(): os.rename(merged_dir, os.path.join(data_dir, dtype_path_thing)) - # if we processed the entire run, we upload everything including metadata - # otherwise, we just upload the chunks + # If we processed the entire run, we upload everything including metadata + # Otherwise, we just upload the chunks upload_meta = args.chunks is None - # remove rucio directory + # Remove rucio directory rmtree(st.storage[1]._get_backend("RucioRemoteBackend").staging_dir) - # now loop over data_type we just made and upload the data + # Now loop over data_type we just made and upload the data processed_data = [d for d in os.listdir(data_dir) if "_temp" not in d] print("---- Processed data ----") for d in processed_data: @@ -500,11 +483,11 @@ def main(): print("------------------------\n") for dirname in processed_data: - # get rucio dataset + # Get rucio dataset this_run, this_dtype, this_hash = dirname.split("-") - # remove data we do not want to upload - if this_dtype in ignore_dtypes: + # Remove data we do not want to upload + if this_dtype in IGNORE_DTYPES: print(f"Removing {this_dtype} instead of uploading") shutil.rmtree(os.path.join(data_dir, dirname)) continue @@ -513,7 +496,7 @@ def main(): print("Ignoring rucio upload") continue - # based on the dtype and the utilix config, where should this data go? + # Based on the dtype and the utilix config, where should this data go? if this_dtype in ["records", "pulse_counts", "veto_regions", "records_nv", "records_he"]: rse = uconfig.get("Outsource", "records_rse") elif this_dtype in ["peaklets", "lone_hits", "merged_s2s", "hitlets_nv"]: @@ -521,11 +504,11 @@ def main(): else: rse = uconfig.get("Outsource", "events_rse") - if this_dtype in rechunk_dtypes: + if this_dtype in RECHUNK_DTYPES: print(f"Skipping upload of {this_dtype} since we need to rechunk it") continue - # remove the _temp if we are processing chunks in parallel + # Remove the _temp if we are processing chunks in parallel if args.chunks is not None: this_hash = this_hash.replace("_temp", "") dataset_did = admix.utils.make_did(int(this_run), this_dtype, this_hash) @@ -537,7 +520,7 @@ def main(): if not upload_meta: files = [f for f in files if not f.endswith(".json")] - # check that the output number of files is what we expect + # Check that the output number of files is what we expect if len(files) != len(args.chunks): processed_chunks = set([int(f.split("-")[-1]) for f in files]) expected_chunks = set(args.chunks) @@ -548,12 +531,12 @@ def main(): f"{missing_chunks}" ) - # if there are no files, we can't upload them + # If there are no files, we can't upload them if len(files) == 0: print(f"No files to upload in {dirname}. 
Skipping.") continue - # get list of files that have already been uploaded + # Get list of files that have already been uploaded # this is to allow us re-run workflow for some chunks try: existing_files = [ @@ -563,10 +546,10 @@ def main(): existing_files_in_dataset = admix.rucio.list_files(dataset_did) - # for some reason files get uploaded but not attached correctly + # For some reason files get uploaded but not attached correctly need_attached = list(set(existing_files) - set(existing_files_in_dataset)) - # only consider the chunks here + # Only consider the chunks here if args.chunks: need_attached = [ f for f in need_attached if str(int(f.split("-")[-1])) in args.chunks @@ -616,15 +599,15 @@ def main(): # TODO: check rucio that the files are there? print(f"Upload of {len(files)} files in {dirname} finished successfully") - # if we processed the whole thing, add a rule at DALI update the RunDB here + # If we processed the whole thing, add a rule at DALI update the RunDB here if args.chunks is None: - # skip if update_db flag is false, or if the rucio upload failed + # Skip if update_db flag is false, or if the rucio upload failed if args.update_db and succeded_rucio_upload: md = st.get_meta(runid_str, this_dtype) chunk_mb = [chunk["nbytes"] / (1e6) for chunk in md["chunks"]] data_size_mb = np.sum(chunk_mb) - # update RunDB + # Update RunDB new_data_dict = dict() new_data_dict["location"] = rse new_data_dict["did"] = dataset_did @@ -643,8 +626,8 @@ def main(): db.update_data(runid, new_data_dict) print(f"Database updated for {this_dtype} at {rse}") - # cleanup the files we uploaded - # this is likely only done for records data because we will rechunk the others + # Cleanup the files we uploaded + # This is likely only done for records data because we will rechunk the others for f in files: print(f"Removing {f}") os.remove(os.path.join(data_dir, dirname, f)) diff --git a/outsource/workflow/upload.py b/outsource/workflow/upload.py index 0de80e4..a06c244 100755 --- a/outsource/workflow/upload.py +++ b/outsource/workflow/upload.py @@ -5,7 +5,7 @@ import argparse import datetime -# make sure we don't use any custom paths from e.g. pth files +# Make sure we don't use any custom paths from e.g. pth files for p in list(sys.path): if os.environ.get("HOME", " 0123456789 ") in p: sys.path.remove(p) @@ -37,7 +37,7 @@ def main(): dtype = args.dtype rse = args.rse - # get context + # Get context st = eval(f"straxen.contexts.{args.context}()") st.storage = [strax.DataDirectory("data")] @@ -56,10 +56,10 @@ def main(): nfiles = len(os.listdir(upload_path)) print(f"Uploading {dirname}, which has {nfiles} files") - # make a rucio DID + # Make a rucio DID did = make_did(runid, keystring, hash) - # check if a rule already exists for this DID + # Check if a rule already exists for this DID rucio_rule = rc.GetRule(upload_structure=did) file_count = len(os.listdir(upload_path)) @@ -91,7 +91,7 @@ def main(): straxen_version=straxen.__version__, ) - # if not in rucio already and no rule exists, upload into rucio + # If not in rucio already and no rule exists, upload into rucio if not rucio_rule["exists"]: t0 = time.time() if not args.ignore_rundb: @@ -102,7 +102,7 @@ def main(): tf = time.time() upload_time = (tf - t0) / 60 print(f"=== Upload of {did} took {upload_time} minutes") - # check that upload was successful + # Check that upload was successful new_rule = rc.GetRule(upload_structure=did, rse=rse) if new_rule["state"] == "OK" and not args.ignore_rundb: