From 2ec585be9995eb42e26bc77c93d3c880ff7da5d1 Mon Sep 17 00:00:00 2001 From: Darryl Masson Date: Wed, 24 Mar 2021 12:47:48 +0100 Subject: [PATCH 1/3] Improved timeout logic --- dispatcher/MongoConnect.py | 44 +++++++++++++++++++------------------- dispatcher/config.ini | 2 +- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index 41cada5a..bc743880 100644 --- a/dispatcher/MongoConnect.py +++ b/dispatcher/MongoConnect.py @@ -67,6 +67,9 @@ def __init__(self, config, log, control_mc, runs_mc, hypervisor, testing=False): # Timeout (in seconds). How long must a node not report to be considered timing out self.timeout = int(config['ClientTimeout']) + # How long (in seconds) a host has to ack a command before it gets restarted (TPC only) + self.ack_timeout = int(config['ClientAckTimeout']) + # How long a node can be timing out before it gets fixed (TPC only) self.timeout_take_action = int(config['TimeoutActionThreshold']) @@ -210,17 +213,8 @@ def aggregate_status(self): try: status = DAQ_STATUS(doc['status']) - dt = (now - int(str(doc['_id'])[:8], 16)) - if dt > self.timeout: - self.log.debug(f'{doc["host"]} reported {int(dt)} sec ago') - status = DAQ_STATUS.TIMEOUT - if self.host_config[doc['host']] == 'tpc': - if (dt > self.timeout_take_action or - ((ts := self.host_ackd_command(doc['host'])) is not None and - ts-now > self.timeout)): - self.log.info(f'{doc["host"]} is getting restarted') - self.hypervisor.handle_timeout(doc['host']) - ret = 1 + if self.is_timeout(doc, now): + status = DAQ_STATUS.TIMEOUT except Exception as e: status = DAQ_STATUS.UNKNOWN @@ -233,18 +227,8 @@ def aggregate_status(self): mode = doc['mode'] status = DAQ_STATUS(doc['status']) - dt = (now - int(str(doc['_id'])[:8], 16)) - doc['last_checkin'] = dt - if dt > self.timeout: - self.log.debug(f'{doc["host"]} reported {int(dt)} sec ago') + if self.is_timeout(doc, now): status = DAQ_STATUS.TIMEOUT - if 
self.host_config[doc['host']] == 'tpc': - if (dt > self.timeout_take_action or - ((ts := self.host_ackd_command(doc['host'])) is not None and - ts-now > self.timeout)): - self.log.info(f'{doc["host"]} is getting restarted') - self.hypervisor.handle_timeout(doc['host']) - ret = 1 except Exception as e: self.log.debug(f'Setting status to unknown because of {type(e)}: {e}') status = DAQ_STATUS.UNKNOWN @@ -282,6 +266,22 @@ def aggregate_status(self): return ret + def is_timeout(self, doc, t): + """ + Checks to see if the specified status doc corresponds to a timeout situation + """ + host = doc['host'] + dt = t - int(str(doc['_id'])[:8], 16) + has_ackd = self.host_ackd_command(host) + if dt > self.timeout: + self.log.debug(f'{host} last reported {int(dt)} sec ago') + return True + if has_ackd is not None and t - has_ackd > self.ack_timeout: + self.log.debug(f'{host} hasn\'t ackd a command from {int(t-has_ackd)} sec ago') + if self.host_config[host] == 'tpc': + self.hypervisor.handle_timeout(host) + return True + return False def get_wanted_state(self): # Aggregate the wanted state per detector from the DB and return a dict diff --git a/dispatcher/config.ini b/dispatcher/config.ini index f373f5f6..46f826fe 100644 --- a/dispatcher/config.ini +++ b/dispatcher/config.ini @@ -12,7 +12,7 @@ PollFrequency = 3 # it to be 'timing out' ClientTimeout = 10 -# How long a client can be timing out before action gets taken (TPC only) +# How long a client can be timing out or missed an ack before action gets taken (TPC only) TimeoutActionThreshold = 20 # Database and collection names From 129cc0529eed85ec8e3981af3586273589f61de4 Mon Sep 17 00:00:00 2001 From: Darryl Masson Date: Wed, 24 Mar 2021 12:48:17 +0100 Subject: [PATCH 2/3] Better comments --- dispatcher/MongoConnect.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index bc743880..f1053352 100644 --- a/dispatcher/MongoConnect.py +++ 
b/dispatcher/MongoConnect.py @@ -67,10 +67,7 @@ def __init__(self, config, log, control_mc, runs_mc, hypervisor, testing=False): # Timeout (in seconds). How long must a node not report to be considered timing out self.timeout = int(config['ClientTimeout']) - # How long (in seconds) a host has to ack a command before it gets restarted (TPC only) - self.ack_timeout = int(config['ClientAckTimeout']) - - # How long a node can be timing out before it gets fixed (TPC only) + # How long a node can be timing out or missed an ack before it gets fixed (TPC only) self.timeout_take_action = int(config['TimeoutActionThreshold']) # Which control keys do we look for? @@ -276,7 +273,7 @@ def is_timeout(self, doc, t): if dt > self.timeout: self.log.debug(f'{host} last reported {int(dt)} sec ago') return True - if has_ackd is not None and t - has_ackd > self.ack_timeout: + if has_ackd is not None and t - has_ackd > self.timeout_take_action: self.log.debug(f'{host} hasn\'t ackd a command from {int(t-has_ackd)} sec ago') if self.host_config[host] == 'tpc': self.hypervisor.handle_timeout(host) From 1a9fb6d27f46472ed81fac962b3001399308557d Mon Sep 17 00:00:00 2001 From: Darryl Masson Date: Fri, 26 Mar 2021 10:41:31 +0100 Subject: [PATCH 3/3] Work --- dispatcher/dispatcher.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index 0c49e3f9..66e5dff1 100755 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -34,10 +34,13 @@ def main(): # Declare necessary classes sh = daqnt.SignalHandler() - Hypervisor = daqnt.Hypervisor(control_mc[config['ControlDatabaseName']], logger, + hypervisor = daqnt.Hypervisor(control_mc[config['ControlDatabaseName']], logger, config['MasterDAQConfig']['tpc'], vme_config, sh=sh, testing=args.test) - MongoConnector = MongoConnect(config, logger, control_mc, runs_mc, Hypervisor, args.test) - DAQControl = DAQController(config, MongoConnector, logger, Hypervisor) + 
MongoConnector = MongoConnect(config, logger, control_mc, runs_mc, hypervisor, args.test) + DAQControl = DAQController(config, MongoConnector, logger, hypervisor) + # connect the triangle + hypervisor.mongo_connect = MongoConnector + hypervisor.daq_controller = DAQControl sleep_period = int(config['PollFrequency'])