
Commit

Merge branch 'fix_raw_records' of github.com:XAMS-nikhef/amstrax into fix_raw_records
cfuselli committed Dec 14, 2023
2 parents 4d0084c + 7625872 commit 8109bb9
Showing 5 changed files with 290 additions and 137 deletions.
1 change: 1 addition & 0 deletions amstrax/__init__.py
@@ -2,6 +2,7 @@

from .common import *
from .rundb import *
from .logging_utils import *

from . import plugins
from .plugins import *
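The diff for the new amstrax/logging_utils.py (which presumably provides the get_daq_logger used by both scripts below) is not loaded on this page. Purely for orientation, here is a minimal sketch of what such a helper could look like, assuming it wraps the same TimedRotatingFileHandler setup that the per-script setup_logging() functions below used to contain; only the call signature is taken from this commit, the body is an assumption, not the actual implementation.

# Hypothetical sketch of amstrax/logging_utils.py -- the real diff is not shown here.
# Assumes get_daq_logger() wraps the daily-rotating-file setup that the removed
# setup_logging() helpers used, with the keyword arguments seen at the call sites.
import logging
import os
from logging.handlers import TimedRotatingFileHandler


def get_daq_logger(name, log_name=None, level=logging.INFO,
                   opening_message=None, logdir='./logs'):
    """Return a logger that writes to the console and to a daily-rotating file."""
    os.makedirs(logdir, exist_ok=True)
    log_name = log_name or name

    formatter = logging.Formatter("%(asctime)s | %(levelname)-5.5s | %(message)s")
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Daily rotation, keeping one week of log files (as in the removed setup_logging).
    file_handler = TimedRotatingFileHandler(
        os.path.join(logdir, f'{log_name}.log'),
        when='midnight', interval=1, backupCount=7)
    file_handler.suffix = "%Y%m%d"
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    if opening_message:
        logger.info(opening_message)
    return logger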
82 changes: 35 additions & 47 deletions amstrax/auto_processing/auto_processing_stoomboot.py
@@ -4,7 +4,6 @@
import subprocess
from datetime import datetime, timedelta
import logging
from logging.handlers import TimedRotatingFileHandler

import sys

@@ -43,7 +42,7 @@ def parse_args():
help="Memory per CPU")
parser.add_argument(
'--logs_path',
default='/data/xenon/xams_v2/logs/auto_processing',
default='/data/xenon/xams_v2/logs/',
help="Path where to save logs")
parser.add_argument(
'--production',
@@ -56,50 +55,21 @@

return parser.parse_args()


def setup_logging(logs_path):
"""
Setup logging configuration with daily log rotation.
"""
if not os.path.exists(logs_path):
os.makedirs(logs_path)

log_file = os.path.join(logs_path, 'copying.log')

log_formatter = logging.Formatter("%(asctime)s | %(levelname)-5.5s | %(message)s")
logger = logging.getLogger()

logger.setLevel(logging.INFO)

# Setup file handler with daily rotation
file_handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=7)
file_handler.setFormatter(log_formatter)
file_handler.suffix = "%Y%m%d"
logger.addHandler(file_handler)

# Setup console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)

def main(args):
"""
Main function to handle auto-processing of xams data.
"""

# Import custom modules
import amstrax

version = '2.1.0'
logging.info(f'Starting autoprocess version {version}...')
log.info(f'Starting autoprocess version {version}...')

# Get configuration and setup
amstrax_dir = amstrax.amstrax_dir
nap_time = int(args.timeout)
output_folder = args.output_folder
targets = " ".join(args.target)
runs_col = amstrax.get_mongo_collection()
logging.info('Correctly connected, starting loop')
log.info('Correctly connected, starting loop')
amstrax_dir = amstrax.amstrax_dir

client = amstrax.get_mongo_client()
@@ -117,17 +87,17 @@ def main(args):
handle_running_jobs(runs_col, production=args.production)

if not run_docs_to_do:
logging.info(f'I found no runs to process, time to take a nap for {nap_time} seconds')
log.info(f'I found no runs to process, time to take a nap for {nap_time} seconds')
time.sleep(nap_time)
continue

# Submit new jobs if under max limit
submit_new_jobs(args, runs_col, run_docs_to_do, amstrax_dir=amstrax_dir)

logging.info(f"Waiting {nap_time} seconds before rechecking, press Ctrl+C to quit...")
log.info(f"Waiting {nap_time} seconds before rechecking, press Ctrl+C to quit...")
time.sleep(nap_time)

logging.info('Done!')
log.info('Done!')

# Define additional functions to modularize the script

@@ -181,8 +151,8 @@ def update_task_list(args, runs_col, auto_processing_on):

# Log the found runs
if run_docs_to_do:
logging.info(f'I found {len(run_docs_to_do)} runs to process, time to get to work!')
logging.info(f'Run numbers: {[run_doc["number"] for run_doc in run_docs_to_do]}')
log.info(f'I found {len(run_docs_to_do)} runs to process, time to get to work!')
log.info(f'Run numbers: {[run_doc["number"] for run_doc in run_docs_to_do]}')
return run_docs_to_do


@@ -208,7 +178,7 @@ def handle_running_jobs(runs_col, production=False):
if processing_status['status'] in ['running', 'submitted']:
if processing_status['time'] < datetime.now() - timedelta(hours=0, minutes=30):
new_status = 'failed'
logging.info(f'Run {run_number} has a job {processing_status["status"]} for more than 30 minutes, marking as {new_status}')
log.info(f'Run {run_number} has a job {processing_status["status"]} for more than 30 minutes, marking as {new_status}')

if production:
runs_col.update_one(
@@ -218,7 +188,7 @@
)

else:
logging.info(f'Would have updated run {run_number} to {new_status} in the database')
log.info(f'Would have updated run {run_number} to {new_status} in the database')


def submit_new_jobs(args, runs_col, run_docs_to_do, amstrax_dir):
@@ -233,20 +203,20 @@ def submit_new_jobs(args, runs_col, run_docs_to_do, amstrax_dir):
run_docs_running = list(runs_col.find(query, projection).sort(sort))
num_running_jobs = len(run_docs_running)

logging.info(f'Found {num_running_jobs} runs that are running or submitted')
log.info(f'Found {num_running_jobs} runs that are running or submitted')

for run_doc in run_docs_running:
logging.info(f'Run {run_doc["number"]} is in status {run_doc["processing_status"]["status"]}')
log.info(f'Run {run_doc["number"]} is in status {run_doc["processing_status"]["status"]}')

if num_running_jobs >= args.max_jobs:
logging.info(f'The number of running jobs ({num_running_jobs}) reached the limit ({args.max_jobs})')
log.info(f'The number of running jobs ({num_running_jobs}) reached the limit ({args.max_jobs})')
return

# Submit new jobs
max_jobs_to_submit = args.max_jobs - num_running_jobs

will_do_run_ids = [int(run_doc['number']) for run_doc in run_docs_to_do[:max_jobs_to_submit]]
logging.info(f'Will submit jobs for runs: {will_do_run_ids}')
log.info(f'Will submit jobs for runs: {will_do_run_ids}')

for run_doc in run_docs_to_do[:max_jobs_to_submit]:
run_id = f'{int(run_doc["number"]):06}'
@@ -287,12 +257,30 @@ def submit_new_jobs(args, runs_col, run_docs_to_do, amstrax_dir):
}
)
else:
logging.info(f'Would have submitted job for run {run_id}')
logging.info(f'Would have updated run {run_doc["number"]} to submitted in the database')
log.info(f'Would have submitted job for run {run_id}')
log.info(f'Would have updated run {run_doc["number"]} to submitted in the database')


# Ensure the script is run as the main program
if __name__ == '__main__':
args = parse_args()
setup_logging(args.logs_path)

log_name = "auto_processing_stoomboot"

import amstrax

versions = amstrax.print_versions(
modules="strax amstrax numpy numba".split(),
include_git=False,
return_string=True,
)

log = amstrax.get_daq_logger(
log_name,
log_name,
level=logging.DEBUG,
opening_message=f"I am processing with these software versions: {versions}",
logdir=args.logs_path,
)

main(args)
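Both scripts in this commit end with the same __main__ boilerplate: collect package versions with amstrax.print_versions and then bind a module-level log via amstrax.get_daq_logger. Purely as an illustration of that pattern, the repeated block could be factored out as sketched below; setup_script_logger is a hypothetical helper name, not part of this commit.

# Hypothetical helper, not in the repository: factors out the __main__ logging
# setup repeated in auto_processing_stoomboot.py and copy_live.py.
import logging
import amstrax


def setup_script_logger(log_name, logs_path, level=logging.DEBUG):
    versions = amstrax.print_versions(
        modules="strax amstrax numpy numba".split(),
        include_git=False,
        return_string=True,
    )
    return amstrax.get_daq_logger(
        log_name,
        log_name,
        level=level,
        opening_message=f"I am processing with these software versions: {versions}",
        logdir=logs_path,
    )


# Usage in a script's __main__ block:
# log = setup_script_logger("auto_processing_stoomboot", args.logs_path)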
75 changes: 34 additions & 41 deletions amstrax/auto_processing/copy_live.py
@@ -6,9 +6,7 @@
import subprocess
import time
import pymongo
import amstrax
import logging
from logging.handlers import TimedRotatingFileHandler
import os

# Define a dictionary for storage locations
@@ -39,32 +37,6 @@ def parse_args():
return parser.parse_args()


def setup_logging(logs_path):
"""
Setup logging configuration with daily log rotation.
"""
if not os.path.exists(logs_path):
os.makedirs(logs_path)

log_file = os.path.join(logs_path, 'copying.log')

log_formatter = logging.Formatter("%(asctime)s | %(levelname)-5.5s | %(message)s")
logger = logging.getLogger()

logger.setLevel(logging.INFO)

# Setup file handler with daily rotation
file_handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=7)
file_handler.setFormatter(log_formatter)
file_handler.suffix = "%Y%m%d"
logger.addHandler(file_handler)

# Setup console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)


def get_rundocs(runsdb, args):
"""
Retrieve run documents from MongoDB collection based on specific criteria.
@@ -138,7 +110,7 @@ def copy_data(run_id, live_data_path, location, hostname, production, ssh_host):
Copy data to the specified location using rsync.
"""
if not os.path.exists(live_data_path):
logging.error(f"Could not find the data for run {run_id} at {live_data_path}, marking run as abandon")
log.error(f"Could not find the data for run {run_id} at {live_data_path}, marking run as abandon")
# add a tag to the tags array in the database, marking the run as abandon
runsdb.update_one(
{'number': int(run_id)},
Expand All @@ -147,24 +119,24 @@ def copy_data(run_id, live_data_path, location, hostname, production, ssh_host):

return

logging.info(f"Copying run {run_id} to {location}")
log.info(f"Copying run {run_id} to {location}")
copy_cmd = ['rsync', '-av', live_data_path, f'{ssh_host}:{location}']
copy = subprocess.run(copy_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

if copy.returncode != 0:
logging.error(f"Something went wrong copying run {run_id} to {location}")
logging.error(copy.stderr.decode())
log.error(f"Something went wrong copying run {run_id} to {location}")
log.error(copy.stderr.decode())
else:
logging.info(f"Successfully copied run {run_id} to {location}")
logging.info(copy.stdout.decode())
log.info(f"Successfully copied run {run_id} to {location}")
log.info(copy.stdout.decode())

if production:
runsdb.update_one(
{'number': int(run_id)},
{'$push': {'data': {'type': 'live', 'host': hostname, 'location': location,
'by': 'copy_live', 'time': datetime.datetime.now()}}}
)
logging.info(f"Successfully updated the database for run {run_id}")
log.info(f"Successfully updated the database for run {run_id}")

return copy.returncode

@@ -176,16 +148,18 @@ def handle_runs(rundocs, args):
try:
path = next(d['location'] for d in rd['data'] if d['type'] == 'live' and d['host'] == 'daq')
except StopIteration:
logging.error(f"Could not find the DB entry for live data of run {rd['number']}")
log.error(f"Could not find the DB entry for live data of run {rd['number']}")
continue

live_data_path = os.path.join(path, run_id)

# Check if data is on stoomboot and copy if not
copied_stomboot = False
if not any(d['type'] == 'live' and d['host'] == 'stbc' for d in rd['data']):
copied_stomboot = copy_data(run_id, live_data_path, STORAGE_PATHS['stbc'], 'stbc', args.production, args.ssh_host)
exit_code = copy_data(run_id, live_data_path, STORAGE_PATHS['stbc'], 'stbc', args.production, args.ssh_host)
copied_stomboot = (exit_code == 0)
else:
# it was already on stoomboot
copied_stomboot = True

if copied_stomboot:
@@ -200,22 +174,41 @@ def main(args):
"""
Main function to automate copying of new runs.
"""
logging.info('Starting to copy new runs...')

log.info('Starting to copy new runs...')
rundocs = get_rundocs(runsdb, args)
print(f"Found {len(rundocs)} runs to copy")
runs_copied = handle_runs(rundocs, args)
logging.info('Finished copying new runs.')
log.info('Finished copying new runs.')

if __name__ == '__main__':
args = parse_args()
setup_logging(args.logs_path)

log_name = "copy_live"

import amstrax

versions = amstrax.print_versions(
modules="strax amstrax numpy numba".split(),
include_git=False,
return_string=True,
)

log = amstrax.get_daq_logger(
log_name,
log_name,
level=logging.DEBUG,
opening_message=f"I am processing with these software versions: {versions}",
logdir=args.logs_path,
)

runsdb = amstrax.get_mongo_collection()

if args.loop_infinite:
while True:
runs_copied = main(args)
sleep_time = 1 if runs_copied else args.sleep_time
logging.info(f"Sleeping for {args.sleep_time} seconds...")
log.info(f"Sleeping for {args.sleep_time} seconds...")
time.sleep(args.sleep_time)
else:
main(args)
(Diffs for the remaining two changed files are not loaded on this page.)
