diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index e5c08160..fd763466 100644 --- a/dispatcher/MongoConnect.py +++ b/dispatcher/MongoConnect.py @@ -80,7 +80,7 @@ def __init__(self, config, daq_config, log, control_mc, runs_mc, hypervisor, tes # Timeout (in seconds). How long must a node not report to be considered timing out self.timeout = int(config['ClientTimeout']) - # How long a node can be timing out before it gets fixed (TPC only) + # How long a node can be timing out or missed an ack before it gets fixed (TPC only) self.timeout_take_action = int(config['TimeoutActionThreshold']) # how long to give the CC to start the run @@ -211,17 +211,8 @@ def aggregate_status(self): try: status = DAQ_STATUS(doc['status']) - dt = (time_time - int(str(doc['_id'])[:8], 16)) - if dt > self.timeout: - self.log.debug(f'{doc["host"]} reported {int(dt)} sec ago') - status = DAQ_STATUS.TIMEOUT - if phys_det == 'tpc': - if (dt > self.timeout_take_action or - ((ts := self.host_ackd_command(doc['host'])) is not None and - ts-time_time > self.timeout)): - self.log.info(f'{doc["host"]} is getting restarted') - self.hypervisor.handle_timeout(doc['host']) - ret = 1 + if self.is_timeout(doc, now): + self.status = DAQ_STATUS.TIMEOUT except Exception as e: self.log.debug(f'Ran into {type(e)}, daq is in timeout. {e}') status = DAQ_STATUS.UNKNOWN @@ -233,18 +224,8 @@ def aggregate_status(self): try: status = DAQ_STATUS(doc['status']) - dt = (time_time - int(str(doc['_id'])[:8], 16)) - doc['last_checkin'] = dt - if dt > self.timeout: - self.log.debug(f'{doc["host"]} reported {int(dt)} sec ago') + if self.is_timeout(doc, now): status = DAQ_STATUS.TIMEOUT - if phys_det == 'tpc': - if (dt > self.timeout_take_action or - ((ts := self.host_ackd_command(doc['host'])) is not None and - ts-time_time > self.timeout)): - self.log.info(f'{doc["host"]} is getting restarted') - self.hypervisor.handle_timeout(doc['host']) - ret = 1 except Exception as e: self.log.debug(f'Setting status to unknown because of {type(e)}: {e}') status = DAQ_STATUS.UNKNOWN @@ -303,6 +284,24 @@ def aggregate_status(self): f'{type(e)}, {e}') return ret + def is_timeout(self, doc, t): + """ + Checks to see if the specified status doc corresponds to a timeout situation + """ + host = doc['host'] + dt = t - int(str(doc['_id'])[:8], 16) + has_ackd = self.host_ackd_command(host) + ret = False + if dt > self.timeout: + self.log.debug(f'{host} last reported {int(dt)} sec ago') + ret |= True + if has_ackd is not None and t - has_ackd > self.timeout_take_action: + self.log.debug(f'{host} hasn\'t ackd a command from {int(t-has_ackd)} sec ago') + if self.host_config[host] == 'tpc': + self.hypervisor.handle_timeout(host) + ret |= True + return ret + def get_wanted_state(self): """ Figure out what the system is supposed to be doing right now diff --git a/dispatcher/config.ini b/dispatcher/config.ini index 6e82107b..c491ae29 100644 --- a/dispatcher/config.ini +++ b/dispatcher/config.ini @@ -12,7 +12,7 @@ PollFrequency = 3 # it to be 'timing out' ClientTimeout = 10 -# How long a client can be timing out before action gets taken (TPC only) +# How long a client can be timing out or missed an ack before action gets taken (TPC only) TimeoutActionThreshold = 20 # Database and collection names diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index 984672eb..0b69d30b 100755 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -38,6 +38,9 @@ def main(): daq_config['tpc'], vme_config, sh=sh, testing=args.test) MongoConnector = MongoConnect(config, daq_config, logger, control_mc, runs_mc, Hypervisor, args.test) DAQControl = DAQController(config, daq_config, MongoConnector, logger, Hypervisor) + # connect the triangle + hypervisor.mongo_connect = MongoConnector + hypervisor.daq_controller = DAQControl sleep_period = int(config['PollFrequency'])