
Commit

Remove unused python files (#177)
dachengx authored Sep 21, 2024
1 parent 8518801 commit 5570815
Showing 7 changed files with 70 additions and 269 deletions.
13 changes: 9 additions & 4 deletions outsource/outsource.py
@@ -8,6 +8,7 @@
from datetime import datetime
import numpy as np
from tqdm import tqdm
import utilix
from utilix import DB, uconfig
from utilix.x509 import _validate_x509_proxy
from utilix.tarball import Tarball
@@ -162,11 +163,11 @@ def pegasus_config(self):
pconfig["pegasus.gridstart.arguments"] = "-f"
pconfig["pegasus.mode"] = "development"
# give jobs a total of {retry} + 1 tries
pconfig["dagman.retry"] = 2
pconfig["dagman.retry"] = uconfig.getint("Outsource", "dagman_retry", fallback=2)
# make sure we do not start too many jobs at the same time
pconfig["dagman.maxidle"] = 5_000
pconfig["dagman.maxidle"] = uconfig.getint("Outsource", "dagman_maxidle", fallback=5_000)
# total number of jobs cap
pconfig["dagman.maxjobs"] = 300
pconfig["dagman.maxjobs"] = uconfig.getint("Outsource", "dagman_maxjobs", fallback=300)
# transfer parallelism
pconfig["pegasus.transfer.threads"] = 1

@@ -412,7 +413,11 @@ def _generate_workflow(self):

# script to install packages
installsh = File("install.sh")
rc.add_replica("local", "install.sh", f"file://{base_dir}/workflow/install.sh")
rc.add_replica(
"local",
"install.sh",
f"file://{os.path.join(os.path.dirname(utilix.__file__), 'install.sh')}",
)

# Add common data files to the replica catalog
xenon_config = File(".xenon_config")
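The replica catalog entry for install.sh now points at the copy shipped with the installed utilix package instead of a file in this repository's workflow directory. Roughly, the file:// URL is built like this (assuming utilix is importable; the printed path is illustrative and depends on where utilix is installed):

    import os
    import utilix

    # Directory of the installed utilix package, e.g. .../site-packages/utilix
    utilix_dir = os.path.dirname(utilix.__file__)

    # Pegasus replica catalog entries take URLs, hence the file:// scheme
    install_sh_url = f"file://{os.path.join(utilix_dir, 'install.sh')}"
    print(install_sh_url)  # e.g. file:///.../site-packages/utilix/install.sh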
6 changes: 3 additions & 3 deletions outsource/workflow/combine-wrapper.sh
@@ -50,14 +50,14 @@ if [ "X$upload_to_rucio" = "Xtrue" ]; then
fi

# Installing customized packages
. install.sh
. install.sh strax straxen cutax

# Combine the data
time python combine.py ${run_id} ${dtype} --context ${context} --xedocs_version ${xedocs_version} --input data ${combine_extra_args}

# Check data dir again
echo "data dir:"
echo "Here is what is in the data directory after combining:"
ls -l data

# tar up the output
tar -czfv ${output} finished_data
tar czfv ${output} finished_data
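For reference, the final tarball step of this wrapper corresponds roughly to the following Python; the output name is a placeholder for ${output}, and the archived directory mirrors finished_data above.

    import tarfile

    # Rough equivalent of "tar czfv ${output} finished_data": create a
    # gzip-compressed tarball containing the finished_data directory.
    output = "combined_output.tar.gz"  # placeholder for ${output}
    with tarfile.open(output, "w:gz") as tar:
        tar.add("finished_data")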
36 changes: 0 additions & 36 deletions outsource/workflow/install.sh

This file was deleted.

35 changes: 17 additions & 18 deletions outsource/workflow/process-wrapper.sh
Expand Up @@ -15,7 +15,8 @@ chunks=${args[@]:8}

echo $@

echo "Processing chunks: $chunks"
echo "Processing chunks:"
echo "$chunks"

extraflags=""

@@ -70,13 +71,13 @@ echo

if [ "X${standalone_download}" = "Xno-download" ]; then
# We are given a tarball from the previous download job
echo "Untaring input data..."
echo "Untar input data:"
tar -xzf *-data*.tar.gz
fi


# Installing customized packages
. install.sh
. install.sh strax straxen cutax


# See if we have any input tarballs
@@ -86,27 +87,25 @@ if [ -f ./$runid_pad*.tar.gz ]; then
mkdir data
for tarball in $(ls $runid_pad*.tar.gz)
do
echo "Untarring input: $tarball"
echo "Untarr input: $tarball:"
tar -xzf $tarball -C data --strip-components=1
done
fi
echo

echo "Check RunDB API:"
echo "Pinging xenon-runsdb.grid.uchicago.edu"
ping -c 5 xenon-runsdb.grid.uchicago.edu
echo
echo "Checking if we have .dbtoken"
# echo "Check network:"
# echo "ping -c 5 xenon-runsdb.grid.uchicago.edu"
# ping -c 5 xenon-runsdb.grid.uchicago.edu
# echo
echo "Checking if we have .dbtoken:"
echo "ls -lah $HOME/.dbtoken"
ls -lah $HOME/.dbtoken
echo "ls -lah $USERPROFILE/.dbtoken"
ls -lah $USERPROFILE/.dbtoken
echo
# echo "nmap xenon-runsdb.grid.uchicago.edu"
# nmap -p5000 xenon-runsdb.grid.uchicago.edu
# echo

echo "Processing now..."
echo "Processing:"

chunkarg=""
if [ -n "${chunks}" ]
@@ -128,14 +127,14 @@ echo "We want to find and delete any records or records_nv if existing, to save
find data -type d \( -name "*-records-*" -o -name "*-records_nv-*" \) -exec rm -rf {} +

if [ "X${standalone_download}" = "Xdownload-only" ]; then
echo "We are tarballing the data directory for download-only job."
tar -czfv ${output_tar} data
echo "We are tarballing the data directory for download-only job:"
tar czfv ${output_tar} data
elif [ "X${output_dtype}" = "Xevent_info_double" ] || [ "X${output_dtype}" = "Xevents_mv" ] || [ "X${output_dtype}" = "Xevents_nv" ]; then
echo "We are tarballing the data directory for ${output_dtype} job."
tar -czfv ${output_tar} data
echo "We are tarballing the data directory for ${output_dtype} job:"
tar czfv ${output_tar} data
else
echo "We are tarballing the data directory, but only for those with _temp."
tar -czfv ${output_tar} data/*_temp
echo "We are tarballing the data directory, but only for those with _temp:"
tar czfv ${output_tar} data/*_temp
fi

echo
75 changes: 41 additions & 34 deletions outsource/workflow/process.py
Expand Up @@ -98,15 +98,15 @@ def get_bottom_dtypes(dtype):
return ("peaklets", "lone_hits")


def find_data_to_download(st, runid, target):
runid_str = f"{runid:06d}"
def find_data_to_download(st, run_id, target):
run_id_str = f"{run_id:06d}"
bottoms = get_bottom_dtypes(target)
bottom_hashes = tuple([st.key_for(runid_str, b).lineage_hash for b in bottoms])
bottom_hashes = tuple([st.key_for(run_id_str, b).lineage_hash for b in bottoms])

to_download = []

# All data entries from the RunDB for certain runid
data = db.get_data(runid, host="rucio-catalogue")
# All data entries from the RunDB for certain run_id
data = db.get_data(run_id, host="rucio-catalogue")

def find_data(_target):
"""Recursively find all the data needed to make the target dtype.
@@ -120,17 +120,17 @@ def find_data(_target):
return

# Initialize plugin needed for processing
_plugin = st._get_plugins((_target,), runid_str)[_target]
st._set_plugin_config(_plugin, runid_str, tolerant=False)
_plugin = st._get_plugins((_target,), run_id_str)[_target]
st._set_plugin_config(_plugin, run_id_str, tolerant=False)

# Download all the required data_type to produce this output file
for in_dtype in _plugin.depends_on:
# Get hash for this dtype
hash = st.key_for(runid_str, in_dtype).lineage_hash
hash = st.key_for(run_id_str, in_dtype).lineage_hash
rses = [d["location"] for d in data if (d["type"] == in_dtype and hash in d["did"])]
# For checking if local path exists
# Here st.storage[0] is the local storage like ./data
local_path = os.path.join(st.storage[0].path, f"{runid:06d}-{in_dtype}-{hash}")
local_path = os.path.join(st.storage[0].path, f"{run_id:06d}-{in_dtype}-{hash}")

if len(rses) == 0 and not os.path.exists(local_path):
# Need to download data to make this one
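The local-path check above relies on strax's on-disk naming scheme: the zero-padded run id, the data type, and the lineage hash joined with dashes inside the local storage directory (./data here, per the comment above). A small sketch with made-up values:

    import os

    run_id = 7155
    in_dtype = "peaklets"
    lineage_hash = "abcd1234"  # made up; in the real code this is st.key_for(...).lineage_hash

    local_path = os.path.join("./data", f"{run_id:06d}-{in_dtype}-{lineage_hash}")
    print(local_path)                  # ./data/007155-peaklets-abcd1234
    print(os.path.exists(local_path))  # False unless that folder has been downloaded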
@@ -145,13 +145,20 @@ def find_data(_target):
return to_download


def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for_strax"):
runid_str = f"{runid:06d}"
def process(run_id, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for_strax"):
run_id_str = f"{run_id:06d}"
t0 = time.time()

# st.make(
# run_id_str,
# out_dtype,
# chunk_number={"raw_records": chunks},
# processor="single_thread",
# )

# Initialize plugin needed for processing this output type
plugin = st._get_plugins((out_dtype,), runid_str)[out_dtype]
st._set_plugin_config(plugin, runid_str, tolerant=False)
plugin = st._get_plugins((out_dtype,), run_id_str)[out_dtype]
st._set_plugin_config(plugin, run_id_str, tolerant=False)
plugin.setup()

# Now move on to processing
@@ -170,7 +177,7 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for
# We want to be more tolerant on cuts_basic, because sometimes it is ill-defined
if keystring == "cuts_basic":
try:
st.make(runid_str, keystring, save=keystring, processor="single_thread")
st.make(run_id_str, keystring, save=keystring, processor="single_thread")
except Exception as e:
print(
f"Failed to make {keystring}, but it might be "
@@ -179,19 +186,19 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for
print("Below is the error:")
print(e)
else:
st.make(runid_str, keystring, save=keystring, processor="single_thread")
st.make(run_id_str, keystring, save=keystring, processor="single_thread")
print(f"DONE processing {keystring}")

# Test if the data is complete
try:
print(
"Try loading data in %s to see if it is complete."
% (runid_str + "-" + keystring)
% (run_id_str + "-" + keystring)
)
st.get_array(runid_str, keystring, keep_columns="time", progress_bar=False)
print("Successfully loaded %s! It is complete." % (runid_str + "-" + keystring))
st.get_array(run_id_str, keystring, keep_columns="time", progress_bar=False)
print("Successfully loaded %s! It is complete." % (run_id_str + "-" + keystring))
except Exception as e:
print(f"Data is not complete for {runid_str + '-' + keystring}. Skipping")
print(f"Data is not complete for {run_id_str + '-' + keystring}. Skipping")
print("Below is the error message we get when trying to load the data:")
print(e)
print("--------------------------")
@@ -202,15 +209,15 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for
savers = dict()
for keystring in plugin.provides:
print(f"Making {keystring}")
key = strax.DataKey(runid_str, keystring, plugin.lineage)
saver = st.storage[0].saver(key, plugin.metadata(runid, keystring))
key = strax.DataKey(run_id_str, keystring, plugin.lineage)
saver = st.storage[0].saver(key, plugin.metadata(run_id, keystring))
saver.is_forked = True
savers[keystring] = saver

# Setup a few more variables
in_dtype = plugin.depends_on[0]
input_metadata = st.get_metadata(runid_str, in_dtype)
input_key = strax.DataKey(runid_str, in_dtype, input_metadata["lineage"])
input_metadata = st.get_metadata(run_id_str, in_dtype)
input_key = strax.DataKey(run_id_str, in_dtype, input_metadata["lineage"])
backend = None
backend_key = None

@@ -368,12 +375,12 @@ def main():

print("Context is set up!")

runid = args.dataset
runid_str = f"{runid:06d}"
run_id = args.dataset
run_id_str = f"{run_id:06d}"
out_dtype = args.output  # e.g. typically for TPC: peaklets/event_info

print("Getting to-download list...")
to_download = find_data_to_download(st, runid, out_dtype)
to_download = find_data_to_download(st, run_id, out_dtype)
print("Got to-download list!")

# See if we have rucio local frontend
@@ -384,13 +391,13 @@ def main():
for other_dtype in buddies:
if other_dtype == out_dtype:
continue
to_download.extend(find_data_to_download(st, runid, other_dtype))
to_download.extend(find_data_to_download(st, run_id, other_dtype))
# Remove duplicates
to_download = list(set(to_download))

# Initialize plugin needed for processing this output type
plugin = st._get_plugins((out_dtype,), runid_str)[out_dtype]
st._set_plugin_config(plugin, runid_str, tolerant=False)
plugin = st._get_plugins((out_dtype,), run_id_str)[out_dtype]
st._set_plugin_config(plugin, run_id_str, tolerant=False)
plugin.setup()

# Figure out what plugins we need to process/initialize
@@ -402,7 +409,7 @@ def main():
to_process = list(set(to_process))

# Keep track of the data we can download now -- will be important for the upload step later
available_dtypes = st.available_for_run(runid_str)
available_dtypes = st.available_for_run(run_id_str)
available_dtypes = available_dtypes[available_dtypes.is_stored].target.values.tolist()

missing = set(plugin.depends_on) - set(available_dtypes)
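Judging by the lines above, st.available_for_run(run_id_str) returns a DataFrame-like availability table with (at least) target and is_stored columns. The boolean-mask-then-tolist pattern in plain pandas, on made-up data:

    import pandas as pd

    # Made-up availability table; the real one comes from st.available_for_run(run_id_str)
    availability = pd.DataFrame(
        {
            "target": ["raw_records", "peaklets", "event_info"],
            "is_stored": [True, True, False],
        }
    )

    stored = availability[availability.is_stored].target.values.tolist()
    print(stored)  # ['raw_records', 'peaklets']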
@@ -413,7 +420,7 @@ def main():
while len(intermediates) > 0:
new_intermediates = []
for _dtype in intermediates:
_plugin = st._get_plugins((_dtype,), runid_str)[_dtype]
_plugin = st._get_plugins((_dtype,), run_id_str)[_dtype]
# Adding missing dependencies to to-process list
for dependency in _plugin.depends_on:
if dependency not in available_dtypes:
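The loop above keeps expanding the set of intermediate data types until every missing dependency is either already available or scheduled for processing. The same idea on a toy dependency map (the map and the starting lists below are made up; the real edges come from each plugin's depends_on):

    # Toy dependency map, made up for illustration.
    depends_on = {
        "event_info": ["events", "peak_basics"],
        "events": ["peaks"],
        "peak_basics": ["peaks"],
        "peaks": ["peaklets"],
    }

    available_dtypes = {"peaklets"}
    to_process = ["event_info", "events", "peak_basics"]
    intermediates = ["events", "peak_basics"]

    while len(intermediates) > 0:
        new_intermediates = []
        for _dtype in intermediates:
            for dependency in depends_on.get(_dtype, []):
                if dependency not in available_dtypes:
                    new_intermediates.append(dependency)
                    to_process.append(dependency)
        intermediates = list(set(new_intermediates))

    print(sorted(set(to_process)))  # ['event_info', 'events', 'peak_basics', 'peaks']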
@@ -451,7 +458,7 @@ def main():
_tmp_path = tempfile.mkdtemp()
for dtype in to_process:
close_savers = dtype != args.output
process(runid, dtype, st, args.chunks, close_savers=close_savers, tmp_path=_tmp_path)
process(run_id, dtype, st, args.chunks, close_savers=close_savers, tmp_path=_tmp_path)
gc.collect()

print("Done processing. Now check if we should upload to rucio")
@@ -603,7 +610,7 @@ def main():
if args.chunks is None:
# Skip if update_db flag is false, or if the rucio upload failed
if args.update_db and succeded_rucio_upload:
md = st.get_meta(runid_str, this_dtype)
md = st.get_meta(run_id_str, this_dtype)
chunk_mb = [chunk["nbytes"] / (1e6) for chunk in md["chunks"]]
data_size_mb = np.sum(chunk_mb)

Expand All @@ -623,7 +630,7 @@ def main():
size_mb=data_size_mb,
)

db.update_data(runid, new_data_dict)
db.update_data(run_id, new_data_dict)
print(f"Database updated for {this_dtype} at {rse}")

# Cleanup the files we uploaded
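The size reported to the database above is simply the sum of per-chunk byte counts converted to MB. A sketch with made-up metadata in the shape the code implies for st.get_meta (a dict whose "chunks" entries carry an "nbytes" field):

    import numpy as np

    # Made-up metadata; the real dict comes from st.get_meta(run_id_str, this_dtype)
    md = {
        "chunks": [
            {"nbytes": 150_000_000},
            {"nbytes": 98_500_000},
            {"nbytes": 3_250_000},
        ]
    }

    chunk_mb = [chunk["nbytes"] / 1e6 for chunk in md["chunks"]]
    data_size_mb = np.sum(chunk_mb)
    print(chunk_mb)      # [150.0, 98.5, 3.25]
    print(data_size_mb)  # 251.75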