Skip to content

Commit

Permalink
Merge pull request #42 from HumanCellAtlas/FT-GH-675-FlaconLivenessFix
Browse files Browse the repository at this point in the history
Ft gh 675 flacon liveness fix
  • Loading branch information
Fab-T authored Mar 10, 2020
2 parents 0b4926c + 9baf560 commit 6999d00
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 65 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,6 @@ Icon
Network Trash Folder
Temporary Items
.apdisk

## JetBrains IDE files
.idea
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ COPY requirements.txt .

RUN pip install -r requirements.txt

COPY . .
COPY falcon falcon

CMD ["gunicorn", "falcon.run:app", "-b 0.0.0.0:8000"]
85 changes: 59 additions & 26 deletions falcon/queue_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from falcon import settings

# Configure root logging once at import time and create a module-scoped
# logger namespaced under "falcon." (e.g. "falcon.queue_handler").
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("falcon.{module_path}".format(module_path=__name__))


class Workflow(object):
def __init__(self, config_path):
    """Load settings from *config_path* and prepare (but do not start) the handler thread.

    Args:
        config_path (str): Path to the falcon JSON config file.
    """
    self.thread = None  # created lazily by spawn_and_start()
    self.settings = settings.get_settings(config_path)
    self.cromwell_auth = settings.get_cromwell_auth(self.settings)
    self.queue_update_interval = self.settings.get("queue_update_interval")
    self.cromwell_query_dict = self.settings.get("cromwell_query_dict")

def spawn_and_start(self):
    """Create the handler thread on first use, then start it.

    The Thread object is cached on the instance, so it is only
    constructed once; calling start() again on an already-started
    thread raises RuntimeError, making this effectively one-shot.
    """
    if not self.thread:
        self.thread = Thread(target=self.execution_loop, name="queueHandler")
    self.thread.start()

def join(self):
    """Block until the handler thread exits.

    Logs an error instead of raising when the thread is missing
    (AttributeError: self.thread is still None) or was never started
    (AssertionError from Thread.join on an unstarted thread).
    """
    try:
        self.thread.join()
    except (AttributeError, AssertionError):
        logger.error("The thread of this queue handler is not in a running state.")

def execution_loop(self):
    """Run the queue handler forever: refresh the liveness report, then execute one queue-update cycle."""
    logger.info(
        "QueueHandler | Initializing the queue handler with thread => {0} | {1}".format(
            get_ident(), datetime.now()
        )
    )
    while True:
        # Execute first so the liveness probe sees a fresh handler_status.html each cycle.
        self.report_my_status()
        self.execution_event()

def execution_event(self):

