Skip to content

Commit

Permalink
Adds basic run metadata collection
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Butzi authored and sfc-gh-afedorov committed Dec 6, 2018
1 parent 2d9a9be commit 26ef364
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 5 deletions.
4 changes: 3 additions & 1 deletion src/runners/alert_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def log_alerts(ctx, alerts):
)
except Exception as e:
log.fatal("Failed to log alert", e)
pass
else:
print("No alerts to log.")

Expand Down Expand Up @@ -56,6 +57,7 @@ def log_failure(ctx, alert, e):
ctx.cursor().execute(f"DELETE FROM {ALERTS_TABLE} where ALERT:ALERT_ID = '{alert['ALERT_ID']}';")
except Exception as e:
log.fatal("Failed to log alert creation failure", e)
pass


def get_new_alerts(connection):
Expand All @@ -80,7 +82,7 @@ def main():
try:
alert = json.loads(row[0])
except Exception as e:
log.error("Failed unexepctedly", e)
log.error("Failed unexpectedly", e)
continue
print('Creating ticket for alert', alert)

Expand Down
43 changes: 39 additions & 4 deletions src/runners/alert_queries_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import datetime
from typing import Dict, Tuple

from config import ALERTS_TABLE, RULES_SCHEMA, RESULTS_SCHEMA, ALERT_QUERY_POSTFIX, CLOUDWATCH_METRICS
from config import ALERTS_TABLE, METADATA_TABLE, RULES_SCHEMA, RESULTS_SCHEMA, ALERT_QUERY_POSTFIX, CLOUDWATCH_METRICS
from helpers import log
from helpers.db import connect_and_execute, load_rules

GROUPING_CUTOFF = f"DATEADD(minute, -90, CURRENT_TIMESTAMP())"
RUN_METADATA = {'QUERY_HISTORY': []} # Contains metadata about this run


def alert_group(alert) -> str:
Expand Down Expand Up @@ -82,11 +83,18 @@ def log_alerts(ctx, alerts):
alerts)
except Exception as e:
log.fatal("Failed to log alert", e)
pass
else:
print("No alerts to log.")


def log_failure(ctx, query_name, e):
def log_failure(ctx, query_name, e, event_data=None, description=None):
if event_data is None:
event_data = f"The query '{query_name}' failed to execute with error: {e!r}"

if description is None:
description = f"The query '{query_name}' failed to execute with error: {e!r}"

alerts = [json.dumps({
'ALERT_ID': uuid.uuid4().hex,
'QUERY_ID': '3a3d173a64ca4fcab2d13ac3e6d08522',
Expand All @@ -99,23 +107,28 @@ def log_failure(ctx, query_name, e):
'TITLE': 'Query Runner Failure',
'ALERT_TIME': str(datetime.datetime.utcnow()),
'EVENT_TIME': str(datetime.datetime.utcnow()),
'EVENT_DATA': f"The query '{query_name}' failed to execute with error: {e!r}",
'DESCRIPTION': f"The query '{query_name}' failed to execute with error: {e!r}",
'EVENT_DATA': event_data,
'DESCRIPTION': description,
'DETECTOR': 'Query Runner',
'SEVERITY': 'High'
})]
try:
log_alerts(ctx, alerts)
log.fatal("Query failure successfully logged", e)
pass
except Exception as e:
log.fatal("Failed to log query failure", e)
pass


def snowalert_query(query_name: str):
log.info(f"{query_name} processing...")
metadata = {}
metadata['NAME'] = query_name

ctx = connect_and_execute()

