Skip to content

Commit

Permalink
metadata logging works in dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Butzi authored and sfc-gh-afedorov committed Dec 6, 2018
1 parent 26ef364 commit 8308666
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 17 deletions.
17 changes: 12 additions & 5 deletions src/runners/alert_queries_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from helpers.db import connect_and_execute, load_rules

GROUPING_CUTOFF = f"DATEADD(minute, -90, CURRENT_TIMESTAMP())"
RUN_METADATA = {'QUERY_HISTORY': []} # Contains metadata about this run
RUN_METADATA = {'QUERY_HISTORY': [], 'RUN_TYPE': 'ALERT QUERIES'} # Contains metadata about this run


def alert_group(alert) -> str:
Expand Down Expand Up @@ -73,7 +73,7 @@ def update_recent_alerts(ctx, alert_map):

def log_alerts(ctx, alerts):
if len(alerts):
print("Logging alerts...")
print("Recording alerts.")
format_string = ", ".join(["(%s)"] * len(alerts))
try:
ctx.cursor().execute((
Expand Down Expand Up @@ -143,6 +143,7 @@ def snowalert_query(query_name: str):
if attempt > 1:
log_failure(ctx, query_name, e)
log.metadata_fill(metadata, status='failure', exception=e)
RUN_METADATA['QUERY_HISTORY'].append(metadata)
pass
else:
log.info(f"Query {query_name} failed to run, retrying...")
Expand Down Expand Up @@ -176,15 +177,20 @@ def query_for_alerts(query_name: str):


def record_metadata(ctx, metadata):
    """Persist the run's metadata dict as one row in METADATA_TABLE.

    Timestamps are kept as datetime objects during the run so durations can be
    computed with subtraction; they are stringified here for JSON serialization.
    Mutates `metadata` in place.
    """
    metadata['RUN_START_TIME'] = str(metadata['RUN_START_TIME'])
    metadata['RUN_END_TIME'] = str(metadata['RUN_END_TIME'])
    metadata['RUN_DURATION'] = str(metadata['RUN_DURATION'])

    # Bind the values instead of interpolating them into the statement:
    # json.dumps output routinely contains single quotes (e.g. exception text),
    # which would break — or allow injection into — a string-built statement.
    statement = f'''
    INSERT INTO {METADATA_TABLE}
        (event_time, v)
    SELECT %s, PARSE_JSON(%s)
    '''
    try:
        log.info("Recording run metadata...")
        ctx.cursor().execute(statement, (metadata['RUN_START_TIME'], json.dumps(metadata)))
    except Exception as e:
        log.fatal("Metadata failed to log", e)
        log_failure(ctx, "Metadata Logging", e, event_data=metadata, description="The run metadata failed to log")


Expand All @@ -197,6 +203,7 @@ def main():

RUN_METADATA['RUN_END_TIME'] = datetime.datetime.utcnow()
RUN_METADATA['RUN_DURATION'] = RUN_METADATA['RUN_END_TIME'] - RUN_METADATA['RUN_START_TIME']
record_metadata(ctx, RUN_METADATA)

if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Alert Query Runner'}], 1)
Expand Down
49 changes: 44 additions & 5 deletions src/runners/alert_suppressions_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import datetime
from typing import List

from config import ALERTS_TABLE, RULES_SCHEMA, ALERT_SQUELCH_POSTFIX, CLOUDWATCH_METRICS
from config import ALERTS_TABLE, METADATA_TABLE, RULES_SCHEMA, ALERT_SQUELCH_POSTFIX, CLOUDWATCH_METRICS
from helpers import log
from helpers.db import connect, load_rules

RUN_METADATA = {'QUERY_HISTORY': [], 'RUN_TYPE': 'ALERT SUPPRESSIONS'} # Contains metadata about this run


def log_alerts(ctx, alerts):
"""We don't usually log alerts in the suppression runner, but we want the runner to create an alert if a
Expand All @@ -33,7 +35,12 @@ def log_alerts(ctx, alerts):
print("No alerts to log.")


def log_failure(ctx, suppression_name, e):
def log_failure(ctx, suppression_name, e, event_data=None, description=None):
if event_data is None:
event_data = f"The suppression '{suppression_name}' failed to execute with error: {e}"

if description is None:
description = f"The suppression '{suppression_name}' failed to execute with error: {e}"
alert = {}
alert['ALERT_ID'] = uuid.uuid4().hex
alert['QUERY_ID'] = 'b1d02051dd2c4d62bb75274f2ee5996a'
Expand All @@ -46,9 +53,9 @@ def log_failure(ctx, suppression_name, e):
alert['TITLE'] = 'Suppression Runner Failure'
alert['EVENT_TIME'] = str(datetime.datetime.utcnow())
alert['ALERT_TIME'] = str(datetime.datetime.utcnow())
alert['DESCRIPTION'] = f"The suppression '{suppression_name}' failed to execute with error: {e}"
alert['DESCRIPTION'] = description
alert['DETECTOR'] = 'Suppression Runner'
alert['EVENT_DATA'] = f"The suppression '{suppression_name}' failed to execute with error: {e}"
alert['EVENT_DATA'] = event_data
alert['SEVERITY'] = 'High'
alerts = []
alerts.append(json.dumps(alert))
Expand All @@ -74,15 +81,25 @@ def do_suppression(suppression_name, ctx):

def run_suppressions(squelch_name):
    """Execute one suppression rule and record its per-query metadata.

    On failure an alert is logged and a 'failure' metadata entry is appended to
    RUN_METADATA; on success a 'success' entry is appended.
    """
    print(f"Received suppression {squelch_name}")
    metadata = {}
    metadata['NAME'] = squelch_name

    ctx = connect()

    metadata['START_TIME'] = datetime.datetime.utcnow()

    try:
        do_suppression(squelch_name, ctx)
    except Exception as e:
        log_failure(ctx, squelch_name, e)
        log.metadata_fill(metadata, status='failure', rows=0)
        RUN_METADATA['QUERY_HISTORY'].append(metadata)
        # Stop here: without this return a failed suppression fell through to
        # the success path below and appended a second, contradictory entry.
        return

    # NOTE(review): rowcount on a freshly-created cursor may not reflect the
    # suppression's affected rows — verify against the connector's behavior.
    log.metadata_fill(metadata, status='success', rows=ctx.cursor().rowcount)
    RUN_METADATA['QUERY_HISTORY'].append(metadata)

    print(f"Suppression query {squelch_name} executed. ")


def flag_remaining_alerts(ctx) -> List[str]:
Expand All @@ -95,11 +112,33 @@ def flag_remaining_alerts(ctx) -> List[str]:
return [name[1] for name in suppression_view_list]


def record_metadata(ctx, metadata):
    """Persist the run's metadata dict as one row in METADATA_TABLE.

    Timestamps are kept as datetime objects during the run so durations can be
    computed with subtraction; they are stringified here for JSON serialization.
    Mutates `metadata` in place.
    """
    metadata['RUN_START_TIME'] = str(metadata['RUN_START_TIME'])
    metadata['RUN_END_TIME'] = str(metadata['RUN_END_TIME'])
    metadata['RUN_DURATION'] = str(metadata['RUN_DURATION'])

    # Bind the values instead of interpolating them into the statement:
    # json.dumps output routinely contains single quotes (e.g. exception text),
    # which would break — or allow injection into — a string-built statement.
    statement = f'''
    INSERT INTO {METADATA_TABLE}
        (event_time, v)
    SELECT %s, PARSE_JSON(%s)
    '''
    try:
        log.info("Recording run metadata...")
        ctx.cursor().execute(statement, (metadata['RUN_START_TIME'], json.dumps(metadata)))
    except Exception as e:
        log.fatal("Metadata failed to log", e)
        log_failure(ctx, "Metadata Logging", e, event_data=metadata, description="The run metadata failed to log")


def main():
RUN_METADATA['RUN_START_TIME'] = datetime.datetime.utcnow()
ctx = connect()
for squelch_name in load_rules(ctx, ALERT_SQUELCH_POSTFIX):
run_suppressions(squelch_name)
flag_remaining_alerts(ctx)
RUN_METADATA['RUN_END_TIME'] = datetime.datetime.utcnow()
RUN_METADATA['RUN_DURATION'] = RUN_METADATA['RUN_END_TIME'] - RUN_METADATA['RUN_START_TIME']
record_metadata(ctx, RUN_METADATA)

if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Alert Suppression Runner'}], 1)
Expand Down
5 changes: 4 additions & 1 deletion src/runners/helpers/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ def metric(metric, namespace, dimensions, value):

def metadata_fill(metadata, status, rows=0, exception=None):
    """Finalize a per-query metadata dict: compute run time, then stringify.

    Expects metadata['START_TIME'] to be a datetime recorded when the query
    began. Mutates `metadata` in place.
    """
    metadata['END_TIME'] = datetime.datetime.utcnow()
    metadata['RUN_TIME'] = metadata['END_TIME'] - metadata['START_TIME']
    metadata['ROWS'] = rows
    metadata['STATUS'] = status
    # Stringify the exception (when present) so the dict stays JSON-serializable
    # for the metadata INSERT; a raw Exception object makes json.dumps raise.
    metadata['EXCEPTION'] = str(exception) if exception is not None else None
    metadata['START_TIME'] = str(metadata['START_TIME'])  # Recorded as datetime objects so we
    metadata['END_TIME'] = str(metadata['END_TIME'])      # can do math on them, then converted
    metadata['RUN_TIME'] = str(metadata['RUN_TIME'])      # to strings for json serialization.
37 changes: 34 additions & 3 deletions src/runners/violation_queries_runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#!/usr/bin/env python

import datetime
import json
import os

from config import VIOLATIONS_TABLE, RULES_SCHEMA, VIOLATION_QUERY_POSTFIX, CLOUDWATCH_METRICS
from config import VIOLATIONS_TABLE, METADATA_TABLE, RULES_SCHEMA, VIOLATION_QUERY_POSTFIX, CLOUDWATCH_METRICS
from helpers.db import connect_and_fetchall, connect_and_execute, load_rules
from helpers import log

RUN_METADATA = {'QUERY_HISTORY': [], 'RUN_TYPE': 'VIOLATION QUERIES'} # Contains metadata about this run


def log_alerts(ctx, alerts):
output_column = os.environ.get('output_column', 'result')
Expand Down Expand Up @@ -39,24 +42,52 @@ def snowalert_query(query_name):
return results, ctx


def process_results(results, ctx, query_name, metadata=None):
    """Re-serialize query result rows and hand them to log_alerts.

    Each row's first column is expected to be a JSON string; it is parsed and
    re-dumped to normalize formatting. `metadata` defaults to None so the
    older three-argument call form keeps working; when provided, it is filled
    with success metadata before the alerts are logged.
    """
    alerts = []
    for res in results:
        jres = json.loads(res[0])
        alerts.append(json.dumps(jres))
    if metadata is not None:
        # NOTE(review): rowcount on a freshly-created cursor may not reflect
        # this query's row count — verify against the connector's behavior.
        log.metadata_fill(metadata, status='success', rows=ctx.cursor().rowcount)
    log_alerts(ctx, alerts)


def run_query(query_name):
    """Run one violation query and record its metadata in RUN_METADATA."""
    metadata = {}
    metadata['NAME'] = query_name
    metadata['START_TIME'] = datetime.datetime.utcnow()
    results, ctx = snowalert_query(query_name)
    process_results(results, ctx, query_name, metadata)
    RUN_METADATA['QUERY_HISTORY'].append(metadata)


def record_metadata(ctx, metadata):
    """Persist the run's metadata dict as one row in METADATA_TABLE.

    Timestamps are kept as datetime objects during the run so durations can be
    computed with subtraction; they are stringified here for JSON serialization.
    Mutates `metadata` in place.
    """
    metadata['RUN_START_TIME'] = str(metadata['RUN_START_TIME'])
    metadata['RUN_END_TIME'] = str(metadata['RUN_END_TIME'])
    metadata['RUN_DURATION'] = str(metadata['RUN_DURATION'])

    # Bind the values instead of interpolating them into the statement:
    # json.dumps output routinely contains single quotes (e.g. exception text),
    # which would break — or allow injection into — a string-built statement.
    statement = f'''
    INSERT INTO {METADATA_TABLE}
        (event_time, v)
    SELECT %s, PARSE_JSON(%s)
    '''
    try:
        log.info("Recording run metadata...")
        ctx.cursor().execute(statement, (metadata['RUN_START_TIME'], json.dumps(metadata)))
    except Exception as e:
        log.fatal("Metadata failed to log", e)
        # TODO(review): log_failure is not defined in this module; enable the
        # call below once a failure logger exists here.
        # log_failure(ctx, "Metadata Logging", e, event_data=metadata, description="The run metadata failed to log")


def main():
# Force warehouse resume so query runner doesn't have a bunch of queries waiting for warehouse resume
RUN_METADATA['RUN_START_TIME'] = datetime.datetime.utcnow()
ctx = connect_and_execute("ALTER SESSION SET use_cached_result=FALSE;")
for query_name in load_rules(ctx, VIOLATION_QUERY_POSTFIX):
run_query(query_name)

RUN_METADATA['RUN_END_TIME'] = datetime.datetime.utcnow()
RUN_METADATA['RUN_DURATION'] = RUN_METADATA['RUN_END_TIME'] - RUN_METADATA['RUN_START_TIME']
record_metadata(ctx, RUN_METADATA)
if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Violation Query Runner'}], 1)

Expand Down
36 changes: 35 additions & 1 deletion src/runners/violation_suppressions_runner.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#!/usr/bin/env python

from config import VIOLATIONS_TABLE, RULES_SCHEMA, VIOLATION_SQUELCH_POSTFIX, CLOUDWATCH_METRICS
import datetime
from config import VIOLATIONS_TABLE, METADATA_TABLE, RULES_SCHEMA, VIOLATION_SQUELCH_POSTFIX, CLOUDWATCH_METRICS
from helpers import log
from helpers.db import connect, load_rules
import json

RUN_METADATA = {'QUERY_HISTORY': [], 'RUN_TYPE': 'VIOLATION SUPPRESSIONS'} # Contains metadata about this run


def flag_remaining_alerts(ctx):
Expand All @@ -13,6 +17,9 @@ def flag_remaining_alerts(ctx):


def run_suppression(squelch_name):
metadata = {}
metadata['NAME'] = squelch_name
metadata['START_TIME'] = datetime.datetime.utcnow()
ctx = connect()
print(f"Received suppression {squelch_name}")
try:
Expand All @@ -24,17 +31,44 @@ def run_suppression(squelch_name):
SET t.suppressed='true', t.suppression_rule='{squelch_name}';
""")
except Exception as e:
log.metadata_fill(metadata, status='failure', rows=0)
RUN_METADATA['QUERY_HISTORY'].append(metadata)
log.fatal("Suppression query {squelch_name} execution failed.", e)
pass

print(f"Suppression query {squelch_name} executed")
log.metadata_fill(metadata, status='success', rows=ctx.cursor().rowcount)
RUN_METADATA['QUERY_HISTORY'].append(metadata)


def record_metadata(ctx, metadata):
    """Persist the run's metadata dict as one row in METADATA_TABLE.

    Timestamps are kept as datetime objects during the run so durations can be
    computed with subtraction; they are stringified here for JSON serialization.
    Mutates `metadata` in place.
    """
    metadata['RUN_START_TIME'] = str(metadata['RUN_START_TIME'])
    metadata['RUN_END_TIME'] = str(metadata['RUN_END_TIME'])
    metadata['RUN_DURATION'] = str(metadata['RUN_DURATION'])

    # Bind the values instead of interpolating them into the statement:
    # json.dumps output routinely contains single quotes (e.g. exception text),
    # which would break — or allow injection into — a string-built statement.
    statement = f'''
    INSERT INTO {METADATA_TABLE}
        (event_time, v)
    SELECT %s, PARSE_JSON(%s)
    '''
    try:
        log.info("Recording run metadata...")
        ctx.cursor().execute(statement, (metadata['RUN_START_TIME'], json.dumps(metadata)))
    except Exception as e:
        log.fatal("Metadata failed to log", e)
        # TODO(review): log_failure is not defined in this module; enable the
        # call below once a failure logger exists here.
        # log_failure(ctx, "Metadata Logging", e, event_data=metadata, description="The run metadata failed to log")


def main():
RUN_METADATA['RUN_START_TIME'] = datetime.datetime.utcnow()
ctx = connect()
for squelch_name in load_rules(ctx, VIOLATION_SQUELCH_POSTFIX):
run_suppression(squelch_name)
flag_remaining_alerts(ctx)

RUN_METADATA['RUN_END_TIME'] = datetime.datetime.utcnow()
RUN_METADATA['RUN_DURATION'] = RUN_METADATA['RUN_END_TIME'] - RUN_METADATA['RUN_START_TIME']
record_metadata(ctx, RUN_METADATA)
if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Violation Suppression Runner'}], 1)

Expand Down
10 changes: 8 additions & 2 deletions src/scripts/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import snowflake.connector

from runners.config import DATABASE, DATA_SCHEMA, RULES_SCHEMA, RESULTS_SCHEMA
from runners.config import ALERTS_TABLE, VIOLATIONS_TABLE
from runners.config import ALERTS_TABLE, VIOLATIONS_TABLE, METADATA_TABLE
from runners.config import ALERT_QUERY_POSTFIX, ALERT_SQUELCH_POSTFIX
from runners.config import VIOLATION_QUERY_POSTFIX, VIOLATION_SQUELCH_POSTFIX
from runners.helpers import log
Expand Down Expand Up @@ -86,7 +86,13 @@
, suppressed BOOLEAN
, suppression_rule STRING DEFAULT NULL
);
"""
""",
f"""
CREATE TABLE IF NOT EXISTS {METADATA_TABLE}(
event_time TIMESTAMP_LTZ
, v VARIANT
);
"""
]

CREATE_TABLE_SUPPRESSIONS_QUERY = f"CREATE TABLE IF NOT EXISTS suppression_queries ( suppression_spec VARIANT );"
Expand Down

0 comments on commit 8308666

Please sign in to comment.