logger.info(
'QueueHandler | QueueHandler thread {0} is warmed up and running. | {1}'.format(
"QueueHandler | QueueHandler thread {0} is warmed up and running. | {1}".format(
get_ident(), datetime.now()
)
)
Expand All @@ -119,8 +121,8 @@ def execution_event(self):
self.enqueue(workflows)
else:
logger.info(
'QueueHandler | Cannot fetch any workflow from Cromwell, go back to sleep and wait for next '
'attempt. | {0}'.format(datetime.now())
"QueueHandler | Cannot fetch any workflow from Cromwell, go back to sleep and wait for next "
"attempt. | {0}".format(datetime.now())
)
self.sleep_for(self.queue_update_interval)

Expand Down Expand Up @@ -156,32 +158,32 @@ def retrieve_workflows(self, query_dict):
```
"""
workflow_metas = None
query_dict['additionalQueryResultFields'] = 'labels'
query_dict["additionalQueryResultFields"] = "labels"
try:
response = CromwellAPI.query(auth=self.cromwell_auth, query_dict=query_dict)
if response.status_code != 200:
logger.warning(
'QueueHandler | Failed to retrieve workflows from Cromwell | {0} | {1}'.format(
"QueueHandler | Failed to retrieve workflows from Cromwell | {0} | {1}".format(
response.text, datetime.now()
)
)
else:
workflow_metas = response.json()['results']
workflow_metas = response.json()["results"]
num_workflows = len(workflow_metas)
logger.info(
'QueueHandler | Retrieved {0} workflows from Cromwell. | {1}'.format(
"QueueHandler | Retrieved {0} workflows from Cromwell. | {1}".format(
num_workflows, datetime.now()
)
)
logger.debug(
'QueueHandler | {0} | {1}'.format(workflow_metas, datetime.now())
"QueueHandler | {0} | {1}".format(workflow_metas, datetime.now())
) # TODO: remove this or not?
except (
requests.exceptions.ConnectionError,
requests.exceptions.RequestException,
) as error:
logger.error(
'QueueHandler | Failed to retrieve workflows from Cromwell | {0} | {1}'.format(
"QueueHandler | Failed to retrieve workflows from Cromwell | {0} | {1}".format(
error, datetime.now()
)
)
Expand Down Expand Up @@ -235,7 +237,7 @@ def enqueue(self, workflows):
"""
for workflow in workflows:
logger.debug(
'QueueHandler | Enqueuing workflow {0} | {1}'.format(
"QueueHandler | Enqueuing workflow {0} | {1}".format(
workflow, datetime.now()
)
)
Expand Down Expand Up @@ -293,15 +295,15 @@ def _assemble_workflow(workflow_meta):
Returns:
Workflow: A concrete `Workflow` instance that has necessary properties.
"""
workflow_id = workflow_meta.get('id')
workflow_labels = workflow_meta.get('labels')
workflow_id = workflow_meta.get("id")
workflow_labels = workflow_meta.get("labels")
workflow_bundle_uuid = (
workflow_labels.get('bundle-uuid')
workflow_labels.get("bundle-uuid")
if isinstance(workflow_labels, dict)
else None
)
workflow_bundle_version = (
workflow_labels.get('bundle-version')
workflow_labels.get("bundle-version")
if isinstance(workflow_labels, dict)
else None
)
def is_workflow_list_in_oldest_first_order(workflow_list):
    """Return True if *workflow_list* is sorted oldest submission first.

    Only the first and last entries are compared, on the assumption that
    Cromwell returns a consistently ordered list. An empty list, or
    timestamps that fail to parse, are treated as oldest-first.

    Args:
        workflow_list (list): Workflow metadata dicts, each expected to
            carry a Cromwell-formatted 'submission' timestamp.

    Returns:
        bool: True if the list is sorted oldest first, False otherwise.
    """
    CROMWELL_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
    if not workflow_list:
        # Nothing to compare; an empty list is trivially in order.
        return True
    try:
        head = datetime.strptime(
            str(workflow_list[0].get("submission")), CROMWELL_DATETIME_FORMAT
        )
        tail = datetime.strptime(
            str(workflow_list[-1].get("submission")), CROMWELL_DATETIME_FORMAT
        )
        return head <= tail
    except ValueError:
        logger.error(
            "Queue | An error happened when try to parse the submission timestamps, will assume oldest first "
            "for"
            " the workflows returned from Cromwell | {0}".format(datetime.now())
        )
        return True

def deep_deduplicate():
    """Placeholder for deep de-duplication of workflows.

    This deep de-duplication should search the given bundle-uuid and
    bundle-version in the whole history. Not implemented yet.
    """
    return NotImplemented

@staticmethod
def report_my_status():
    """Write the liveness report file containing the current timestamp.

    The /health endpoint compares this file's modification time against
    MAX_DELAY to decide whether the handler thread is frozen. Any failure
    is logged (not raised), so the handler loop keeps running.
    """
    try:
        # Timestamp rendered as text, e.g. "10-Mar-2020 (12:34:56)".
        timestamp = datetime.now().strftime("%d-%b-%Y (%H:%M:%S)")

        # Minimal HTML wrapper around the timestamp so a curl of the
        # endpoint can display it.
        header = "<html><br><head></head><br><body><br><p>\n"
        body = "Time when report my status was generated: " + timestamp
        footer = "\n</p><br></body><br></html>"

        # Context manager guarantees the handle is closed even if the
        # write fails (the original left the file open on error).
        with open(settings.docRootPath + settings.docRootFile, "w") as report:
            report.write("{0}{1}{2}".format(header, body, footer))
        logger.info("QueueHandler | QueueHandler report status ran successfully ")
    except Exception as exc:
        logger.warning(
            "QueueHandler | QueueHandler report Status failed with Exception: | {0}".format(
                exc
            )
        )
45 changes: 30 additions & 15 deletions falcon/routes.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
import json
import threading
from datetime import datetime, timedelta
import os
from flask import abort

FALCON_THREAD_NAMES = ('queueHandler', 'igniter')
from flask import render_template
from falcon.settings import docRootPath, docRootFile, MAX_DELAY


def status():
    """Liveness endpoint: serve the handler status report, or HTTP 500 when stale.

    Reads the status report file's modification time and compares it to
    the current time. If the file is older than MAX_DELAY minutes the
    queue-handler thread is considered frozen and the request is aborted
    with HTTP 500; otherwise the report template is rendered.
    """
    try:
        now = datetime.today()

        # mtime of the report file written by QueueHandler.report_my_status()
        file_mod_time = datetime.fromtimestamp(
            os.stat(docRootPath + docRootFile).st_mtime
        )

        max_delay = timedelta(minutes=MAX_DELAY)

        # Stale report => the handler thread stopped refreshing it.
        if now - file_mod_time > max_delay:
            abort(500, 'reached max delay')
        else:
            return render_template(docRootFile)

    except Exception as exc:
        # Missing file, bad path, etc. all surface as a failed health check.
        abort(500, exc)
19 changes: 17 additions & 2 deletions falcon/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,29 @@
from falcon.queue_handler import QueueHandler
from falcon.routes import status

app = Flask(__name__)
app.add_url_rule("/health", "health", status)
app = Flask(__name__, static_url_path='')

# Force no-cache on every response so the liveness probe never reads a stale page.
# TODO : Does not seem to prevent caching issue when using curl http://localhost:8000/health in the container.
@app.after_request
def add_header(r):
    """Disable client-side caching on the outgoing response.

    Stamps Cache-Control and Pragma headers onto every response before
    it is returned to the client.
    """
    no_cache_headers = {
        "Cache-Control": "no-store, max-age=0",
        "Pragma": "no-cache",
    }
    for header_name, header_value in no_cache_headers.items():
        r.headers[header_name] = header_value
    return r


# Define endpoint and assign to status function
app.add_url_rule("/health", "health", status)

config_path = os.environ.get('CONFIG_PATH')
handler = QueueHandler(config_path) # instantiate a concrete `QueueHandler`
igniter = Igniter(config_path) # instantiate a concrete `Igniter`


handler.spawn_and_start() # start the thread within the handler
igniter.spawn_and_start(
handler
Expand Down
51 changes: 30 additions & 21 deletions falcon/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
import os
from cromwell_tools.cromwell_auth import CromwellAuth

# Global variables for the liveness status report.
docRootPath = "falcon/templates/"  # path to the Flask docRoot (inside the repo)
docRootFile = "handler_status.html"  # file returned to the liveness probe
# If docRootFile is older than MAX_DELAY minutes, the handler thread is
# considered frozen and /health returns 500.
MAX_DELAY: int = 5


def get_settings(config_path):
    """Load the JSON config file at *config_path* and return the settings dict.

    Validates the Cromwell URL and (when use_caas is set) the CaaS
    credentials, normalizes the polling intervals to ints defaulting to 1,
    and guarantees the Cromwell query filters on status "On Hold".

    Args:
        config_path (str): Path to the config.json file.

    Returns:
        dict: The assembled settings, containing at least cromwell_url,
            use_caas, queue_update_interval, workflow_start_interval and
            cromwell_query_dict (plus caas_key when use_caas is true).

    Raises:
        ValueError: If cromwell_url is missing, or use_caas is set without
            a collection name or a service-account key in the environment.
    """
    with open(config_path, "r") as f:
        settings = json.load(f)

    # Check Cromwell url
    if not settings["cromwell_url"]:
        raise ValueError("A Cromwell URL is required.")

    # Check auth parameters
    if settings["use_caas"]:
        if not settings["collection_name"]:
            raise ValueError(
                "To use the Cromwell-as-a-Service, you have to pass in a valid collection name."
            )

        # The service-account key comes from the environment, not the file.
        caas_key = os.environ.get("caas_key") or os.environ.get("CAAS_KEY")
        if not caas_key:
            raise ValueError(
                "No service account json key provided for cromwell-as-a-service."
            )
        settings["caas_key"] = caas_key

    # Normalize polling intervals (default: 1 second).
    settings["queue_update_interval"] = int(settings.get("queue_update_interval", 1))
    settings["workflow_start_interval"] = int(
        settings.get("workflow_start_interval", 1)
    )

    # Falcon only manages workflows that are "On Hold"; force that filter.
    query_dict = settings.get("cromwell_query_dict", {})
    if ("status", "On Hold") not in query_dict.items():
        query_dict.update({"status": "On Hold"})
    settings["cromwell_query_dict"] = query_dict

    return settings


def get_cromwell_auth(settings):
    """Build a CromwellAuth credential object from the settings dict.

    Uses the service-account key when use_caas is set; otherwise falls
    back to username/password credentials.

    Args:
        settings (dict): Settings as returned by get_settings().

    Returns:
        CromwellAuth: Credentials usable by cromwell_tools' CromwellAPI.
    """
    cromwell_url = settings.get("cromwell_url")
    if settings.get("use_caas"):
        return CromwellAuth.harmonize_credentials(
            url=cromwell_url, service_account_key=settings.get("caas_key")
        )
    return CromwellAuth.harmonize_credentials(
        url=cromwell_url,
        username=settings.get("cromwell_user"),
        password=settings.get("cromwell_password"),
    )
12 changes: 12 additions & 0 deletions falcon/templates/return_status.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<html>
<head></head>
<body><p> Timestamp should be Here
############### NOTES #####################
For some caching issue it might not show up
Running : Curl -I http://localhost:8000/health
Result should confirm cache headers are correct (no-cache, etc)
Checking the content of the file on the disk should confirm this is a cache issue:
cat /falcon/templates/handler_status.html
###########################################
</p></body>
</html>

0 comments on commit 6999d00

Please sign in to comment.