metadata['START_TIME'] = datetime.datetime.utcnow()
attempt = 0
while attempt <= 1:
try:
Expand All @@ -125,13 +138,18 @@ def snowalert_query(query_name: str):
WHERE event_time > {GROUPING_CUTOFF}
'''
results = ctx.cursor().execute(query).fetchall()

except Exception as e:
if attempt > 1:
log_failure(ctx, query_name, e)
log.metadata_fill(metadata, status='failure', exception=e)
pass
else:
log.info(f"Query {query_name} failed to run, retrying...")
continue

log.metadata_fill(metadata, status='success', rows=ctx.cursor().rowcount)
RUN_METADATA['QUERY_HISTORY'].append(metadata)
log.info(f"{query_name} done.")
return results, ctx

Expand All @@ -157,12 +175,29 @@ def query_for_alerts(query_name: str):
process_results(results, ctx, query_name)


def record_metadata(ctx, metadata):
    """Record the run metadata dict as a row in METADATA_TABLE.

    Writes one row with event_time = the run's start time and v = the full
    metadata dict as a VARIANT (via PARSE_JSON). On failure, logs the error
    and raises a Query Runner Failure alert through log_failure.

    :param ctx: open Snowflake connection (provides .cursor())
    :param metadata: dict of run metadata (e.g. RUN_METADATA); expected to
        contain a 'RUN_START_TIME' datetime
    """
    # default=str: metadata holds datetime/timedelta values (START_TIME,
    # RUN_TIME, ...) which json.dumps cannot serialize natively.
    metadata_json = json.dumps(metadata, default=str)

    # NOTE(review): string-built SQL; a single quote inside the serialized
    # metadata would break the statement — consider a bound parameter.
    statement = f'''
        INSERT INTO {METADATA_TABLE} (event_time, v)
        SELECT '{metadata['RUN_START_TIME']}', PARSE_JSON(column1)
        FROM VALUES ('{metadata_json}')
    '''
    try:
        ctx.cursor().execute(statement)
    except Exception as e:
        # Pass the exception, matching the log.fatal call style used
        # elsewhere in this file.
        log.fatal("Metadata failed to log", e)
        log_failure(ctx, "Metadata Logging", e, event_data=metadata,
                    description="The run metadata failed to log")


def main():
# Force warehouse resume so query runner doesn't have a bunch of queries waiting for warehouse resume
RUN_METADATA['RUN_START_TIME'] = datetime.datetime.utcnow()
ctx = connect_and_execute("ALTER SESSION SET USE_CACHED_RESULT=FALSE;")
for query_name in load_rules(ctx, ALERT_QUERY_POSTFIX):
query_for_alerts(query_name)

RUN_METADATA['RUN_END_TIME'] = datetime.datetime.utcnow()
RUN_METADATA['RUN_DURATION'] = RUN_METADATA['RUN_END_TIME'] - RUN_METADATA['RUN_START_TIME']

if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Alert Query Runner'}], 1)

Expand Down
2 changes: 2 additions & 0 deletions src/runners/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# table names
RESULTS_ALERTS_TABLE_NAME = os.environ.get('SA_RESULTS_ALERTS_TABLE_NAME', "alerts")
RESULTS_VIOLATIONS_TABLE_NAME = os.environ.get('SA_RESULTS_VIOLATIONS_TABLE_NAME', "violations")
RESULTS_METADATA_TABLE_NAME = os.environ.get('SA_METADATA_TABLE_NAME', "metadata")

# schemas
DATA_SCHEMA = os.environ.get('SA_DATA_SCHEMA', f"{DATABASE}.{DATA_SCHEMA_NAME}")
Expand All @@ -19,6 +20,7 @@
# tables
ALERTS_TABLE = os.environ.get('SA_ALERTS_TABLE', f"{RESULTS_SCHEMA}.{RESULTS_ALERTS_TABLE_NAME}")
VIOLATIONS_TABLE = os.environ.get('SA_VIOLATIONS_TABLE', f"{RESULTS_SCHEMA}.{RESULTS_VIOLATIONS_TABLE_NAME}")
METADATA_TABLE = os.environ.get('SA_METADATA_TABLE', f"{RESULTS_SCHEMA}.{RESULTS_METADATA_TABLE_NAME}")

# misc
ALERT_QUERY_POSTFIX = "alert_query"
Expand Down
8 changes: 8 additions & 0 deletions src/runners/helpers/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,11 @@ def metric(metric, namespace, dimensions, value):
'Value': value
}]
)


def metadata_fill(metadata, status, rows=0, exception=None):
    """Finalize a per-query metadata dict at the end of a run.

    Stamps the end time, computes the elapsed run time from the caller-set
    'START_TIME', and records row count, status, and any exception.

    :param metadata: dict that the caller seeded with a 'START_TIME' datetime
    :param status: outcome string, e.g. 'success' or 'failure'
    :param rows: number of rows the query produced (default 0)
    :param exception: the exception raised on failure, or None on success
    """
    metadata['END_TIME'] = datetime.datetime.utcnow()
    # BUGFIX: was END_TIME - metadata['RUN_TIME'], but 'RUN_TIME' is never
    # set before this point (KeyError); elapsed time is END_TIME - START_TIME.
    metadata['RUN_TIME'] = metadata['END_TIME'] - metadata['START_TIME']
    metadata['ROWS'] = rows
    metadata['STATUS'] = status
    metadata['EXCEPTION'] = exception

0 comments on commit 26ef364

Please sign in to comment.