Remove unused python files #177

Merged
merged 1 commit on Sep 21, 2024
13 changes: 9 additions & 4 deletions outsource/outsource.py
@@ -8,6 +8,7 @@
from datetime import datetime
import numpy as np
from tqdm import tqdm
import utilix
from utilix import DB, uconfig
from utilix.x509 import _validate_x509_proxy
from utilix.tarball import Tarball
@@ -162,11 +163,11 @@ def pegasus_config(self):
pconfig["pegasus.gridstart.arguments"] = "-f"
pconfig["pegasus.mode"] = "development"
# give jobs a total of {retry} + 1 tries
pconfig["dagman.retry"] = 2
pconfig["dagman.retry"] = uconfig.getint("Outsource", "dagman_retry", fallback=2)
# make sure we don't start too many jobs at the same time
pconfig["dagman.maxidle"] = 5_000
pconfig["dagman.maxidle"] = uconfig.getint("Outsource", "dagman_maxidle", fallback=5_000)
# total number of jobs cap
pconfig["dagman.maxjobs"] = 300
pconfig["dagman.maxjobs"] = uconfig.getint("Outsource", "dagman_maxjobs", fallback=300)
# transfer parallelism
pconfig["pegasus.transfer.threads"] = 1

@@ -412,7 +413,11 @@ def _generate_workflow(self):

# script to install packages
installsh = File("install.sh")
rc.add_replica("local", "install.sh", f"file://{base_dir}/workflow/install.sh")
rc.add_replica(
"local",
"install.sh",
f"file://{os.path.join(os.path.dirname(utilix.__file__), 'install.sh')}",
)

# Add common data files to the replica catalog
xenon_config = File(".xenon_config")
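Note on the outsource.py hunks above: the DAGMan retry, idle and total-job limits are now read from the [Outsource] section of the utilix configuration, with the old hard-coded numbers kept as fallbacks, and install.sh is resolved from the installed utilix package (os.path.dirname(utilix.__file__)) rather than from the workflow directory. A minimal sketch of the fallback behaviour, assuming uconfig acts like a stdlib configparser.ConfigParser (the option values below are made up for illustration):

# Sketch only: emulate utilix's uconfig with a stdlib ConfigParser.
import configparser

config_text = """
[Outsource]
dagman_retry = 3
dagman_maxidle = 10000
"""  # dagman_maxjobs deliberately left unset

uconfig = configparser.ConfigParser()
uconfig.read_string(config_text)

print(uconfig.getint("Outsource", "dagman_retry", fallback=2))        # 3, from the config
print(uconfig.getint("Outsource", "dagman_maxidle", fallback=5_000))  # 10000, from the config
print(uconfig.getint("Outsource", "dagman_maxjobs", fallback=300))    # 300, the fallback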
6 changes: 3 additions & 3 deletions outsource/workflow/combine-wrapper.sh
@@ -50,14 +50,14 @@ if [ "X$upload_to_rucio" = "Xtrue" ]; then
fi

# Installing customized packages
. install.sh
. install.sh strax straxen cutax

# Combine the data
time python combine.py ${run_id} ${dtype} --context ${context} --xedocs_version ${xedocs_version} --input data ${combine_extra_args}

# Check data dir again
echo "data dir:"
echo "Here is what is in the data directory after combining:"
ls -l data

# tar up the output
tar -czfv ${output} finished_data
tar czfv ${output} finished_data
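The wrapper now names the packages to install explicitly (. install.sh strax straxen cutax). Since the repository copy of install.sh is deleted in the next hunk and the script is instead picked up from the utilix installation (see the replica-catalog change above), its actual contents are not part of this diff; the sketch below is only a hypothetical illustration of a script with that calling convention, and the tarball naming and pip flags are assumptions:

#!/bin/bash
# Hypothetical install.sh: for each requested package, install a user
# tarball if one was shipped with the job, otherwise keep the version
# already present in the environment.
for package in "$@"; do
    tarball=$(ls ${package}*.tar.gz 2> /dev/null | head -n 1)
    if [ -n "${tarball}" ]; then
        echo "Installing customized ${package} from ${tarball}"
        pip install --user --no-deps "${tarball}"
    else
        echo "No ${package} tarball shipped; using the environment version"
    fi
done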
36 changes: 0 additions & 36 deletions outsource/workflow/install.sh

This file was deleted.

35 changes: 17 additions & 18 deletions outsource/workflow/process-wrapper.sh
@@ -15,7 +15,8 @@ chunks=${args[@]:8}

echo $@

echo "Processing chunks: $chunks"
echo "Processing chunks:"
echo "$chunks"

extraflags=""

@@ -70,13 +71,13 @@ echo

if [ "X${standalone_download}" = "Xno-download" ]; then
# We are given a tarball from the previous download job
echo "Untaring input data..."
echo "Untar input data:"
tar -xzf *-data*.tar.gz
fi


# Installing customized packages
. install.sh
. install.sh strax straxen cutax


# See if we have any input tarballs
@@ -86,27 +87,25 @@ if [ -f ./$runid_pad*.tar.gz ]; then
mkdir data
for tarball in $(ls $runid_pad*.tar.gz)
do
echo "Untarring input: $tarball"
echo "Untarr input: $tarball:"
tar -xzf $tarball -C data --strip-components=1
done
fi
echo

echo "Check RunDB API:"
echo "Pinging xenon-runsdb.grid.uchicago.edu"
ping -c 5 xenon-runsdb.grid.uchicago.edu
echo
echo "Checking if we have .dbtoken"
# echo "Check network:"
# echo "ping -c 5 xenon-runsdb.grid.uchicago.edu"
# ping -c 5 xenon-runsdb.grid.uchicago.edu
# echo
echo "Checking if we have .dbtoken:"
echo "ls -lah $HOME/.dbtoken"
ls -lah $HOME/.dbtoken
echo "ls -lah $USERPROFILE/.dbtoken"
ls -lah $USERPROFILE/.dbtoken
echo
# echo "nmap xenon-runsdb.grid.uchicago.edu"
# map -p5000 xenon-runsdb.grid.uchicago.edu
# echo

echo "Processing now..."
echo "Processing:"

chunkarg=""
if [ -n "${chunks}" ]
@@ -128,14 +127,14 @@ echo "We want to find and delete any records or records_nv if existing, to save
find data -type d \( -name "*-records-*" -o -name "*-records_nv-*" \) -exec rm -rf {} +

if [ "X${standalone_download}" = "Xdownload-only" ]; then
echo "We are tarballing the data directory for download-only job."
tar -czfv ${output_tar} data
echo "We are tarballing the data directory for download-only job:"
tar czfv ${output_tar} data
elif [ "X${output_dtype}" = "Xevent_info_double" ] || [ "X${output_dtype}" = "Xevents_mv" ] || [ "X${output_dtype}" = "Xevents_nv" ]; then
echo "We are tarballing the data directory for ${output_dtype} job."
tar -czfv ${output_tar} data
echo "We are tarballing the data directory for ${output_dtype} job:"
tar czfv ${output_tar} data
else
echo "We are tarballing the data directory, but only for those with _temp."
tar -czfv ${output_tar} data/*_temp
echo "We are tarballing the data directory, but only for those with _temp:"
tar czfv ${output_tar} data/*_temp
fi

echo
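Both wrapper scripts also change tar -czfv to tar czfv. With GNU tar, when short options are clustered the -f option takes the rest of the cluster as its argument, so tar -czfv ${output} ... writes the archive to a file literally named v and treats ${output} as an input path; the old-style czfv form assigns ${output} to f as intended. A quick demonstration (assumes GNU tar; file names are for illustration only):

mkdir -p data && touch data/chunk_000
tar czfv out.tar.gz data      # old-style options: out.tar.gz is the archive name
tar -czfv out.tar.gz data     # clustered short options: the archive is named "v"
                              # and out.tar.gz is swept up as an input file
ls -l out.tar.gz v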
75 changes: 41 additions & 34 deletions outsource/workflow/process.py
@@ -98,15 +98,15 @@ def get_bottom_dtypes(dtype):
return ("peaklets", "lone_hits")


def find_data_to_download(st, runid, target):
runid_str = f"{runid:06d}"
def find_data_to_download(st, run_id, target):
run_id_str = f"{run_id:06d}"
bottoms = get_bottom_dtypes(target)
bottom_hashes = tuple([st.key_for(runid_str, b).lineage_hash for b in bottoms])
bottom_hashes = tuple([st.key_for(run_id_str, b).lineage_hash for b in bottoms])

to_download = []

# All data entries from the RunDB for certain runid
data = db.get_data(runid, host="rucio-catalogue")
# All data entries from the RunDB for certain run_id
data = db.get_data(run_id, host="rucio-catalogue")

def find_data(_target):
"""Recursively find all the data needed to make the target dtype.
@@ -120,17 +120,17 @@ def find_data(_target):
return

# Initialize plugin needed for processing
_plugin = st._get_plugins((_target,), runid_str)[_target]
st._set_plugin_config(_plugin, runid_str, tolerant=False)
_plugin = st._get_plugins((_target,), run_id_str)[_target]
st._set_plugin_config(_plugin, run_id_str, tolerant=False)

# Download all the required data_type to produce this output file
for in_dtype in _plugin.depends_on:
# Get hash for this dtype
hash = st.key_for(runid_str, in_dtype).lineage_hash
hash = st.key_for(run_id_str, in_dtype).lineage_hash
rses = [d["location"] for d in data if (d["type"] == in_dtype and hash in d["did"])]
# For checking if local path exists
# Here st.storage[0] is the local storage like ./data
local_path = os.path.join(st.storage[0].path, f"{runid:06d}-{in_dtype}-{hash}")
local_path = os.path.join(st.storage[0].path, f"{run_id:06d}-{in_dtype}-{hash}")

if len(rses) == 0 and not os.path.exists(local_path):
# Need to download data to make this one
@@ -145,13 +145,20 @@ def find_data(_target):
return to_download


def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for_strax"):
runid_str = f"{runid:06d}"
def process(run_id, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for_strax"):
run_id_str = f"{run_id:06d}"
t0 = time.time()

# st.make(
# run_id_str,
# out_dtype,
# chunk_number={"raw_records": chunks},
# processor="single_thread",
# )

# Initialize plugin needed for processing this output type
plugin = st._get_plugins((out_dtype,), runid_str)[out_dtype]
st._set_plugin_config(plugin, runid_str, tolerant=False)
plugin = st._get_plugins((out_dtype,), run_id_str)[out_dtype]
st._set_plugin_config(plugin, run_id_str, tolerant=False)
plugin.setup()

# Now move on to processing
@@ -170,7 +177,7 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for
# We want to be more tolerant on cuts_basic, because sometimes it is ill-defined
if keystring == "cuts_basic":
try:
st.make(runid_str, keystring, save=keystring, processor="single_thread")
st.make(run_id_str, keystring, save=keystring, processor="single_thread")
except Exception as e:
print(
f"Failed to make {keystring}, but it might be "
@@ -179,19 +186,19 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for
print("Below is the error:")
print(e)
else:
st.make(runid_str, keystring, save=keystring, processor="single_thread")
st.make(run_id_str, keystring, save=keystring, processor="single_thread")
print(f"DONE processing {keystring}")

# Test if the data is complete
try:
print(
"Try loading data in %s to see if it is complete."
% (runid_str + "-" + keystring)
% (run_id_str + "-" + keystring)
)
st.get_array(runid_str, keystring, keep_columns="time", progress_bar=False)
print("Successfully loaded %s! It is complete." % (runid_str + "-" + keystring))
st.get_array(run_id_str, keystring, keep_columns="time", progress_bar=False)
print("Successfully loaded %s! It is complete." % (run_id_str + "-" + keystring))
except Exception as e:
print(f"Data is not complete for {runid_str + '-' + keystring}. Skipping")
print(f"Data is not complete for {run_id_str + '-' + keystring}. Skipping")
print("Below is the error message we get when trying to load the data:")
print(e)
print("--------------------------")
@@ -202,15 +209,15 @@ def process(runid, out_dtype, st, chunks, close_savers=False, tmp_path=".tmp_for
savers = dict()
for keystring in plugin.provides:
print(f"Making {keystring}")
key = strax.DataKey(runid_str, keystring, plugin.lineage)
saver = st.storage[0].saver(key, plugin.metadata(runid, keystring))
key = strax.DataKey(run_id_str, keystring, plugin.lineage)
saver = st.storage[0].saver(key, plugin.metadata(run_id, keystring))
saver.is_forked = True
savers[keystring] = saver

# Setup a few more variables
in_dtype = plugin.depends_on[0]
input_metadata = st.get_metadata(runid_str, in_dtype)
input_key = strax.DataKey(runid_str, in_dtype, input_metadata["lineage"])
input_metadata = st.get_metadata(run_id_str, in_dtype)
input_key = strax.DataKey(run_id_str, in_dtype, input_metadata["lineage"])
backend = None
backend_key = None

@@ -368,12 +375,12 @@ def main():

print("Context is set up!")

runid = args.dataset
runid_str = f"{runid:06d}"
run_id = args.dataset
run_id_str = f"{run_id:06d}"
out_dtype = args.output  # e.g. typically for tpc: peaklets/event_info

print("Getting to-download list...")
to_download = find_data_to_download(st, runid, out_dtype)
to_download = find_data_to_download(st, run_id, out_dtype)
print("Got to-download list!")

# See if we have rucio local frontend
@@ -384,13 +391,13 @@ def main():
for other_dtype in buddies:
if other_dtype == out_dtype:
continue
to_download.extend(find_data_to_download(st, runid, other_dtype))
to_download.extend(find_data_to_download(st, run_id, other_dtype))
# Remove duplicates
to_download = list(set(to_download))

# Initialize plugin needed for processing this output type
plugin = st._get_plugins((out_dtype,), runid_str)[out_dtype]
st._set_plugin_config(plugin, runid_str, tolerant=False)
plugin = st._get_plugins((out_dtype,), run_id_str)[out_dtype]
st._set_plugin_config(plugin, run_id_str, tolerant=False)
plugin.setup()

# Figure out what plugins we need to process/initialize
@@ -402,7 +409,7 @@ def main():
to_process = list(set(to_process))

# Keep track of the data we can download now -- will be important for the upload step later
available_dtypes = st.available_for_run(runid_str)
available_dtypes = st.available_for_run(run_id_str)
available_dtypes = available_dtypes[available_dtypes.is_stored].target.values.tolist()

missing = set(plugin.depends_on) - set(available_dtypes)
@@ -413,7 +420,7 @@ def main():
while len(intermediates) > 0:
new_intermediates = []
for _dtype in intermediates:
_plugin = st._get_plugins((_dtype,), runid_str)[_dtype]
_plugin = st._get_plugins((_dtype,), run_id_str)[_dtype]
# Adding missing dependencies to to-process list
for dependency in _plugin.depends_on:
if dependency not in available_dtypes:
@@ -451,7 +458,7 @@ def main():
_tmp_path = tempfile.mkdtemp()
for dtype in to_process:
close_savers = dtype != args.output
process(runid, dtype, st, args.chunks, close_savers=close_savers, tmp_path=_tmp_path)
process(run_id, dtype, st, args.chunks, close_savers=close_savers, tmp_path=_tmp_path)
gc.collect()

print("Done processing. Now check if we should upload to rucio")
@@ -603,7 +610,7 @@ def main():
if args.chunks is None:
# Skip if update_db flag is false, or if the rucio upload failed
if args.update_db and succeded_rucio_upload:
md = st.get_meta(runid_str, this_dtype)
md = st.get_meta(run_id_str, this_dtype)
chunk_mb = [chunk["nbytes"] / (1e6) for chunk in md["chunks"]]
data_size_mb = np.sum(chunk_mb)

@@ -623,7 +630,7 @@ def main():
size_mb=data_size_mb,
)

db.update_data(runid, new_data_dict)
db.update_data(run_id, new_data_dict)
print(f"Database updated for {this_dtype} at {rse}")

# Cleanup the files we uploaded