diff --git a/dispatcher/DAQController.py b/dispatcher/DAQController.py index 292345c1..caaa3300 100644 --- a/dispatcher/DAQController.py +++ b/dispatcher/DAQController.py @@ -1,22 +1,30 @@ import datetime import json import enum +import pytz from daqnt import DAQ_STATUS -''' +from .MongoConnect import NO_NEW_RUN + +""" DAQ Controller Brain Class D. Coderre, 12. Mar. 2019 D. Masson, 06 Apr 2020 +S. di Pede, 17 Mar 2021 Brief: This code handles the logic of what the dispatcher does when. It takes in aggregated status updates and commands from the mongo connector and decides if any action needs to be taken to get the DAQ into the target state. It also handles the resetting of runs (the ~hourly stop/start) during normal operations. -''' +""" + +def now(): + return datetime.datetime.now(pytz.utc) + #return datetime.datetime.utcnow() # wrong? class DAQController(): - def __init__(self, config, mongo_connector, log, hypervisor): + def __init__(self, config, daq_config, mongo_connector, log, hypervisor): self.mongo = mongo_connector self.hypervisor = hypervisor @@ -26,15 +34,15 @@ def __init__(self, config, mongo_connector, log, hypervisor): # Timeouts. There are a few things that we want to wait for that might take time. # The keys for these dicts will be detector identifiers. - detectors = list(config['MasterDAQConfig'].keys()) + detectors = list(daq_config.keys()) self.last_command = {} for k in ['arm', 'start', 'stop']: self.last_command[k] = {} for d in detectors: - self.last_command[k][d] = datetime.datetime.utcnow() + self.last_command[k][d] = now() self.error_stop_count = {d : 0 for d in detectors} self.max_arm_cycles = int(config['MaxArmCycles']) - self.missed_arm_cycles={k:0 for k in config['MasterDAQConfig'].keys()} + self.missed_arm_cycles={k:0 for k in detectors} # Timeout properties come from config self.timeouts = { @@ -45,12 +53,13 @@ def __init__(self, config, mongo_connector, log, hypervisor): self.log = log self.time_between_commands = int(config['TimeBetweenCommands']) self.can_force_stop={k:True for k in detectors} + self.one_detector_arming = False self.start_cmd_delay = float(config['StartCmdDelay']) self.stop_cmd_delay = float(config['StopCmdDelay']) def solve_problem(self, latest_status, goal_state): - ''' + """ This is sort of the whole thing that all the other code is supporting We get the status from the DAQ and the command from the user Then one of three things can happen: @@ -64,15 +73,15 @@ def solve_problem(self, latest_status, goal_state): run in the neutron veto but it is already running a combined run and therefore unavailable. The frontend should prevent many of these cases though. - The way that works is this. We do everything iteratively. Like if we see that - some detector needs to be stopped in order to proceed we issue the stop command - then move on. Everything is then re-evaluated once that command runs through. - - I wrote this very verbosely since it's got quite a few different possibilities and - after rewriting once I am convinced longer, clearer code is better than terse, efficient - code for this particular function. Also I'm hardcoding the detector names. 
- ''' - + The way that works is this: + A) the detector should be INACTIVE (i.e., IDLE), we stop the detector + if the status is in one of the active states + B) the detector should be ACTIVE (i.e, RUNNING), we issue the necessary + commands to put the system in the RUNNING status + C) we deal separately with the ERROR and TIMEOUT statuses, as in the + first time we need to promptly stop the detector, and in the second + case we need to handle the timeouts. + """ # cache these so other functions can see them self.goal_state = goal_state self.latest_status = latest_status @@ -85,167 +94,92 @@ def solve_problem(self, latest_status, goal_state): if latest_status[det]['status'] in [DAQ_STATUS.ARMING, DAQ_STATUS.ARMED]: self.one_detector_arming = True - ''' - CASE 1: DETECTORS ARE INACTIVE - - In our case 'inactive' means 'stopped'. An inactive detector is in its goal state as - long as it isn't doing anything, i.e. not ARMING, ARMED, or RUNNING. We don't care if - it's idle, or in error, or if there's no status at all. We will care about that later - if we try to activate it. - ''' - # 1a - deal with TPC and also with MV and NV, but only if they're linked - active_states = [DAQ_STATUS.ARMING, DAQ_STATUS.ARMED, DAQ_STATUS.RUNNING, - DAQ_STATUS.ERROR, DAQ_STATUS.UNKNOWN] - if goal_state['tpc']['active'] == 'false': - - # Send stop command if we have to - if ( - # TPC not in Idle, error, timeout - (latest_status['tpc']['status'] in active_states) or - # MV linked and not in Idle, error, timeout - (latest_status['muon_veto']['status'] in active_states and - goal_state['tpc']['link_mv'] == 'true') or - # NV linked and not in Idle, error, timeout - (latest_status['neutron_veto']['status'] in active_states and - goal_state['tpc']['link_nv'] == 'true') - ): - self.stop_detector_gently(detector='tpc') - elif latest_status['tpc']['status'] == DAQ_STATUS.TIMEOUT: - self.check_timeouts('tpc') - - # 1b - deal with MV but only if MV not linked to TPC - if goal_state['tpc']['link_mv'] == 'false' and goal_state['muon_veto']['active'] == 'false': - if latest_status['muon_veto']['status'] in active_states: - self.stop_detector_gently(detector='muon_veto') - elif latest_status['muon_veto']['status'] == DAQ_STATUS.TIMEOUT: - self.check_timeouts('muon_veto') - # 1c - deal with NV but only if NV not linked to TPC - if goal_state['tpc']['link_nv'] == 'false' and goal_state['neutron_veto']['active'] == 'false': - if latest_status['neutron_veto']['status'] in active_states: - self.stop_detector_gently(detector='neutron_veto') - elif latest_status['neutron_veto']['status'] == DAQ_STATUS.TIMEOUT: - self.check_timeouts('neutron_veto') - - ''' - CASE 2: DETECTORS ARE ACTIVE - - This will be more complicated. - There are now 4 possibilities (each with sub-possibilities and each for different - combinations of linked or unlinked detectors): - 1. The detectors were already running. Here we have to check if the run needs to - be reset but otherwise maybe we can just do nothing. - 2. The detectors were not already running. We have to start them. - 3. The detectors are in some failed state. We should periodically complain - 4. The detectors are in some in-between state (i.e. ARMING) and we just need to - wait for some seconds to allow time for the thing to sort itself out. - ''' - # 2a - again we consider the TPC first, as well as the cases where the NV/MV are linked - if goal_state['tpc']['active'] == 'true': - - # Maybe we have nothing to do except check the run turnover - if ( - # TPC running! 
- (latest_status['tpc']['status'] == DAQ_STATUS.RUNNING) and - # MV either unlinked or running - (latest_status['muon_veto']['status'] == DAQ_STATUS.RUNNING or - goal_state['tpc']['link_mv'] == 'false') and - # NV either unlinked or running - (latest_status['neutron_veto']['status'] == DAQ_STATUS.RUNNING or - goal_state['tpc']['link_nv'] == 'false') - ): - self.check_run_turnover('tpc') - - # Maybe we're already ARMED and should start a run - elif ( - # TPC ARMED - (latest_status['tpc']['status'] == DAQ_STATUS.ARMED) and - # MV ARMED or UNLINKED - (latest_status['muon_veto']['status'] == DAQ_STATUS.ARMED or - goal_state['tpc']['link_mv'] == 'false') and - # NV ARMED or UNLINKED - (latest_status['neutron_veto']['status'] == DAQ_STATUS.ARMED or - goal_state['tpc']['link_nv'] == 'false')): - self.log.info("Starting TPC") - self.control_detector(command='start', detector='tpc') - - # Maybe we're IDLE and should arm a run - elif ( - # TPC IDLE - (latest_status['tpc']['status'] == DAQ_STATUS.IDLE) and - # MV IDLE or UNLINKED - (latest_status['muon_veto']['status'] == DAQ_STATUS.IDLE or - goal_state['tpc']['link_mv'] == 'false') and - # NV IDLE or UNLINKED - (latest_status['neutron_veto']['status'] == DAQ_STATUS.IDLE or - goal_state['tpc']['link_nv'] == 'false')): - self.log.info("Arming TPC") - self.control_detector(command='arm', detector='tpc') - - elif ( - # TPC ARMING - (latest_status['tpc']['status'] == DAQ_STATUS.ARMING) and - # MV ARMING or UNLINKED - (latest_status['muon_veto']['status'] == DAQ_STATUS.ARMING or - goal_state['tpc']['link_mv'] == 'false') and - # NV ARMING or UNLINKED - (latest_status['neutron_veto']['status'] == DAQ_STATUS.ARMING or - goal_state['tpc']['link_nv'] == 'false')): - self.check_timeouts(detector='tpc', command='arm') - - elif ( - # TPC ERROR - (latest_status['tpc']['status'] == DAQ_STATUS.ERROR) and - # MV ERROR or UNLINKED - (latest_status['muon_veto']['status'] == DAQ_STATUS.ERROR or - goal_state['tpc']['link_mv'] == 'false') and - # NV ERROR or UNLINKED - (latest_status['neutron_veto']['status'] == DAQ_STATUS.ERROR or - goal_state['tpc']['link_nv'] == 'false')): - self.log.info("TPC has error!") - self.control_detector(command='stop', detector='tpc', - force=self.can_force_stop['tpc']) - self.can_force_stop['tpc']=False - - # Maybe someone is timing out or we're in some weird mixed state - # I think this can just be an 'else' because if we're not in some state we're happy - # with we should probably check if a reset is in order. - # Note that this will be triggered nearly every run during ARMING so it's not a - # big deal - else: - self.log.debug("Checking TPC timeouts") - self.check_timeouts('tpc') - - # 2b, 2c. In case the MV and/or NV are UNLINKED and ACTIVE we can treat them - # in basically the same way. - for detector in ['muon_veto', 'neutron_veto']: - linked = goal_state['tpc']['link_mv'] - if detector == 'neutron_veto': - linked = goal_state['tpc']['link_nv'] - - # Active/unlinked. You your own detector now. 
- if (goal_state[detector]['active'] == 'true' and linked == 'false'): - - # Same logic as before but simpler cause we don't have to check for links - if latest_status[detector]['status'] == DAQ_STATUS.RUNNING: - self.check_run_turnover(detector) - elif latest_status[detector]['status'] == DAQ_STATUS.ARMED: - self.control_detector(command='start', detector=detector) - elif latest_status[detector]['status'] == DAQ_STATUS.IDLE: - self.control_detector(command='arm', detector=detector) - elif latest_status[detector]['status'] == DAQ_STATUS.ERROR: - self.control_detector(command='stop', detector=detector, - force=self.can_force_stop[detector]) - self.can_force_stop[detector] = False + active_states = [DAQ_STATUS.RUNNING, DAQ_STATUS.ARMED, DAQ_STATUS.ARMING, DAQ_STATUS.UNKNOWN] + + for det in latest_status.keys(): + # The detector should be INACTIVE + if goal_state[det]['active'] == 'false': + # The detector is not in IDLE, ERROR or TIMEOUT: it needs to be stopped + if latest_status[det]['status'] in active_states: + # Check before if the status is UNKNOWN and it is maybe timing out + if latest_status[det]['status'] == DAQ_STATUS.UNKNOWN: + self.log.info(f"The status of {det} is unknown, check timeouts") + self.check_timeouts(detector=det) + # Otherwise stop the detector + else: + self.log.info(f"Sending stop command to {det}") + self.stop_detector_gently(detector=det) + # Deal separately with the TIMEOUT and ERROR statuses, by stopping the detector if needed + elif latest_status[det]['status'] == DAQ_STATUS.TIMEOUT: + self.log.info(f"The {det} is in timeout, check timeouts") + # TODO update + self.handle_timeout(detector=det) + + elif latest_status[det]['status'] == DAQ_STATUS.ERROR: + self.log.info(f"The {det} has error, sending stop command") + self.control_detector(command='stop', detector=det, force=self.can_force_stop[det]) + self.can_force_stop[det]=False + else: + # the only remaining option is 'idle', which is fine + pass + + # The detector should be ACTIVE (RUNNING) + else: #goal_state[det]['active'] == 'true': + if latest_status[det]['status'] == DAQ_STATUS.RUNNING: + self.log.info(f"The {det} is running") + self.check_run_turnover(detector=det) + # TODO does this work properly? 
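+                # If the goal mode no longer matches the mode currently being
+                # acquired, stop the detector here; a later pass through
+                # solve_problem will find it IDLE and re-arm it in the
+                # requested mode.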
+ if latest_status[det]['mode'] != goal_state[det]['mode']: + self.control_detector(command='stop', detector=det) + # ARMED, start the run + elif latest_status[det]['status'] == DAQ_STATUS.ARMED: + self.log.info(f"The {det} is armed, sending start command") + self.control_detector(command='start', detector=det) + # ARMING, check if it is timing out + elif latest_status[det]['status'] == DAQ_STATUS.ARMING: + self.log.info(f"The {det} is arming, check timeouts") + self.log.debug(f"Checking the {det} timeouts") + self.check_timeouts(detector=det, command='arm') + # UNKNOWN, check if it is timing out + elif latest_status[det]['status'] == DAQ_STATUS.UNKNOWN: + self.log.info(f"The status of {det} is unknown, check timeouts") + self.log.debug(f"Checking the {det} timeouts") + self.check_timeouts(detector=det) + + # Maybe the detector is IDLE, we should arm a run + elif latest_status[det]['status'] == DAQ_STATUS.IDLE: + self.log.info(f"The {det} is in idle, sending arm command") + self.control_detector(command='arm', detector=det) + + # Deal separately with the TIMEOUT and ERROR statuses, by stopping the detector if needed + elif latest_status[det]['status'] == DAQ_STATUS.TIMEOUT: + self.log.info(f"The {det} is in timeout, check timeouts") + self.log.debug("Checking %s timeouts", det) + self.handle_timeout(detector=det) + + elif latest_status[det]['status'] == DAQ_STATUS.ERROR: + self.log.info(f"The {det} has error, sending stop command") + self.control_detector(command='stop', detector=det, force=self.can_force_stop[det]) + self.can_force_stop[det]=False else: - self.check_timeouts(detector) + # shouldn't be able to get here + pass + + return + + def handle_timeout(self, detector): + """ + Detector already in the TIMEOUT status are directly stopped. + """ + self.control_detector(command='stop', detector=detector) return def stop_detector_gently(self, detector): - ''' + """ Stops the detector, unless we're told to wait for the current run to end - ''' + """ if ( # Running normally (not arming, error, timeout, etc) self.latest_status[detector]['status'] == DAQ_STATUS.RUNNING and @@ -256,65 +190,64 @@ def stop_detector_gently(self, detector): self.control_detector(detector=detector, command='stop') def control_detector(self, command, detector, force=False): - ''' + """ Issues the command to the detector if allowed by the timeout - ''' - now = datetime.datetime.utcnow() + """ + time_now = now() try: - dt = (now - self.last_command[command][detector]).total_seconds() + dt = (time_now - self.last_command[command][detector]).total_seconds() except (KeyError, TypeError): dt = 2*self.timeouts[command] # make sure we don't rush things if command == 'start': - dt_last = (now - self.last_command['arm'][detector]).total_seconds() + dt_last = (time_now - self.last_command['arm'][detector]).total_seconds() elif command == 'arm': - dt_last = (now - self.last_command['stop'][detector]).total_seconds() + dt_last = (time_now - self.last_command['stop'][detector]).total_seconds() else: dt_last = self.time_between_commands*2 if (dt > self.timeouts[command] and dt_last > self.time_between_commands) or force: - run_mode = self.goal_state[detector]['mode'] + ls = self.latest_status + gs = self.goal_state if command == 'arm': if self.one_detector_arming: self.log.info('Another detector already arming, can\'t arm %s' % detector) # this leads to run number overlaps return - readers, cc = self.mongo.get_hosts_for_mode(run_mode) + readers, cc = self.mongo.get_hosts_for_mode(gs[detector]['mode']) hosts = (cc, readers) delay 
= 0 self.one_detector_arming = True elif command == 'start': - readers, cc = self.mongo.get_hosts_for_mode(run_mode) - hosts = (readers, cc) # we want the cc to delay by 1s + readers, cc = self.mongo.get_hosts_for_mode(ls[detector]['mode']) + hosts = (readers, cc) # we can safely short the logic here and buy an extra logic cycle self.one_detector_arming = False delay = self.start_cmd_delay #Reset arming timeout counter self.missed_arm_cycles[detector]=0 else: # stop - readers, cc = self.mongo.get_configured_nodes(detector, - self.goal_state['tpc']['link_mv'], self.goal_state['tpc']['link_nv']) + readers, cc = self.mongo.get_hosts_for_mode(ls[detector]['mode']) hosts = (cc, readers) - if force or self.latest_status[detector]['status'] not in [DAQ_STATUS.RUNNING]: + if force or ls[detector]['status'] not in [DAQ_STATUS.RUNNING]: delay = 0 else: delay = self.stop_cmd_delay - # TODO smart delay? - if self.latest_status[detector]['status'] in [DAQ_STATUS.ARMING, DAQ_STATUS.ARMED]: + if ls[detector]['status'] in [DAQ_STATUS.ARMING, DAQ_STATUS.ARMED]: # this was the arming detector self.one_detector_arming = False self.log.debug(f'Sending {command.upper()} to {detector}') - if self.mongo.send_command(command, hosts, self.goal_state[detector]['user'], - detector, self.goal_state[detector]['mode'], delay, force): + if self.mongo.send_command(command, hosts, gs[detector]['user'], + detector, gs[detector]['mode'], delay, force): # failed return - self.last_command[command][detector] = now - if command == 'start' and self.mongo.insert_run_doc(detector, self.goal_state): + self.last_command[command][detector] = time_now + if command == 'start' and self.mongo.insert_run_doc(detector): # db having a moment return - if (command == 'stop' and 'number' in self.latest_status[detector] and - self.mongo.set_stop_time(self.latest_status[detector]['number'], detector, force)): + if (command == 'stop' and ls[detector]['number'] != NO_NEW_RUN and + self.mongo.set_stop_time(ls[detector]['number'], detector, force)): # db having a moment return @@ -322,21 +255,20 @@ def control_detector(self, command, detector, force=False): self.log.debug('Can\'t send %s to %s, timeout at %i/%i' % ( command, detector, dt, self.timeouts[command])) - def check_timeouts(self, detector, command = None): - ''' + def check_timeouts(self, detector, command=None): + """ This one is invoked if we think we need to change states. Either a stop command needs to be sent, or we've detected an anomaly and want to decide what to do. 
Basically this function decides: - We are not in any timeouts: send the normal stop command - We are waiting for something: do nothing - We were waiting for something but it took too long: attempt reset - ''' + """ - sendstop = False - nowtime = datetime.datetime.utcnow() + time_now = now() - #First check how often we have been timing out, if it happend to often something - # bad happend and we start from scratch again + #First check how often we have been timing out, if it happened to often + # something bad happened and we start from scratch again if self.missed_arm_cycles[detector]>self.max_arm_cycles and detector=='tpc': self.hypervisor.tactical_nuclear_option() return @@ -348,7 +280,7 @@ def check_timeouts(self, detector, command = None): else: self.log.debug(f'Checking {command} timeout for {detector}') - dt = (nowtime - self.last_command[command][detector]).total_seconds() + dt = (time_now - self.last_command[command][detector]).total_seconds() local_timeouts = dict(self.timeouts.items()) local_timeouts['stop'] = self.timeouts['stop']*(self.error_stop_count[detector]+1) @@ -367,7 +299,8 @@ def check_timeouts(self, detector, command = None): 'ERROR', "STOP_TIMEOUT") # also invoke the nuclear option - self.hypervisor.tactical_nuclear_option() + if detector == 'tpc': + self.hypervisor.tactical_nuclear_option() self.error_stop_count[detector] = 0 else: self.control_detector(detector=detector, command='stop') @@ -385,43 +318,30 @@ def check_timeouts(self, detector, command = None): return - def throw_error(self): - ''' + """ Throw a general error that the DAQ is stuck - ''' + """ self.mongo.log_error( "Dispatcher control loop can't get DAQ out of stuck state", 'ERROR', "GENERAL_ERROR") def check_run_turnover(self, detector): - ''' + """ During normal operation we want to run for a certain number of minutes, then automatically stop and restart the run. No biggie. We check the time here to see if it's something we have to do. - ''' - # If no stop after configured, return - try: - _ = int(self.goal_state[detector]['stop_after']) - except Exception as e: - self.log.info(f'No run duration specified for {detector}? {type(e)}, {e}') - return + """ - try: - number = self.latest_status[detector]['number'] - except Exception as e: - self.log.debug(f'Could not get number {type(e)}, let\'s resort to workaround. {e}') - # dirty workaround just in case there was a dispatcher crash - number = self.latest_status[detector]['number'] = self.mongo.get_next_run_number() - 1 - if number == -2: # db issue - return + number = self.latest_status[detector]['number'] start_time = self.mongo.get_run_start(number) if start_time is None: + self.log.debug(f'No start time for {number}?') return - nowtime = datetime.datetime.utcnow() + time_now = now() run_length = int(self.goal_state[detector]['stop_after'])*60 - run_duration = (nowtime - start_time).total_seconds() + run_duration = (time_now - start_time).total_seconds() self.log.debug('Checking run turnover for %s: %i/%i' % (detector, run_duration, run_length)) if run_duration > run_length: self.log.info('Stopping run for %s' % detector) diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index f1053352..fd763466 100644 --- a/dispatcher/MongoConnect.py +++ b/dispatcher/MongoConnect.py @@ -1,9 +1,10 @@ -iquote_plus(os.environ['MONGO_PASSWORD_DAQ'])mport datetime +import datetime from daqnt import DAQ_STATUS import threading import time +import pytz -''' +""" MongoDB Connectivity Class for XENONnT DAQ Dispatcher D. Coderre, 12. Mar. 
2019 @@ -21,17 +22,29 @@ } The environment variables MONGO_PASSWORD and RUNS_MONGO_PASSWORD must be set! -''' +""" def _all(values, target): ret = len(values) > 0 for value in values: - ret &= (value == target) - return ret + if value != target: + return False + return True + +def now(): + return datetime.datetime.now(pytz.utc) + #return datetime.datetime.utcnow() # wrong? + +# Communicate between various parts of dispatcher that no new run was determined +NO_NEW_RUN = -1 + + +def now(): + return datetime.datetime.now(pytz.utc) class MongoConnect(): - def __init__(self, config, log, control_mc, runs_mc, hypervisor, testing=False): + def __init__(self, config, daq_config, log, control_mc, runs_mc, hypervisor, testing=False): # Define DB connectivity. Log is separate to make it easier to split off if needed dbn = config['ControlDatabaseName'] @@ -70,6 +83,9 @@ def __init__(self, config, log, control_mc, runs_mc, hypervisor, testing=False): # How long a node can be timing out or missed an ack before it gets fixed (TPC only) self.timeout_take_action = int(config['TimeoutActionThreshold']) + # how long to give the CC to start the run + self.cc_start_wait = int(config['StartCmdDelay']) + 1 + # Which control keys do we look for? self.control_keys = config['ControlKeys'].split() @@ -83,22 +99,17 @@ def __init__(self, config, log, control_mc, runs_mc, hypervisor, testing=False): # 'status': {enum}, # 'mode': {string} run mode if any, # 'rate': {int} aggregate rate if any, - # 'pulses': {int} pulse rate, - # 'blt': {float} blt rate, # 'readers': { # 'reader_0_reader_0': { # 'status': {enum}, - # 'checkin': {int}, # 'rate': {float}, - # 'pulses': {int}, - # 'blt': {int} # }, # 'controller': {} # } # } self.latest_status = {} self.host_config = {} - self.dc = config['MasterDAQConfig'] + self.dc = daq_config for detector in self.dc: self.latest_status[detector] = {'readers': {}, 'controller': {}} for reader in self.dc[detector]['readers']: @@ -108,7 +119,6 @@ def __init__(self, config, log, control_mc, runs_mc, hypervisor, testing=False): self.latest_status[detector]['controller'][controller] = {} self.host_config[controller] = detector - self.command_oid = {d:{c:None} for c in ['start','stop','arm'] for d in self.dc} self.log = log self.run = True self.event = threading.Event() @@ -126,86 +136,77 @@ def quit(self): def __del__(self): self.quit() - def get_update(self): - + def get_update(self, dc): + """ + Gets the latest documents from the database for + each node we know about + """ try: - for detector in self.latest_status.keys(): - for host in self.latest_status[detector]['readers'].keys(): + for detector in dc.keys(): + for host in dc[detector]['readers'].keys(): doc = self.collections['node_status'].find_one({'host': host}, sort=[('_id', -1)]) - self.latest_status[detector]['readers'][host] = doc - for host in self.latest_status[detector]['controller'].keys(): + dc[detector]['readers'][host] = doc + for host in dc[detector]['controller'].keys(): doc = self.collections['node_status'].find_one({'host': host}, sort=[('_id', -1)]) - self.latest_status[detector]['controller'][host] = doc + dc[detector]['controller'][host] = doc except Exception as e: self.log.error(f'Got error while getting update: {type(e)}: {e}') return True + self.latest_status = dc + # Now compute aggregate status return self.aggregate_status() is not None def clear_error_timeouts(self): self.error_sent = {} - def update_aggregate_status(self): - ''' - Put current aggregate status into DB - ''' - for detector in 
self.latest_status.keys(): - doc = { - "status": self.latest_status[detector]['status'].value, - "detector": detector, - "rate": self.latest_status[detector]['rate'], - "readers": len(self.latest_status[detector]['readers'].keys()), - "time": datetime.datetime.utcnow(), - "buff": self.latest_status[detector]['buffer'], - "mode": self.latest_status[detector]['mode'], - "pll_unlocks": self.latest_status[detector]["pll_unlocks"], - } - if 'number' in self.latest_status[detector].keys(): - doc['number'] = self.latest_status[detector]['number'] - else: - doc['number'] = None - try: - self.collections['aggregate_status'].insert(doc) - except Exception as e: - self.log.error('RunsDB snafu') - self.log.debug(f'That snafu was {type(e)} {str(e)}') - return - def aggregate_status(self): - - # Compute the total status of each detector based on the most recent updates - # of its individual nodes. Here are some general rules: - # - Usually all nodes have the same status (i.e. 'running') and this is - # simply the aggregate - # - During changes of state (i.e. starting a run) some nodes might be faster - # than others. In this case the status can be 'unknown'. The main program should - # interpret whether 'unknown' is a reasonable thing, like was a command - # sent recently? If so then sure, a 'unknown' status will happpen. - # - If any single node reports error then the whole thing is in error - # - If any single node times out then the whole thing is in timeout - - now = time.time() + """ + Compute the total status of each "detector" based on the most recent + updates of its individual nodes. Here are some general rules: + - Usually all nodes have the same status (i.e. 'running') and this is + not very complicated + - During changes of state (i.e. starting a run) some nodes might + be faster than others. In this case the status can be 'unknown'. + The main program should interpret whether 'unknown' is a reasonable + thing, like was a command sent recently? If so then sure, a 'unknown' + status will happpen. + - If any single node reports error then the whole thing is in error + - If any single node times out then the whole thing is in timeout + - Rates, buffer usage, and PLL counters only apply to the physical + detector, not the logical detector, while status and run number + apply to both + """ + time_time = time.time() ret = None + aggstat = { + k:{ 'status': -1, + 'detector': k, + 'rate': 0, + 'time': now(), + 'buff': 0, + 'mode': None, + 'pll_unlocks': 0, + 'number': -1} + for k in self.dc} for detector in self.latest_status.keys(): + # detector = logical statuses = {} status = None - mode = 'none' - rate = 0 - buff = 0 - pll = 0 - run_num = -1 + modes = [] + run_nums = [] for doc in self.latest_status[detector]['readers'].values(): + phys_det = self.host_config[doc['host']] try: - rate += doc['rate'] - buff += doc['buffer_size'] - pll += doc.get('pll', 0) + aggstat[phys_det]['rate'] += doc['rate'] + aggstat[phys_det]['buff'] += doc['buffer_size'] + aggstat[phys_det]['pll_unlocks'] += doc.get('pll', 0) except Exception as e: - # This is not really important it's nice if we have - # it but not essential. - self.log.debug(f'Rate calculation ran into {type(e)}') + # This is not really important but it's nice if we have it + self.log.debug(f'Rate calculation ran into {type(e)}: {e}') pass try: @@ -213,15 +214,14 @@ def aggregate_status(self): if self.is_timeout(doc, now): self.status = DAQ_STATUS.TIMEOUT except Exception as e: + self.log.debug(f'Ran into {type(e)}, daq is in timeout. 
{e}') status = DAQ_STATUS.UNKNOWN statuses[doc['host']] = status - # If we have a crate controller check on it too for doc in self.latest_status[detector]['controller'].values(): - # Copy above. I guess it would be possible to have no readers + phys_det = self.host_config[doc['host']] try: - mode = doc['mode'] status = DAQ_STATUS(doc['status']) if self.is_timeout(doc, now): @@ -231,8 +231,24 @@ def aggregate_status(self): status = DAQ_STATUS.UNKNOWN statuses[doc['host']] = status - mode = doc.get('mode', 'none') - run_num = doc.get('number', -1) + modes.append(doc.get('mode', 'none')) + run_nums.append(doc.get('number', None)) + aggstat[phys_det]['status'] = status + aggstat[phys_det]['mode'] = modes[-1] + aggstat[phys_det]['number'] = run_nums[-1] + + mode = modes[0] + run_num = run_nums[0] + if not _all(modes, mode): + self.log.error(f'Got differing modes: {modes}') + # TODO handle better? + ret = 1 + continue + if not _all(run_nums, run_num): + self.log.error(f'Got differing run numbers: {run_nums}') + # TODO handle better? + ret = 1 + continue if mode != 'none': # readout is "active": a,b = self.get_hosts_for_mode(mode) @@ -242,25 +258,30 @@ def aggregate_status(self): status_list = list(statuses.values()) # Now we aggregate the statuses + # First, the "or" statuses for stat in ['ARMING','ERROR','TIMEOUT','UNKNOWN']: if DAQ_STATUS[stat] in status_list: status = DAQ_STATUS[stat] break else: + # then the "and" statuses for stat in ['IDLE','ARMED','RUNNING']: if _all(status_list, DAQ_STATUS[stat]): status = DAQ_STATUS[stat] break else: + # otherwise status = DAQ_STATUS.UNKNOWN self.latest_status[detector]['status'] = status - self.latest_status[detector]['rate'] = rate - self.latest_status[detector]['mode'] = mode - self.latest_status[detector]['buffer'] = buff self.latest_status[detector]['number'] = run_num - self.latest_status[detector]['pll_unlocks'] = pll + self.latest_status[detector]['mode'] = mode + try: + self.collections['aggregate_status'].insert_many(aggstat.values()) + except Exception as e: + self.log.error(f'DB snafu? Couldn\'t update aggregate status. ' + f'{type(e)}, {e}') return ret def is_timeout(self, doc, t): @@ -270,71 +291,127 @@ def is_timeout(self, doc, t): host = doc['host'] dt = t - int(str(doc['_id'])[:8], 16) has_ackd = self.host_ackd_command(host) + ret = False if dt > self.timeout: self.log.debug(f'{host} last reported {int(dt)} sec ago') - return True + ret |= True if has_ackd is not None and t - has_ackd > self.timeout_take_action: self.log.debug(f'{host} hasn\'t ackd a command from {int(t-has_ackd)} sec ago') if self.host_config[host] == 'tpc': self.hypervisor.handle_timeout(host) - return True - return False + ret |= True + return ret def get_wanted_state(self): - # Aggregate the wanted state per detector from the DB and return a dict + """ + Figure out what the system is supposed to be doing right now + """ try: latest_settings = {} - for detector in 'tpc muon_veto neutron_veto'.split(): + for detector in self.dc: latest = None latest_settings[detector] = {} for key in self.control_keys: doc = self.collections['incoming_commands'].find_one( {'key': f'{detector}.{key}'}, sort=[('_id', -1)]) if doc is None: - self.log.error('No key %s for %s???' 
% (key, detector)) + self.log.error(f'No key {key} for {detector}???') return None latest_settings[detector][doc['field']] = doc['value'] if latest is None or doc['time'] > latest: latest = doc['time'] latest_settings[detector]['user'] = doc['user'] - self.latest_settings = latest_settings - return self.latest_settings + self.goal_state = latest_settings + return self.goal_state except Exception as e: self.log.debug(f'get_wanted_state failed due to {type(e)} {e}') return None - def get_configured_nodes(self, detector, link_mv, link_nv): - ''' - Get the nodes we want from the config file - ''' - retnodes = [] - retcc = [] - retnodes = list(self.latest_status[detector]['readers'].keys()) - retcc = list(self.latest_status[detector]['controller'].keys()) - if detector == 'tpc' and link_nv == 'true': - retnodes += list(self.latest_status['neutron_veto']['readers'].keys()) - retcc += list(self.latest_status['neutron_veto']['controllers'].keys()) - if detector == 'tpc' and link_mv == 'true': - retnodes += list(self.latest_status['muon_veto']['readers'].keys()) - retcc += list(self.latest_status['muon_veto']['controllers'].keys()) - return retnodes, retcc + def is_linked(self, a, b): + """ + Check if the detectors are in a compatible linked configuration. + """ + mode_a = self.goal_state[a]["mode"] + mode_b = self.goal_state[b]["mode"] + doc_a = self.collections['options'].find_one({'name': mode_a}) + doc_b = self.collections['options'].find_one({'name': mode_b}) + detectors_a = doc_a['detector'] + detectors_b = doc_b['detector'] + + # Check if the linked detectors share the same run mode and + # if they are both present in the detectors list of that mode + return mode_a == mode_b and a in detectors_b and b in detectors_a + + def get_super_detector(self): + """ + Get the Super Detector configuration + if the detectors are in a compatible linked mode. + - case A: tpc, mv and nv all linked + - case B: tpc, mv and nv all un-linked + - case C: tpc and mv linked, nv un-linked + - case D: tpc and nv linked, mv un-linked + - case E: tpc unlinked, mv and nv linked + We will check the compatibility of the linked mode for a pair of detectors per time. 
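+        As an illustration, in case C (tpc and muon_veto linked, neutron_veto
+        un-linked) the returned dict is shaped like
+            {'tpc': {'controller': {...}, 'readers': {...},
+                     'detectors': ['tpc', 'muon_veto']},
+             'neutron_veto': {'controller': {...}, 'readers': {...},
+                              'detectors': ['neutron_veto']}}
+        where each 'controller'/'readers' entry maps a host name to an
+        (initially empty) status dict.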
+ """ + ret = {'tpc': {'controller': list(self.dc['tpc']['controller'].keys()), + 'readers': list(self.dc['tpc']['readers'].keys()), + 'detectors': ['tpc']}} + mv = self.dc['muon_veto'] + nv = self.dc['neutron_veto'] + + tpc_mv = self.is_linked('tpc', 'muon_veto') + tpc_nv = self.is_linked('tpc', 'neutron_veto') + mv_nv = self.is_linked('muon_veto', 'neutron_veto') + + # tpc and muon_veto linked mode + if tpc_mv: + # case A or C + ret['tpc']['controller'] += list(mv['controller'].keys()) + ret['tpc']['readers'] += list(mv['readers'].keys()) + ret['tpc']['detectors'] += ['muon_veto'] + else: + # case B or E + ret['muon_veto'] = {'controller': list(mv['controller'].keys()), + 'readers': list(mv['readers'].keys()), + 'detectors': ['muon_veto']} + if tpc_nv: + # case A or D + ret['tpc']['controller'] += list(nv['controller'].keys()) + ret['tpc']['readers'] += list(nv['readers'].keys()) + ret['tpc']['detectors'] += ['neutron_veto'] + elif mv_nv and not tpc_mv: + # case E + ret['muon_veto']['controller'] += list(nv['controller'].keys()) + ret['muon_veto']['readers'] += list(nv['readers'].keys()) + ret['muon_veto']['detectors'] += ['neutron_veto'] + else: + # case B or C + ret['neutron_veto'] = {'controller': list(nv['controller'].keys()), + 'readers': list(nv['readers'].keys()), + 'detectors': ['neutron_veto']} + + # convert the host lists to dics for later + for det in list(ret.keys()): + ret[det]['controller'] = {c:{} for c in ret[det]['controller']} + ret[det]['readers'] = {c:{} for c in ret[det]['readers']} + return ret def get_run_mode(self, mode): - ''' + """ Pull a run doc from the options collection and add all the includes - ''' + """ if mode is None: return None base_doc = self.collections['options'].find_one({'name': mode}) if base_doc is None: - self.log_error("dispatcher", "Mode '%s' doesn't exist" % mode, "info", "info") + self.log_error("Mode '%s' doesn't exist" % mode, "info", "info") return None if 'includes' not in base_doc or len(base_doc['includes']) == 0: return base_doc try: if self.collections['options'].count_documents({'name': {'$in': base_doc['includes']}}) != len(base_doc['includes']): - self.log_error("dispatcher", "At least one subconfig for mode '%s' doesn't exist" % mode, "warn", "warn") + self.log_error("At least one subconfig for mode '%s' doesn't exist" % mode, "warn", "warn") return None return list(self.collections["options"].aggregate([ {'$match': {'name': mode}}, @@ -351,9 +428,9 @@ def get_run_mode(self, mode): return None def get_hosts_for_mode(self, mode): - ''' + """ Get the nodes we need from the run mode - ''' + """ if mode is None: self.log.debug("Run mode is none?") return [], [] @@ -375,28 +452,29 @@ def get_next_run_number(self): cursor = self.collections["run"].find({},{'number': 1}).sort("number", -1).limit(1) except Exception as e: self.log.error(f'Database is having a moment? 
{type(e)}, {e}') - return -1 + return NO_NEW_RUN if cursor.count() == 0: self.log.info("wtf, first run?") return 0 return list(cursor)[0]['number']+1 def set_stop_time(self, number, detectors, force): - ''' + """ Sets the 'end' field of the run doc to the time when the STOP command was ack'd - ''' + """ self.log.info(f"Updating run {number} with end time ({detectors})") + if number == -1: + return try: time.sleep(0.5) # this number depends on the CC command polling time - endtime = self.get_ack_time(detectors, 'stop') - if endtime is None: - self.logger.debug(f'No end time found for run {number}') - endtime = datetime.datetime.utcnow()-datetime.timedelta(seconds=1) + if (endtime := self.get_ack_time(detectors, 'stop') ) is None: + self.log.debug(f'No end time found for run {number}') + endtime = now() -datetime.timedelta(seconds=1) query = {"number": int(number), "end": None, 'detectors': detectors} updates = {"$set": {"end": endtime}} if force: updates["$push"] = {"tags": {"name": "_messy", "user": "daq", - "date": datetime.datetime.utcnow()}} + "date": now()}} if self.collections['run'].update_one(query, updates).modified_count == 1: self.log.debug('Update successful') rate = {} @@ -415,23 +493,32 @@ def set_stop_time(self, number, detectors, force): self.log.error(f"Database having a moment, hope this doesn't crash. {type(e)}, {e}") return - def get_ack_time(self, detector, command): + def get_ack_time(self, detector, command, recurse=True): ''' Finds the time when specified detector's crate controller ack'd the specified command ''' + # the first cc is the "master", so its ack time is what counts cc = list(self.latest_status[detector]['controller'].keys())[0] - query = {f'acknowledged.{cc}': {'$ne': 0}, - '_id': self.command_oid[detector][command]} - doc = self.collections['outgoing_commands'].find_one(query) - if doc is not None and not isinstance(doc['acknowledged'][cc], int): - return doc['acknowledged'][cc] - self.log.debug(f'No ACK time for {detector}-{command}') - return None + query = {'host': cc, f'acknowledged.{cc}': {'$ne': 0}, 'command': command} + sort = [('_id', -1)] + doc = self.collections['outgoing_commands'].find_one(query, sort=sort) + dt = (now() - doc['acknowledged'][cc].replace(tzinfo=pytz.utc)).total_seconds() + if dt > 30: # TODO make this a config value + if recurse: + # No way we found the correct command here, maybe we're too soon + self.log.debug(f'Most recent ack for {detector}-{command} is {dt:.1f}?') + time.sleep(2) # if in doubt + return self.get_ack_time(detector, command, False) + else: + # Welp + self.log.debug(f'No recent ack time for {detector}-{command}') + return None + return doc['acknowledged'][cc] def send_command(self, command, hosts, user, detector, mode="", delay=0, force=False): - ''' + """ Send this command to these hosts. 
If delay is set then wait that amount of time - ''' + """ number = None if command == 'stop' and not self.detector_ackd_command(detector, 'stop'): self.log.error(f"{detector} hasn't ack'd its last stop, let's not flog a dead horse") @@ -440,7 +527,7 @@ def send_command(self, command, hosts, user, detector, mode="", delay=0, force=F try: if command == 'arm': number = self.get_next_run_number() - if number == -1: + if number == NO_NEW_RUN: return -1 self.latest_status[detector]['number'] = number doc_base = { @@ -448,13 +535,13 @@ def send_command(self, command, hosts, user, detector, mode="", delay=0, force=F "user": user, "detector": detector, "mode": mode, - "createdAt": datetime.datetime.utcnow() + "createdAt": now() } if command == 'arm': doc_base['options_override'] = {'number': number} if delay == 0: docs = doc_base - docs['host'] = hosts[0]+hosts[1] if isinstance(hosts, tuple) else hosts + docs['host'] = hosts[0]+hosts[1] docs['acknowledged'] = {h:0 for h in docs['host']} else: docs = [dict(doc_base.items()), dict(doc_base.items())] @@ -473,21 +560,22 @@ def send_command(self, command, hosts, user, detector, mode="", delay=0, force=F return 0 def process_commands(self): - ''' + """ Process our internal command queue - ''' + """ + sort = [('createdAt', 1)] + incoming = self.collections['command_queue'] + outgoing = self.collections['outgoing_commands'] while self.run == True: try: - next_cmd = self.collections['command_queue'].find_one({}, sort=[('createdAt', 1)]) - if next_cmd is None: + if (next_cmd := incoming.find_one({}, sort=sort)) is None: dt = 10 else: - dt = (next_cmd['createdAt'] - datetime.datetime.utcnow()).total_seconds() + dt = (next_cmd['createdAt'].replace(tzinfo=pytz.utc) - now()).total_seconds() if dt < 0.01: oid = next_cmd.pop('_id') - ret = self.collections['outgoing_commands'].insert_one(next_cmd) - self.collections['command_queue'].delete_one({'_id': oid}) - self.command_oid[next_cmd['detector']][next_cmd['command']] = ret.inserted_id + outgoing.insert_one(next_cmd) + incoming.delete_one({'_id': oid}) except Exception as e: dt = 10 self.log.error(f"DB down? 
{type(e)}, {e}") @@ -496,22 +584,25 @@ def process_commands(self): def host_ackd_command(self, host): """ - Finds the timestamp of the most recent unacknowledged command send to the specified host + Finds the timestamp of the oldest unacknowledged command send to the specified host :param host: str, the process name to check :returns: float, the timestamp of the last unack'd command, or None if none exist """ q = {f'acknowledged.{host}': 0} - if (doc := self.collections['outgoing_commands'].find_one(q, sort=[('_id', 1)])) is None: + sort = [('_id', 1)] + if (doc := self.collections['outgoing_commands'].find_one(q, sort=sort)) is None: return None return doc['createdAt'].timestamp() - def detector_ackd_command(self, detector, command=None): + def detector_ackd_command(self, detector, command): """ Finds when the specified/most recent command was ack'd """ - if (oid := self.command_oid[detector][command]) is None: - return True - if (doc := self.collections['outoing_commands'].find_one({'_id': oid})) is None: + q = {'detector': detector} + sort = [('_id', -1)] + if command is not None: + q['command'] = command + if (doc := self.collections['outgoing_commands'].find_one(q, sort=sort)) is None: self.log.error('No previous command found?') return True for h in doc['host']: @@ -524,7 +615,7 @@ def detector_ackd_command(self, detector, command=None): def log_error(self, message, priority, etype): # Note that etype allows you to define timeouts. - nowtime = datetime.datetime.utcnow() + nowtime = now() if ( (etype in self.error_sent and self.error_sent[etype] is not None) and (etype in self.error_timeouts and self.error_timeouts[etype] is not None) and (nowtime-self.error_sent[etype]).total_seconds() <= self.error_timeouts[etype]): @@ -552,41 +643,35 @@ def get_run_start(self, number): return doc['start'] return None - def insert_run_doc(self, detector, goal_state): + def insert_run_doc(self, detector): - if (number := self.get_next_run_number()) == -1: + if (number := self.get_next_run_number()) == NO_NEW_RUN: self.log.error("DB having a moment") return -1 - self.latest_status[detector]['number'] = number - detectors = [detector] - if detector == 'tpc' and goal_state['tpc']['link_nv'] == 'true': - self.latest_status['neutron_veto']['number'] = number - detectors.append('neutron_veto') - if detector == 'tpc' and goal_state['tpc']['link_mv'] == 'true': - self.latest_status['muon_veto']['number'] = number - detectors.append('muon_veto') + # the rundoc gets the physical detectors, not the logical + detectors = self.goal_state[detector]['detectors'] run_doc = { "number": number, 'detectors': detectors, - 'user': goal_state[detector]['user'], - 'mode': goal_state[detector]['mode'], + 'user': self.goal_state[detector]['user'], + 'mode': self.goal_state[detector]['mode'], 'bootstrax': {'state': None}, 'end': None } # If there's a source add the source. Also add the complete ini file. 
- cfg = self.get_run_mode(goal_state[detector]['mode']) + cfg = self.get_run_mode(self.goal_state[detector]['mode']) if cfg is not None and 'source' in cfg.keys(): run_doc['source'] = {'type': cfg['source']} run_doc['daq_config'] = cfg # If the user started the run with a comment add that too - if "comment" in goal_state[detector] and goal_state[detector]['comment'] != "": + if "comment" in self.goal_state[detector] and self.goal_state[detector]['comment'] != "": run_doc['comments'] = [{ - "user": goal_state[detector]['user'], - "date": datetime.datetime.utcnow(), - "comment": goal_state[detector]['comment'] + "user": self.goal_state[detector]['user'], + "date": now(), + "comment": self.goal_state[detector]['comment'] }] # Make a data entry so bootstrax can find the thing @@ -597,17 +682,25 @@ def insert_run_doc(self, detector, goal_state): 'location': cfg['strax_output_path'] }] + # The cc needs some time to get started + time.sleep(self.cc_start_wait) try: - time.sleep(2) start_time = self.get_ack_time(detector, 'start') - if start_time is None: - start_time = datetime.datetime.utcnow()-datetime.timedelta(seconds=2) - run_doc['tags'] = [{'name': 'messy', 'user': 'daq', 'date': start_time}] - run_doc['start'] = start_time + except Exception as e: + self.log.error('Couldn\'t find start time ack') + start_time = None + + if start_time is None: + start_time = now()-datetime.timedelta(seconds=2) + # if we miss the ack time, we don't really know when the run started + # so may as well tag it + run_doc['tags'] = [{'name': 'messy', 'user': 'daq', 'date': start_time}] + run_doc['start'] = start_time + try: self.collections['run'].insert_one(run_doc) except Exception as e: self.log.error(f'Database having a moment: {type(e)}, {e}') return -1 - return number + return None diff --git a/dispatcher/config.ini b/dispatcher/config.ini index 46f826fe..c491ae29 100644 --- a/dispatcher/config.ini +++ b/dispatcher/config.ini @@ -44,7 +44,7 @@ StartCmdDelay = 1 StopCmdDelay = 5 # these are the control keys to look for -ControlKeys = active comment link_mv link_nv mode softstop stop_after +ControlKeys = active comment mode softstop stop_after # Declare detector configuration here. Each detector's top level # key is its system-wide name identifier. 
Under the top-level
@@ -55,8 +55,7 @@ MasterDAQConfig = {
         "readers": [
             "reader0_reader_0",
             "reader1_reader_0",
-            "reader2_reader_0",
-            "reader3_reader_0"
+            "reader2_reader_0"
         ]
     },
     "muon_veto": {
@@ -69,13 +68,14 @@ MasterDAQConfig = {
     }
   }
 
+# Addresses for the VME crates
 VMEConfig = {
   "0": "192.168.131.60",
   "1": "192.168.131.61",
   "2": "192.168.131.62",
   "3": "192.168.131.63",
   "4": "192.168.131.64"
-}
+  }
 
 [TESTING]
 LogName = dispatcher_test
@@ -93,7 +93,7 @@ RetryReset = 3
 TimeBetweenCommands = 6
 StartCmdDelay = 1
 StopCmdDelay = 1
-ControlKeys = active comment link_mv link_nv mode softstop stop_after
+ControlKeys = active comment mode softstop stop_after
 MasterDAQConfig = {
   "tpc": {"controller": ["reader4_controller_0"], "readers": ["reader4_reader_0"]},
   "muon_veto": {"controller": ["reader4_controller_1"], "readers": ["reader4_reader_1"]},
diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py
index 66e5dff1..0b69d30b 100755
--- a/dispatcher/dispatcher.py
+++ b/dispatcher/dispatcher.py
@@ -11,14 +11,14 @@
 
 from MongoConnect import MongoConnect
 from DAQController import DAQController
-
+from MongoConnect import NO_NEW_RUN
 
 def main():
     # Parse command line
     parser = argparse.ArgumentParser(description='Manage the DAQ')
     parser.add_argument('--config', type=str, help='Path to your configuration file',
-            default='config.ini')
+            default='config_test.ini')
     parser.add_argument('--log', type=str, help='Logging level', default='DEBUG',
             choices=['DEBUG','INFO','WARNING','ERROR','CRITICAL'])
     parser.add_argument('--test', action='store_true', help='Are you testing?')
@@ -26,7 +26,7 @@ def main():
     config = configparser.ConfigParser()
     config.read(args.config)
     config = config['DEFAULT' if not args.test else "TESTING"]
-    config['MasterDAQConfig'] = json.loads(config['MasterDAQConfig'])
+    daq_config = json.loads(config['MasterDAQConfig'])
     control_mc = daqnt.get_client('daq')
     runs_mc = daqnt.get_client('runs')
     logger = daqnt.get_daq_logger(config['LogName'], level=args.log, mc=control_mc)
@@ -34,10 +34,10 @@ def main():
 
     # Declare necessary classes
     sh = daqnt.SignalHandler()
     hypervisor = daqnt.Hypervisor(control_mc[config['ControlDatabaseName']], logger,
-            config['MasterDAQConfig']['tpc'], vme_config, sh=sh, testing=args.test)
-    MongoConnector = MongoConnect(config, logger, control_mc, runs_mc, hypervisor, args.test)
-    DAQControl = DAQController(config, MongoConnector, logger, hypervisor)
+            daq_config['tpc'], vme_config, sh=sh, testing=args.test)
+    MongoConnector = MongoConnect(config, daq_config, logger, control_mc, runs_mc, hypervisor, args.test)
+    DAQControl = DAQController(config, daq_config, MongoConnector, logger, hypervisor)
     # connect the triangle
     hypervisor.mongo_connect = MongoConnector
     hypervisor.daq_controller = DAQControl
@@ -46,32 +46,31 @@ def main():
 
     logger.info('Dispatcher starting up')
 
-    while(sh.event.is_set() == False):
+    while sh.event.is_set() == False:
         sh.event.wait(sleep_period)
-        # Get most recent check-in from all connected hosts
-        if MongoConnector.get_update():
-            continue
-        latest_status = MongoConnector.latest_status
-
         # Get most recent goal state from database. Users will update this from the website.
         if (goal_state := MongoConnector.get_wanted_state()) is None:
             continue
+        # Get the Super-Detector configuration
+        current_config = MongoConnector.get_super_detector()
+        # Get most recent check-in from all connected hosts
+        # (get_update returns a truthy value if the update or the status aggregation failed)
+        if MongoConnector.get_update(current_config):
+            continue
+        latest_status = MongoConnector.latest_status
 
         # Print an update
         for detector in latest_status.keys():
             state = 'ACTIVE' if goal_state[detector]['active'] == 'true' else 'INACTIVE'
             msg = (f'The {detector} should be {state} and is '
                    f'{latest_status[detector]["status"].name}')
-            if latest_status[detector]['number'] != -1:
+            # TODO add statement about linking
+            if latest_status[detector]['number'] != NO_NEW_RUN:
                 msg += f' ({latest_status[detector]["number"]})'
             logger.debug(msg)
 
         # Decision time. Are we actually in our goal state? If not what should we do?
         DAQControl.solve_problem(latest_status, goal_state)
-        # Time to report back
-        MongoConnector.update_aggregate_status()
-
     MongoConnector.quit()
 
     return
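
For reference, the status-aggregation rule that MongoConnect.aggregate_status implements (any node in an "or" state such as ARMING, ERROR, TIMEOUT or UNKNOWN pulls the whole detector into that state, while the "and" states IDLE, ARMED and RUNNING require every node to agree) can be reduced to a small stand-alone sketch. The DAQ_STATUS enum below is a placeholder for the one daqnt provides, and aggregate_node_statuses is a hypothetical helper written only for this illustration; it is not part of the dispatcher.

    import enum

    class DAQ_STATUS(enum.Enum):
        # placeholder members; the real enum is provided by daqnt
        IDLE = 0
        ARMING = 1
        ARMED = 2
        RUNNING = 3
        ERROR = 4
        TIMEOUT = 5
        UNKNOWN = 6

    def aggregate_node_statuses(status_list):
        # "or" statuses: one node in any of these drags the whole detector
        # into that state, checked in this priority order
        for stat in ['ARMING', 'ERROR', 'TIMEOUT', 'UNKNOWN']:
            if DAQ_STATUS[stat] in status_list:
                return DAQ_STATUS[stat]
        # "and" statuses: every node must agree before the detector counts
        # as idle, armed, or running
        for stat in ['IDLE', 'ARMED', 'RUNNING']:
            if len(status_list) > 0 and all(s == DAQ_STATUS[stat] for s in status_list):
                return DAQ_STATUS[stat]
        # anything else is unknown
        return DAQ_STATUS.UNKNOWN

    # one slow node keeps the detector in ARMING; disagreement between
    # "and" states falls through to UNKNOWN
    assert aggregate_node_statuses([DAQ_STATUS.RUNNING, DAQ_STATUS.RUNNING]) == DAQ_STATUS.RUNNING
    assert aggregate_node_statuses([DAQ_STATUS.RUNNING, DAQ_STATUS.ARMING]) == DAQ_STATUS.ARMING
    assert aggregate_node_statuses([DAQ_STATUS.RUNNING, DAQ_STATUS.IDLE]) == DAQ_STATUS.UNKNOWN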