diff --git a/outsource/outsource.py b/outsource/outsource.py
index c1291c9..5f9eed2 100644
--- a/outsource/outsource.py
+++ b/outsource/outsource.py
@@ -8,6 +8,7 @@
 from datetime import datetime
 import numpy as np
 from tqdm import tqdm
+import utilix
 from utilix import DB, uconfig
 from utilix.x509 import _validate_x509_proxy
 from utilix.tarball import Tarball
@@ -162,11 +163,11 @@ def pegasus_config(self):
         pconfig["pegasus.gridstart.arguments"] = "-f"
         pconfig["pegasus.mode"] = "development"
         # give jobs a total of {retry} + 1 tries
-        pconfig["dagman.retry"] = 2
+        pconfig["dagman.retry"] = uconfig.getint("Outsource", "dagman_retry", fallback=2)
         # make sure we do start too many jobs at the same time
-        pconfig["dagman.maxidle"] = 5_000
+        pconfig["dagman.maxidle"] = uconfig.getint("Outsource", "dagman_maxidle", fallback=5_000)
         # total number of jobs cap
-        pconfig["dagman.maxjobs"] = 300
+        pconfig["dagman.maxjobs"] = uconfig.getint("Outsource", "dagman_maxjobs", fallback=300)
         # transfer parallelism
         pconfig["pegasus.transfer.threads"] = 1
 
@@ -412,7 +413,11 @@ def _generate_workflow(self):
 
         # script to install packages
         installsh = File("install.sh")
-        rc.add_replica("local", "install.sh", f"file://{base_dir}/workflow/install.sh")
+        rc.add_replica(
+            "local",
+            "install.sh",
+            f"file://{os.path.join(os.path.dirname(utilix.__file__), 'install.sh')}",
+        )
 
         # Add common data files to the replica catalog
         xenon_config = File(".xenon_config")
diff --git a/outsource/workflow/combine-wrapper.sh b/outsource/workflow/combine-wrapper.sh
index ec51204..cd35f7d 100755
--- a/outsource/workflow/combine-wrapper.sh
+++ b/outsource/workflow/combine-wrapper.sh
@@ -50,14 +50,14 @@ if [ "X$upload_to_rucio" = "Xtrue" ]; then
 fi
 
 # Installing customized packages
-. install.sh
+. install.sh strax straxen cutax
 
 # Combine the data
 time python combine.py ${run_id} ${dtype} --context ${context} --xedocs_version ${xedocs_version} --input data ${combine_extra_args}
 
 # Check data dir again
-echo "data dir:"
+echo "Here is what is in the data directory after combining:"
 ls -l data
 
 # tar up the output
-tar -czfv ${output} finished_data
+tar czfv ${output} finished_data
diff --git a/outsource/workflow/install.sh b/outsource/workflow/install.sh
deleted file mode 100644
index 945eb2b..0000000
--- a/outsource/workflow/install.sh
+++ /dev/null
@@ -1,36 +0,0 @@
-#!/bin/bash
-
-set -e
-
-# List of packages
-packages=("strax" "straxen" "cutax")
-
-# Loop through each package
-for package in "${packages[@]}"
-do
-    # Check if the tarball exists
-    if [ ! -f "$package.tar.gz" ]; then
-        echo "Tarball $package.tar.gz not found. Skipping $package."
-        echo
-        continue
-    fi
-
-    echo "Installing $package:"
-
-    # Create a directory for the package
-    mkdir -p $package
-
-    # Extract the tarball to the package directory
-    tar -xzf $package.tar.gz -C $package --strip-components=1
-
-    # Install the package in very quiet mode by -qq
-    pip install ./$package --user --no-deps -qq
-
-    # Verify the installation by importing the package
-    python -c "import $package; print($package.__file__)"
-
-    echo "$package installation complete."
-    echo
-done
-
-straxen_print_versions
diff --git a/outsource/workflow/process-wrapper.sh b/outsource/workflow/process-wrapper.sh
index 50da998..4af9780 100755
--- a/outsource/workflow/process-wrapper.sh
+++ b/outsource/workflow/process-wrapper.sh
@@ -15,7 +15,8 @@ chunks=${args[@]:8}
 
 echo $@
 
-echo "Processing chunks: $chunks"
+echo "Processing chunks:"
+echo "$chunks"
 
 extraflags=""
 
@@ -70,13 +71,13 @@ echo
 
 if [ "X${standalone_download}" = "Xno-download" ]; then
     # We are given a tarball from the previous download job
-    echo "Untaring input data..."
+    echo "Untar input data:"
     tar -xzf *-data*.tar.gz
 fi
 
 
 # Installing customized packages
-. install.sh
+. install.sh strax straxen cutax
 
 
 # See if we have any input tarballs
@@ -86,27 +87,25 @@ if [ -f ./$runid_pad*.tar.gz ]; then
     mkdir data
     for tarball in $(ls $runid_pad*.tar.gz)
    do
-        echo "Untarring input: $tarball"
+        echo "Untar input: $tarball:"
         tar -xzf $tarball -C data --strip-components=1
     done
 fi
 
 echo
-echo "Check RunDB API:"
-echo "Pinging xenon-runsdb.grid.uchicago.edu"
-ping -c 5 xenon-runsdb.grid.uchicago.edu
-echo
-echo "Checking if we have .dbtoken"
+# echo "Check network:"
+# echo "ping -c 5 xenon-runsdb.grid.uchicago.edu"
+# ping -c 5 xenon-runsdb.grid.uchicago.edu
+# echo
+echo "Checking if we have .dbtoken:"
 echo "ls -lah $HOME/.dbtoken"
 ls -lah $HOME/.dbtoken
-echo "ls -lah $USERPROFILE/.dbtoken"
-la -lah $USERPROFILE/.dbtoken
 echo
 
 # echo "nmap xenon-runsdb.grid.uchicago.edu"
 # map -p5000 xenon-runsdb.grid.uchicago.edu
 # echo
-echo "Processing now..."
+echo "Processing:"
 
 chunkarg=""
 if [ -n "${chunks}" ]
@@ -128,14 +127,14 @@ echo "We want to find and delete any records or records_nv if existing, to save
 find data -type d \( -name "*-records-*" -o -name "*-records_nv-*" \) -exec rm -rf {} +
 
 if [ "X${standalone_download}" = "Xdownload-only" ]; then
-    echo "We are tarballing the data directory for download-only job."
-    tar -czfv ${output_tar} data
+    echo "We are tarballing the data directory for download-only job:"
+    tar czfv ${output_tar} data
 elif [ "X${output_dtype}" = "Xevent_info_double" ] || [ "X${output_dtype}" = "Xevents_mv" ] || [ "X${output_dtype}" = "Xevents_nv" ]; then
-    echo "We are tarballing the data directory for ${output_dtype} job."
-    tar -czfv ${output_tar} data
+    echo "We are tarballing the data directory for ${output_dtype} job:"
+    tar czfv ${output_tar} data
 else
-    echo "We are tarballing the data directory, but only for those with _temp."
-    tar -czfv ${output_tar} data/*_temp
+    echo "We are tarballing the data directory, but only for those with _temp:"
+    tar czfv ${output_tar} data/*_temp
 fi
 
 echo
diff --git a/outsource/workflow/process.py b/outsource/workflow/process.py
index 4b468c5..fdd89c7 100755
--- a/outsource/workflow/process.py
+++ b/outsource/workflow/process.py
@@ -98,15 +98,15 @@ def get_bottom_dtypes(dtype):
     return ("peaklets", "lone_hits")
 
 
-def find_data_to_download(st, runid, target):
-    runid_str = f"{runid:06d}"
+def find_data_to_download(st, run_id, target):
+    run_id_str = f"{run_id:06d}"
     bottoms = get_bottom_dtypes(target)
-    bottom_hashes = tuple([st.key_for(runid_str, b).lineage_hash for b in bottoms])
+    bottom_hashes = tuple([st.key_for(run_id_str, b).lineage_hash for b in bottoms])
 
     to_download = []
 
-    # All data entries from the RunDB for certain runid
-    data = db.get_data(runid, host="rucio-catalogue")
+    # All data entries from the RunDB for certain run_id
+    data = db.get_data(run_id, host="rucio-catalogue")
 
     def find_data(_target):
         """Recursively find all the data needed to make the target dtype.
@@ -120,17 +120,17 @@
             return
 
         # Initialize plugin needed for processing
-        _plugin = st._get_plugins((_target,), runid_str)[_target]
-        st._set_plugin_config(_plugin, runid_str, tolerant=False)
+        _plugin = st._get_plugins((_target,), run_id_str)[_target]
+        st._set_plugin_config(_plugin, run_id_str, tolerant=False)
 
         # Download all the required data_type to produce this output file
         for in_dtype in _plugin.depends_on:
             # Get hash for this dtype
-            hash = st.key_for(runid_str, in_dtype).lineage_hash
+            hash = st.key_for(run_id_str, in_dtype).lineage_hash
             rses = [d["location"] for d in data if (d["type"] == in_dtype and hash in d["did"])]
             # For checking if local path exists
             # Here st.storage[0] is the local storage like ./data
-            local_path = os.path.join(st.storage[0].path, f"{runid:06d}-{in_dtype}-{hash}")
+            local_path = os.path.join(st.storage[0].path, f"{run_id:06d}-{in_dtype}-{hash}")
 
             if len(rses) == 0 and not os.path.exists(local_path):
                 # Need to download data to make ths one
@@ -145,13 +145,20 @@
     return to_download
 
 
-def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for_strax"):
-    runid_str = f"{runid:06d}"
+def process(run_id, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for_strax"):
+    run_id_str = f"{run_id:06d}"
     t0 = time.time()
 
+    # st.make(
+    #     run_id_str,
+    #     out_dtype,
+    #     chunk_number={"raw_records": chunks},
+    #     processor="single_thread",
+    # )
+
     # Initialize plugin needed for processing this output type
-    plugin = st._get_plugins((out_dtype,), runid_str)[out_dtype]
-    st._set_plugin_config(plugin, runid_str, tolerant=False)
+    plugin = st._get_plugins((out_dtype,), run_id_str)[out_dtype]
+    st._set_plugin_config(plugin, run_id_str, tolerant=False)
     plugin.setup()
 
     # Now move on to processing
@@ -170,7 +177,7 @@
         # We want to be more tolerant on cuts_basic, because sometimes it is ill-defined
         if keystring == "cuts_basic":
             try:
-                st.make(runid_str, keystring, save=keystring, processor="single_thread")
+                st.make(run_id_str, keystring, save=keystring, processor="single_thread")
             except Exception as e:
                 print(
                     f"Failed to make {keystring}, but it might be "
@@ -179,19 +186,19 @@
                 print("Below is the error:")
                 print(e)
         else:
-            st.make(runid_str, keystring, save=keystring, processor="single_thread")
+            st.make(run_id_str, keystring, save=keystring, processor="single_thread")
         print(f"DONE processing {keystring}")
 
         # Test if the data is complete
         try:
             print(
                 "Try loading data in %s to see if it is complete."
-                % (runid_str + "-" + keystring)
+                % (run_id_str + "-" + keystring)
             )
-            st.get_array(runid_str, keystring, keep_columns="time", progress_bar=False)
-            print("Successfully loaded %s! It is complete." % (runid_str + "-" + keystring))
+            st.get_array(run_id_str, keystring, keep_columns="time", progress_bar=False)
+            print("Successfully loaded %s! It is complete." % (run_id_str + "-" + keystring))
         except Exception as e:
-            print(f"Data is not complete for {runid_str + '-' + keystring}. Skipping")
+            print(f"Data is not complete for {run_id_str + '-' + keystring}. Skipping")
             print("Below is the error message we get when trying to load the data:")
             print(e)
             print("--------------------------")
@@ -202,15 +209,15 @@
     savers = dict()
     for keystring in plugin.provides:
         print(f"Making {keystring}")
-        key = strax.DataKey(runid_str, keystring, plugin.lineage)
-        saver = st.storage[0].saver(key, plugin.metadata(runid, keystring))
+        key = strax.DataKey(run_id_str, keystring, plugin.lineage)
+        saver = st.storage[0].saver(key, plugin.metadata(run_id, keystring))
         saver.is_forked = True
         savers[keystring] = saver
 
     # Setup a few more variables
     in_dtype = plugin.depends_on[0]
-    input_metadata = st.get_metadata(runid_str, in_dtype)
-    input_key = strax.DataKey(runid_str, in_dtype, input_metadata["lineage"])
+    input_metadata = st.get_metadata(run_id_str, in_dtype)
+    input_key = strax.DataKey(run_id_str, in_dtype, input_metadata["lineage"])
 
     backend = None
     backend_key = None
@@ -368,12 +375,12 @@
 
     print("Context is set up!")
 
-    runid = args.dataset
-    runid_str = f"{runid:06d}"
+    run_id = args.dataset
+    run_id_str = f"{run_id:06d}"
     out_dtype = args.output  # eg. typically for tpc: peaklets/event_info
 
     print("Getting to-download list...")
-    to_download = find_data_to_download(st, runid, out_dtype)
+    to_download = find_data_to_download(st, run_id, out_dtype)
     print("Got to-download list!")
 
     # See if we have rucio local frontend
@@ -384,13 +391,13 @@
     for other_dtype in buddies:
         if other_dtype == out_dtype:
             continue
-        to_download.extend(find_data_to_download(st, runid, other_dtype))
+        to_download.extend(find_data_to_download(st, run_id, other_dtype))
 
     # Remove duplicates
     to_download = list(set(to_download))
     # Initialize plugin needed for processing this output type
-    plugin = st._get_plugins((out_dtype,), runid_str)[out_dtype]
-    st._set_plugin_config(plugin, runid_str, tolerant=False)
+    plugin = st._get_plugins((out_dtype,), run_id_str)[out_dtype]
+    st._set_plugin_config(plugin, run_id_str, tolerant=False)
     plugin.setup()
 
     # Figure out what plugins we need to process/initialize
@@ -402,7 +409,7 @@
     to_process = list(set(to_process))
 
     # Keep track of the data we can download now -- will be important for the upload step later
-    available_dtypes = st.available_for_run(runid_str)
+    available_dtypes = st.available_for_run(run_id_str)
     available_dtypes = available_dtypes[available_dtypes.is_stored].target.values.tolist()
 
     missing = set(plugin.depends_on) - set(available_dtypes)
@@ -413,7 +420,7 @@
     while len(intermediates) > 0:
         new_intermediates = []
         for _dtype in intermediates:
-            _plugin = st._get_plugins((_dtype,), runid_str)[_dtype]
+            _plugin = st._get_plugins((_dtype,), run_id_str)[_dtype]
             # Adding missing dependencies to to-process list
             for dependency in _plugin.depends_on:
                 if dependency not in available_dtypes:
@@ -451,7 +458,7 @@
     _tmp_path = tempfile.mkdtemp()
     for dtype in to_process:
         close_savers = dtype != args.output
-        process(runid, dtype, st, args.chunks, close_savers=close_savers, tmp_path=_tmp_path)
+        process(run_id, dtype, st, args.chunks, close_savers=close_savers, tmp_path=_tmp_path)
         gc.collect()
 
     print("Done processing. Now check if we should upload to rucio")
@@ -603,7 +610,7 @@
         if args.chunks is None:
             # Skip if update_db flag is false, or if the rucio upload failed
             if args.update_db and succeded_rucio_upload:
-                md = st.get_meta(runid_str, this_dtype)
+                md = st.get_meta(run_id_str, this_dtype)
                 chunk_mb = [chunk["nbytes"] / (1e6) for chunk in md["chunks"]]
                 data_size_mb = np.sum(chunk_mb)
 
@@ -623,7 +630,7 @@
                     size_mb=data_size_mb,
                 )
 
-                db.update_data(runid, new_data_dict)
+                db.update_data(run_id, new_data_dict)
                 print(f"Database updated for {this_dtype} at {rse}")
 
         # Cleanup the files we uploaded
diff --git a/outsource/workflow/update_run_db.py b/outsource/workflow/update_run_db.py
deleted file mode 100644
index 2b94ceb..0000000
--- a/outsource/workflow/update_run_db.py
+++ /dev/null
@@ -1,57 +0,0 @@
-#!/usr/bin/env python3
-
-import sys
-import argparse
-
-from pprint import pprint
-
-from utilix import rundb
-
-
-def update_status(run_id, dtype, status):
-    db = rundb.DB()
-    data = db.get_data(run_id)
-    for data_set in data:
-        if "type" in data_set and data_set["type"] == dtype:
-            if data_set["checksum"] is None:
-                data_set["checksum"] = "None"
-            data_set["status"] = status
-            print("\nUpdating the following data section to:")
-            pprint(data_set)
-            db.update_data(run_id, data_set)
-
-
-def main():
-    # top-level parser
-    parser = argparse.ArgumentParser(prog="update_run_db")
-    subparsers = parser.add_subparsers(
-        title="subcommands", dest="cmd", description="valid subcommands", help="sub-command help"
-    )
-
-    # sub parser for 'update-status'
-    parser_a = subparsers.add_parser("update-status", help="Sets the status of a run/data")
-    parser_a.add_argument("--run-id", type=int, help="The run id")
-    parser_a.add_argument("--dtype", type=str, help="The dtype of the data record")
-    parser_a.add_argument("--status", type=str, help="The new status")
-
-    # sub parser for 'add-results'
-    parser_b = subparsers.add_parser("add-results", help="b help")
-    parser_b.add_argument("--baz", choices="XYZ", help="baz help")
-
-    args = parser.parse_args()
-
-    print(args)
-
-    if args.cmd == "update-status":
-        update_status(args.run_id, args.dtype, args.status)
-    elif args.cmd == "add-results":
-        print("Not implemented yet...")
-        sys.exit(1)
-    else:
-        print("Unknown command")
-        parser.print_help()
-        sys.exit(1)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/outsource/workflow/upload.py b/outsource/workflow/upload.py
deleted file mode 100755
index a06c244..0000000
--- a/outsource/workflow/upload.py
+++ /dev/null
@@ -1,117 +0,0 @@
-#!/usr/bin/env python
-
-import sys
-import os
-import argparse
-import datetime
-
-# Make sure we don't use any custom paths from e.g. .pth files
-for p in list(sys.path):
-    if os.environ.get("HOME", " 0123456789 ") in p:
-        sys.path.remove(p)
-
-import strax
-import straxen
-from utilix import db
-import numpy as np
-import time
-
-from admix.interfaces.rucio_summoner import RucioSummoner
-from admix.utils.naming import make_did
-
-
-def main():
-    parser = argparse.ArgumentParser(description="Upload combined output to rucio")
-    parser.add_argument("dataset", help="Run number", type=int)
-    parser.add_argument("dtype", help="dtype to upload")
-    parser.add_argument("rse", help="Targeted RSE")
-    parser.add_argument("--context", help="name of context")
-    parser.add_argument(
-        "--ignore-db", help="flag to not update runsDB", dest="ignore_rundb", action="store_true"
-    )
-
-    args = parser.parse_args()
-
-    runid = args.dataset
-    runid_str = f"{runid:06d}"
-    dtype = args.dtype
-    rse = args.rse
-
-    # Get context
-    st = eval(f"straxen.contexts.{args.context}()")
-    st.storage = [strax.DataDirectory("data")]
-
-    plugin = st._get_plugins((dtype,), runid_str)[dtype]
-
-    rc = RucioSummoner()
-
-    for keystring in plugin.provides:
-        key = strax.DataKey(runid_str, keystring, plugin.lineage)
-        hash = key.lineage_hash
-        # TODO: check with utilix DB call that the hashes match?
-
-        dirname = f"{runid_str}-{keystring}-{hash}"
-        upload_path = os.path.join("data", dirname)
-
-        nfiles = len(os.listdir(upload_path))
-        print(f"Uploading {dirname}, which has {nfiles} files")
-
-        # Make a rucio DID
-        did = make_did(runid, keystring, hash)
-
-        # Check if a rule already exists for this DID
-        rucio_rule = rc.GetRule(upload_structure=did)
-
-        file_count = len(os.listdir(upload_path))
-        # TODO: check number of files is consistent with what we expect
-
-        md = st.get_meta(runid_str, keystring)
-
-        chunk_mb = [chunk["nbytes"] / (1e6) for chunk in md["chunks"]]
-        data_size_mb = int(np.sum(chunk_mb))
-        avg_data_size_mb = int(np.average(chunk_mb))
-        lineage_hash = md["lineage_hash"]
-
-        new_data_dict = dict()
-        new_data_dict["location"] = rse
-        new_data_dict["did"] = did
-        new_data_dict["status"] = "uploading"
-        new_data_dict["host"] = "rucio-catalogue"
-        new_data_dict["type"] = keystring
-        new_data_dict["protocol"] = "rucio"
-        new_data_dict["creation_time"] = datetime.datetime.utcnow().isoformat()
-        new_data_dict["creation_place"] = "OSG"
-        new_data_dict["file_count"] = file_count
-        new_data_dict["meta"] = dict(
-            lineage=lineage_hash,
-            avg_chunk_mb=avg_data_size_mb,
-            file_count=file_count,
-            size_mb=data_size_mb,
-            strax_version=strax.__version__,
-            straxen_version=straxen.__version__,
-        )
-
-        # If not in rucio already and no rule exists, upload into rucio
-        if not rucio_rule["exists"]:
-            t0 = time.time()
-            if not args.ignore_rundb:
-                db.update_data(runid, new_data_dict)
-
-            rc.Upload(did, upload_path, rse, lifetime=None)
-
-            tf = time.time()
-            upload_time = (tf - t0) / 60
-            print(f"=== Upload of {did} took {upload_time} minutes")
-            # Check that upload was successful
-            new_rule = rc.GetRule(upload_structure=did, rse=rse)
-
-            if new_rule["state"] == "OK" and not args.ignore_rundb:
-                new_data_dict["status"] = "transferred"
-                db.update_data(runid, new_data_dict)
-        else:
-            print(f"Rucio rule already exists for {did}")
-            sys.exit(1)
-
-
-if __name__ == "__main__":
-    main()
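
Note: the install.sh that the wrappers now source with explicit package arguments (". install.sh strax straxen cutax") is no longer kept in outsource/workflow; the replica catalog instead points at the copy shipped alongside utilix (os.path.dirname(utilix.__file__)), and that script itself is not part of this diff. The new dagman_retry, dagman_maxidle, and dagman_maxjobs options are read from the [Outsource] section of the utilix configuration, falling back to the previous hard-coded values when absent. Below is a minimal sketch of the behaviour the wrappers assume from the relocated script, reconstructed from the deleted outsource/workflow/install.sh with the hard-coded package list replaced by positional arguments; the real script distributed with utilix may differ.

#!/bin/bash
# Hypothetical sketch only -- mirrors the deleted outsource/workflow/install.sh,
# but takes the packages to install from the command line, e.g.:
#   . install.sh strax straxen cutax

set -e

# Packages are now supplied by the calling wrapper instead of being hard-coded
packages=("$@")

for package in "${packages[@]}"
do
    # Skip packages whose tarball was not shipped with the job
    if [ ! -f "$package.tar.gz" ]; then
        echo "Tarball $package.tar.gz not found. Skipping $package."
        continue
    fi

    echo "Installing $package:"

    # Unpack the tarball into its own directory and install it without dependencies
    mkdir -p $package
    tar -xzf $package.tar.gz -C $package --strip-components=1
    pip install ./$package --user --no-deps -qq

    # Verify the installation by importing the package
    python -c "import $package; print($package.__file__)"
done

straxen_print_versions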