diff --git a/.gitignore b/.gitignore index dcab4d1..4c67c9e 100644 --- a/.gitignore +++ b/.gitignore @@ -137,3 +137,6 @@ Icon Network Trash Folder Temporary Items .apdisk + +##JetBrain Stuff +.idea \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index bd6d577..29fc8fe 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,6 +7,6 @@ COPY requirements.txt . RUN pip install -r requirements.txt -COPY . . +COPY falcon falcon CMD ["gunicorn", "falcon.run:app", "-b 0.0.0.0:8000"] diff --git a/falcon/queue_handler.py b/falcon/queue_handler.py index 07b998b..653d1a9 100644 --- a/falcon/queue_handler.py +++ b/falcon/queue_handler.py @@ -11,7 +11,7 @@ from falcon import settings logging.basicConfig(level=logging.INFO) -logger = logging.getLogger('falcon.{module_path}'.format(module_path=__name__)) +logger = logging.getLogger("falcon.{module_path}".format(module_path=__name__)) class Workflow(object): @@ -72,15 +72,15 @@ def __init__(self, config_path): self.thread = None self.settings = settings.get_settings(config_path) self.cromwell_auth = settings.get_cromwell_auth(self.settings) - self.queue_update_interval = self.settings.get('queue_update_interval') - self.cromwell_query_dict = self.settings.get('cromwell_query_dict') + self.queue_update_interval = self.settings.get("queue_update_interval") + self.cromwell_query_dict = self.settings.get("cromwell_query_dict") def spawn_and_start(self): """ Starts the thread, which is an instance variable. If thread has not been created, spawns it and then starts it. 
""" if not self.thread: - self.thread = Thread(target=self.execution_loop, name='queueHandler') + self.thread = Thread(target=self.execution_loop, name="queueHandler") self.thread.start() def join(self): @@ -90,20 +90,22 @@ def join(self): try: self.thread.join() except (AttributeError, AssertionError): - logger.error('The thread of this queue handler is not in a running state.') + logger.error("The thread of this queue handler is not in a running state.") def execution_loop(self): logger.info( - 'QueueHandler | Initializing the queue handler with thread => {0} | {1}'.format( + "QueueHandler | Initializing the queue handler with thread => {0} | {1}".format( get_ident(), datetime.now() ) ) while True: + self.report_my_status() # Execute first to generate new handler_status.html self.execution_event() def execution_event(self): + logger.info( - 'QueueHandler | QueueHandler thread {0} is warmed up and running. | {1}'.format( + "QueueHandler | QueueHandler thread {0} is warmed up and running. | {1}".format( get_ident(), datetime.now() ) ) @@ -119,8 +121,8 @@ def execution_event(self): self.enqueue(workflows) else: logger.info( - 'QueueHandler | Cannot fetch any workflow from Cromwell, go back to sleep and wait for next ' - 'attempt. | {0}'.format(datetime.now()) + "QueueHandler | Cannot fetch any workflow from Cromwell, go back to sleep and wait for next " + "attempt. 
| {0}".format(datetime.now()) ) self.sleep_for(self.queue_update_interval) @@ -156,32 +158,32 @@ def retrieve_workflows(self, query_dict): ``` """ workflow_metas = None - query_dict['additionalQueryResultFields'] = 'labels' + query_dict["additionalQueryResultFields"] = "labels" try: response = CromwellAPI.query(auth=self.cromwell_auth, query_dict=query_dict) if response.status_code != 200: logger.warning( - 'QueueHandler | Failed to retrieve workflows from Cromwell | {0} | {1}'.format( + "QueueHandler | Failed to retrieve workflows from Cromwell | {0} | {1}".format( response.text, datetime.now() ) ) else: - workflow_metas = response.json()['results'] + workflow_metas = response.json()["results"] num_workflows = len(workflow_metas) logger.info( - 'QueueHandler | Retrieved {0} workflows from Cromwell. | {1}'.format( + "QueueHandler | Retrieved {0} workflows from Cromwell. | {1}".format( num_workflows, datetime.now() ) ) logger.debug( - 'QueueHandler | {0} | {1}'.format(workflow_metas, datetime.now()) + "QueueHandler | {0} | {1}".format(workflow_metas, datetime.now()) ) # TODO: remove this or not? except ( requests.exceptions.ConnectionError, requests.exceptions.RequestException, ) as error: logger.error( - 'QueueHandler | Failed to retrieve workflows from Cromwell | {0} | {1}'.format( + "QueueHandler | Failed to retrieve workflows from Cromwell | {0} | {1}".format( error, datetime.now() ) ) @@ -235,7 +237,7 @@ def enqueue(self, workflows): """ for workflow in workflows: logger.debug( - 'QueueHandler | Enqueuing workflow {0} | {1}'.format( + "QueueHandler | Enqueuing workflow {0} | {1}".format( workflow, datetime.now() ) ) @@ -293,15 +295,15 @@ def _assemble_workflow(workflow_meta): Returns: Workflow: A concrete `Workflow` instance that has necessary properties. 
""" - workflow_id = workflow_meta.get('id') - workflow_labels = workflow_meta.get('labels') + workflow_id = workflow_meta.get("id") + workflow_labels = workflow_meta.get("labels") workflow_bundle_uuid = ( - workflow_labels.get('bundle-uuid') + workflow_labels.get("bundle-uuid") if isinstance(workflow_labels, dict) else None ) workflow_bundle_version = ( - workflow_labels.get('bundle-version') + workflow_labels.get("bundle-version") if isinstance(workflow_labels, dict) else None ) @@ -344,20 +346,20 @@ def is_workflow_list_in_oldest_first_order(workflow_list): Returns: bool: The return value. True if the workflow_list is sorted oldest first, False otherwise. """ - CROMWELL_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' + CROMWELL_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" try: head = datetime.strptime( - str(workflow_list[0].get('submission')), CROMWELL_DATETIME_FORMAT + str(workflow_list[0].get("submission")), CROMWELL_DATETIME_FORMAT ) tail = datetime.strptime( - str(workflow_list[-1].get('submission')), CROMWELL_DATETIME_FORMAT + str(workflow_list[-1].get("submission")), CROMWELL_DATETIME_FORMAT ) return head <= tail except ValueError: logger.error( - 'Queue | An error happened when try to parse the submission timestamps, will assume oldest first ' - 'for' - ' the workflows returned from Cromwell | {0}'.format(datetime.now()) + "Queue | An error happened when try to parse the submission timestamps, will assume oldest first " + "for" + " the workflows returned from Cromwell | {0}".format(datetime.now()) ) return True @@ -381,3 +383,34 @@ def deep_deduplicate(): This deep de-duplication should search the given bundle-uuid and bundle-version in the whole history. 
""" return NotImplemented + + @staticmethod + def report_my_status(): + """ + Write a report_status.html file containing the timestamp when it ran + """ + try: + # Get timestamp now + # Converting datetime object to string + dateTimeObj = datetime.now() + timestampStr = dateTimeObj.strftime("%d-%b-%Y (%H:%M:%S)") + + # Update file report_status.html with time so we can get the info from a curl + f = open(settings.docRootPath + settings.docRootFile, "w") + + # content of html file with timestamp + header = """


+ """ + body = "Time when report my status was generated: " + timestampStr + footer = """ +



""" + + f.write("{0}{1}{2}".format(header, body, footer)) + f.close() + logger.info("QueueHandler | QueueHandler report status ran successfully ") + except Exception as exc: + logger.warning( + "QueueHandler | QueueHandler report Status failed with Exception: | {0}".format( + exc + ) + ) diff --git a/falcon/routes.py b/falcon/routes.py index 403830f..28c2575 100644 --- a/falcon/routes.py +++ b/falcon/routes.py @@ -1,20 +1,35 @@ -import json -import threading +from datetime import datetime, timedelta +import os from flask import abort - -FALCON_THREAD_NAMES = ('queueHandler', 'igniter') +from flask import render_template +from falcon.settings import docRootPath, docRootFile, MAX_DELAY def status(): - active_threads = {thread.name: thread for thread in threading.enumerate()} - active_falcon_threads = {} - for falcon_thread_name in FALCON_THREAD_NAMES: - thread = active_threads.get(falcon_thread_name) - if not thread: - abort(500) - elif not thread.is_alive(): - abort(500) + """ + This function reads a status report file creation date and + Compares it to the current time + Returns: render html file or abort (HTTP code 500) if time + difference is greater than max delay + """ + + try: + # Get TimeStamp + now = datetime.today() + + # read status report.html modified datetime + file_mod_time = datetime.fromtimestamp( + os.stat(docRootPath + docRootFile).st_mtime + ) # This is a datetime.datetime object! 
+ + # Define max delay to 5 mins + max_delay = timedelta(minutes=MAX_DELAY) + + # if reached max delay abort else render status report file + if now - file_mod_time > max_delay: + abort(500, 'reached max delay') else: - display_name = '{}-thread'.format(falcon_thread_name) - active_falcon_threads[display_name] = str(thread.ident) - return json.dumps(active_falcon_threads) + return render_template(docRootFile) + + except Exception as exc: + abort(500, exc) diff --git a/falcon/run.py b/falcon/run.py index a900209..c5efe5c 100644 --- a/falcon/run.py +++ b/falcon/run.py @@ -4,14 +4,29 @@ from falcon.queue_handler import QueueHandler from falcon.routes import status -app = Flask(__name__) -app.add_url_rule("/health", "health", status) +app = Flask(__name__, static_url_path='') + +# prevent cached responses +# TODO : Does not seem to prevent caching issue when using curl http://localhost:8000/health in the container. +@app.after_request +def add_header(r): + """ + Add headers to both force latest IE rendering engine or Chrome Frame, + removing caching + """ + r.headers["Cache-Control"] = "no-store, max-age=0" + r.headers["Pragma"] = "no-cache" + return r +# Define endpoint and assign to status function +app.add_url_rule("/health", "health", status) + config_path = os.environ.get('CONFIG_PATH') handler = QueueHandler(config_path) # instantiate a concrete `QueueHandler` igniter = Igniter(config_path) # instantiate a concrete `Igniter` + handler.spawn_and_start() # start the thread within the handler igniter.spawn_and_start( handler diff --git a/falcon/settings.py b/falcon/settings.py index a74b564..aa2f798 100644 --- a/falcon/settings.py +++ b/falcon/settings.py @@ -2,6 +2,15 @@ import os from cromwell_tools.cromwell_auth import CromwellAuth +# Global variables for the report status +docRootPath = "falcon/templates/" # Path to Flask docRoot - in the repo +docRootFile = ( + "handler_status.html" # file to be returned to liveness Probe - in the repo +) +MAX_DELAY: int = ( + 
5 +) # we consider if docRootFile is older than 5 mins then the Thread is Frozen + def get_settings(config_path): """This function loads the config.json file based on the path and return the assembled settings dictionary. @@ -21,51 +30,51 @@ def get_settings(config_path): workflow_start_interval (int): The sleep time between each time the igniter starts a workflow in Cromwell. cromwell_query_dict (dict): The query used for retrieving cromwell workflows """ - with open(config_path, 'r') as f: + with open(config_path, "r") as f: settings = json.load(f) # Check Cromwell url - if not settings['cromwell_url']: - raise ValueError('A Cromwell URL is required.') + if not settings["cromwell_url"]: + raise ValueError("A Cromwell URL is required.") # Check auth parameters - if settings['use_caas']: - if not settings['collection_name']: + if settings["use_caas"]: + if not settings["collection_name"]: raise ValueError( - 'To use the Cromwell-as-a-Service, you have to pass in a valid collection name.' + "To use the Cromwell-as-a-Service, you have to pass in a valid collection name." ) - caas_key = os.environ.get('caas_key') or os.environ.get('CAAS_KEY') + caas_key = os.environ.get("caas_key") or os.environ.get("CAAS_KEY") if not caas_key: raise ValueError( - 'No service account json key provided for cromwell-as-a-service.' + "No service account json key provided for cromwell-as-a-service." 
) else: - settings['caas_key'] = caas_key + settings["caas_key"] = caas_key # Check other config parameters - settings['queue_update_interval'] = int(settings.get('queue_update_interval', 1)) - settings['workflow_start_interval'] = int( - settings.get('workflow_start_interval', 1) + settings["queue_update_interval"] = int(settings.get("queue_update_interval", 1)) + settings["workflow_start_interval"] = int( + settings.get("workflow_start_interval", 1) ) # Check cromwell query parameters - query_dict = settings.get('cromwell_query_dict', {}) - if ('status', 'On Hold') not in query_dict.items(): - query_dict.update({'status': 'On Hold'}) - settings['cromwell_query_dict'] = query_dict + query_dict = settings.get("cromwell_query_dict", {}) + if ("status", "On Hold") not in query_dict.items(): + query_dict.update({"status": "On Hold"}) + settings["cromwell_query_dict"] = query_dict return settings def get_cromwell_auth(settings): - cromwell_url = settings.get('cromwell_url') - if settings.get('use_caas'): + cromwell_url = settings.get("cromwell_url") + if settings.get("use_caas"): return CromwellAuth.harmonize_credentials( - url=cromwell_url, service_account_key=settings.get('caas_key') + url=cromwell_url, service_account_key=settings.get("caas_key") ) return CromwellAuth.harmonize_credentials( url=cromwell_url, - username=settings.get('cromwell_user'), - password=settings.get('cromwell_password'), + username=settings.get("cromwell_user"), + password=settings.get("cromwell_password"), ) diff --git a/falcon/templates/return_status.html b/falcon/templates/return_status.html new file mode 100644 index 0000000..d7e52dd --- /dev/null +++ b/falcon/templates/return_status.html @@ -0,0 +1,12 @@ + + +

Timestamp should be here + ############### NOTES ##################### + Because of a caching issue it might not show up + Running: curl -I http://localhost:8000/health + The result should confirm the cache headers are correct (no-cache, etc.) + Checking the content of the file on the disk should confirm this is a cache issue + cat /falcon/templates/handler_status.html + ########################################### +

+ \ No newline at end of file