Skip to content

Commit

Permalink
Merge pull request #77 from Baza-86/Release-0.0.6
Browse files Browse the repository at this point in the history
Release 0.0.6
  • Loading branch information
kamerons15 authored Sep 27, 2023
2 parents 961bc7a + 6322b84 commit d1f54ab
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 61 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.venv*
snippits.py
vpc_samples/
vpc_samples/
*pyc
Binary file not shown.
30 changes: 30 additions & 0 deletions lambda_functions/PayloadCreator/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from datetime import datetime, timedelta, date

def lambda_handler(event, context):
    """Build the payload for the next Step Functions state.

    Two modes, selected by the presence of 'generatedDate' in the event:

    * pagination mode ('generatedDate' present): carry the accumulated
      output-file list forward, append the CSV produced by the previous
      iteration, and advance the query offset by one page (queryLimit).
    * bootstrap mode (first invocation): emit yesterday's date broken into
      year/month/day components for the downstream query.

    Args:
        event: Step Functions input dict (see the two shapes above).
        context: Lambda context object (unused).

    Returns:
        dict: payload for the next state.
    """
    print(event.keys())
    # Plain `in event` membership test; calling .keys() first is redundant.
    if 'generatedDate' in event:
        output_files = event['outputFileArray']
        output_files.append(event['outputCsv'])

        query_params = event['queryParams']
        # Advance to the next page of query results.
        query_params['queryOffset'] += query_params['queryLimit']

        payload_dict = {
            'queryParams': query_params,
            'outputFileArray': output_files,
            'outputRows': event['outputRows']
        }
    else:
        print("date to be added to output")
        # Yesterday as ISO 'YYYY-MM-DD'; split once instead of three times.
        date_yst = str(date.today() - timedelta(1))
        year, month, day = date_yst.split("-")
        payload_dict = {
            'date': date_yst,
            'day': day,
            'month': month,
            'year': year
        }

    return payload_dict
127 changes: 96 additions & 31 deletions scripts/flow_logs_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from hashlib import sha1
from functools import lru_cache
from copy import deepcopy
import functools
import statistics

args = getResolvedOptions(sys.argv,
[
Expand All @@ -24,7 +26,8 @@
'DynamoTableName',
'SGARulesUseIndex',
'SGSortTableName',
'path'
'path',
'queryCsv'
])

s3 = boto3.resource('s3', args['region'])
Expand All @@ -38,10 +41,32 @@
sg_analysis_rules_use_idx= args["SGARulesUseIndex"]
sg_sort_table= args["SGSortTableName"]
athena_s3_prefix = args['path']
query_csv = args['outputCsv']
date_yst = (date.today() - timedelta(1))

my_bucket = s3.Bucket(flow_logs_athena_results_bucket)

sg_rule_id_query_results = []
get_sg_ref_ips_results = []
rule_matcher_results = []
security_group_rule_parser_results = []
get_sg_rule_id_results = []
get_interface_ddb_results = []

def timer(timer_results):
    """Decorator factory that times every call of the wrapped function.

    Each invocation's wall-clock duration is appended to *timer_results*
    (a shared list owned by the caller) and printed; the wrapped
    function's return value is passed through unchanged.
    """
    def timer_decorator(func):
        @functools.wraps(func)
        def wrapper_timer(*args, **kwargs):
            started = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed_time = time.perf_counter() - started
            timer_results.append(elapsed_time)
            print(f"Elapsed time: {elapsed_time:0.4f} seconds")
            return result
        return wrapper_timer
    return timer_decorator

def network_test(rule_block,flow_addr):
net = IPv4Network(rule_block)
addr = IPv4Address(flow_addr)
Expand Down Expand Up @@ -71,6 +96,7 @@ def rule_filter(resp_list):
cidr_rules = [r for r in resp_list if r['properties'].get('CidrIpv4')]
return (ref_rules,cidr_rules)

@timer(timer_results=get_sg_ref_ips_results)
@lru_cache(maxsize=32)
def get_sg_ref_ips(sg_id):
deserialize = TypeDeserializer()
Expand All @@ -96,7 +122,7 @@ def port_test(rule_port_from,rule_port_to,flow_port):
return True
else:
return False

@timer(timer_results=rule_matcher_results)
def rule_matcher(resp_list,flow):
[r.update({'match_score':1}) for r in resp_list]
if len(resp_list) == 1:
Expand All @@ -120,42 +146,57 @@ def rule_matcher(resp_list,flow):

return max_score_list

def get_sg_rule_id(sg_id, flow_count, protocol, flow_dir, addr, dstport):

@timer(timer_results=sg_rule_id_query_results)
@lru_cache(maxsize=2048)
def get_sg_rule_id_dynamo_query(sg_id):
    """Fetch all security-group rules for *sg_id* from DynamoDB.

    Queries the rules table through its group-id index and returns the raw
    (still DynamoDB-typed) query response.  lru_cache memoizes per sg_id so
    repeated flows for the same security group skip the network round-trip;
    because @timer wraps the cached function, every call — cache hit or
    miss — is appended to sg_rule_id_query_results.

    NOTE(review): the cached response dict is shared across callers, so
    downstream code must not mutate it in place (security_group_rule_parser
    deserializes items into fresh objects, which is safe).
    NOTE(review): only the first page of results is returned — TODO confirm
    pagination (LastEvaluatedKey) can never occur for a single group's rules.
    """
    response=dynamodb.query(
        TableName=sg_rules_tbl_name,   # module-level job argument
        IndexName=sg_rules_group_idx,  # secondary index keyed on group_id
        KeyConditions={
            "group_id":{
                'ComparisonOperator': 'EQ',
                'AttributeValueList': [ {"S": sg_id} ]
            }
        }
    )

    return response

@timer(timer_results=security_group_rule_parser_results)
def security_group_rule_parser(response, flow_dir):
    """Deserialize a DynamoDB rules-query response into plain-Python dicts,
    keeping only the rules whose direction matches *flow_dir*.

    Egress flows keep rules with IsEgress == True; anything else keeps
    rules with IsEgress == False.
    """
    want_egress = (flow_dir == 'egress')
    deserializer = TypeDeserializer()
    matching = []
    for item in response['Items']:
        # Filter on the raw (typed) attribute before paying deserialization
        # cost for the whole item.
        if item['properties']['M']['IsEgress']['BOOL'] == want_egress:
            matching.append({k: deserializer.deserialize(v) for k, v in item.items()})
    return matching

@timer(timer_results=get_sg_rule_id_results)
def get_sg_rule_id(sg_id, flow_count, protocol, flow_dir, addr, dstport):
    """Look up the rules of security group *sg_id* and record which rule
    matched this flow.

    Args:
        sg_id: security group id whose rules should be matched.
        flow_count: number of flow-log records aggregated into this flow.
        protocol: IP protocol of the flow.
        flow_dir: 'egress' or 'ingress'.
        addr: peer address of the flow.
        dstport: destination port of the flow.

    Side effects: on a match, writes a usage record via insert_usage_data().
    All failures are logged and swallowed so one bad flow does not abort the
    whole job.

    NOTE(review): the pre-refactor inline dynamodb.query / deserializer /
    match block duplicated here by the diff view has been removed — the
    cached helper and security_group_rule_parser now do that work.
    """
    try:
        # Cached per-sg_id DynamoDB query (lru_cache on the helper).
        response = get_sg_rule_id_dynamo_query(sg_id)
    except Exception as e:
        print("There was an error while trying to perform DynamoDB get operation on Rules table: "+str(e))
        # Without a query response there is nothing to match against;
        # previously `response` would be unbound here and raise NameError.
        return

    flow_object = {
        'flow_count': flow_count,
        'addr': addr,
        'port': dstport,
        'protocol': protocol,
    }

    resp_list = security_group_rule_parser(response, flow_dir)

    try:
        result = rule_matcher(resp_list,flow_object)[0]
        print(f"rule found for flow: sg_rule_id={result['id']},sg_id={result['group_id']},flow_dir={flow_dir},protocol={flow_object['protocol']},addr={flow_object['addr']},dstport={flow_object['port']}")
        insert_usage_data(sg_rule_id=result['id'],sg_id=result['group_id'],flow_dir=flow_dir,**flow_object)
    except Exception as e:
        # rule_matcher returning an empty list raises IndexError; treat any
        # failure as "no matching rule" and keep processing other flows.
        print(f'no rule found for flow:{flow_object} - {flow_dir}')
        print(f'error: {e}')
        # raise e


def insert_usage_data(sg_rule_id, sg_id, flow_dir, flow_count, addr, port, protocol):
addr_rule_hash = [sg_rule_id,addr,port,protocol]
Expand Down Expand Up @@ -198,6 +239,9 @@ def insert_usage_data(sg_rule_id, sg_id, flow_dir, flow_count, addr, port, proto
print("There was an error while trying to perform DynamoDB insert operation on Usage table: "+str(e))
# raise e


@timer(timer_results=get_interface_ddb_results)
@lru_cache(maxsize=1024)
def get_interface_ddb(id:str) -> dict:
deserialize = TypeDeserializer()
response = dynamodb.get_item(
Expand All @@ -211,11 +255,13 @@ def get_interface_ddb(id:str) -> dict:
print (f'nic id: {id} not found!')




def main():
s3_folder_path = f's3://{flow_logs_athena_results_bucket}/{athena_s3_prefix}/{date_yst.isoformat().replace("-","/")}/'
query_csv_path = f's3://{flow_logs_athena_results_bucket}/{athena_s3_prefix}/{date_yst.isoformat().replace("-","/")}/{query_csv}'
start = time.time()
print("Writing rules data to DynamoDB table- started at: "+str(datetime.now()))
dfs = wr.s3.read_csv(path=s3_folder_path, chunksize=1000, encoding = 'ISO-8859-1')
dfs = wr.s3.read_csv(path=query_csv_path, chunksize=1000, encoding = 'ISO-8859-1')
for df in dfs:
try:
df_row_count = len(df) - 1
Expand All @@ -224,7 +270,7 @@ def main():
print(f'processing row {index} of {df_row_count}')
if row is not None and 'dstport' in row:
nw_int_info = get_interface_ddb(id=row['interface_id'])

for grp in nw_int_info['security_group_ids']:
print(grp, row['flow_count'], row['protocol'],row['flow_direction'],row['addr'],row['dstport'])
get_sg_rule_id(grp, row['flow_count'], row['protocol'],row['flow_direction'],row['addr'],row['dstport'])
Expand All @@ -234,6 +280,25 @@ def main():
print(f'error: {e}')
# raise e

#print(f'sg_rule_id_query cache stats')
#print(get_sg_rule_id_dynamo_query.cache_info())
print(f'quantiles for sg_rule_id_query: {statistics.quantiles(sg_rule_id_query_results)}')

#print(f'get_sg_ref_ips cache stats')
#print(get_sg_ref_ips.cache_info())
print(f'quantiles for get_sg_ref_ips: {statistics.quantiles(get_sg_ref_ips_results)}')

print(f'quantiles for rule_matcher: {statistics.quantiles(rule_matcher_results)}')

#print(f'security_group_rule_parser cache stats')
#print(security_group_rule_parser.cache_info())
print(f'quantiles for security_group_rule_parser: {statistics.quantiles(security_group_rule_parser_results)}')

print(f'quantiles for get_sg_rule_id: {statistics.quantiles(get_sg_rule_id_results)}')

print(f'quantiles for get_interface_ddb: {statistics.quantiles(get_interface_ddb_results)}')


print("Writing rules data to DynamoDB table- completed at: "+str(datetime.now()))
end = time.time()
print("Total time taken in minutes: "+str((end - start)/60))
Expand Down
4 changes: 2 additions & 2 deletions scripts/query_athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
date_yst = (date.today() - timedelta(1))
if date_yst.day > 9:
params['query'] = f"""
select count("interface_id") as flow_count, interface_id, protocol, flow_direction, if("flow_direction"='ingress',"srcaddr","dstaddr") as addr, dstport FROM {params['database']}.\"{params['table']}\" WHERE dstport is not null and day='{date_yst.day}' and action='ACCEPT' group by interface_id, protocol, flow_direction, if("flow_direction"='ingress',"srcaddr","dstaddr"), dstport, dstport having count(interface_id) > 5 order by flow_count desc limit 60000
select count("interface_id") as flow_count, interface_id, protocol, flow_direction, if("flow_direction"='ingress',"srcaddr","dstaddr") as addr, dstport FROM {params['database']}.\"{params['table']}\" WHERE dstport is not null and day='{date_yst.day}' and action='ACCEPT' group by interface_id, protocol, flow_direction, if("flow_direction"='ingress',"srcaddr","dstaddr"), dstport, dstport having count(interface_id) > 5 order by interface_id, flow_count desc limit 10000
"""
else:
params['query'] = f"""
select count("interface_id") as flow_count, interface_id, protocol, flow_direction, if("flow_direction"='ingress',"srcaddr","dstaddr") as addr, dstport FROM {params['database']}.\"{params['table']}\" WHERE dstport is not null and day='0{date_yst.day}' and action='ACCEPT' group by interface_id, protocol, flow_direction, if("flow_direction"='ingress',"srcaddr","dstaddr"), dstport, dstport having count(interface_id) > 5 order by flow_count desc limit 60000
select count("interface_id") as flow_count, interface_id, protocol, flow_direction, if("flow_direction"='ingress',"srcaddr","dstaddr") as addr, dstport FROM {params['database']}.\"{params['table']}\" WHERE dstport is not null and day='0{date_yst.day}' and action='ACCEPT' group by interface_id, protocol, flow_direction, if("flow_direction"='ingress',"srcaddr","dstaddr"), dstport, dstport having count(interface_id) > 5 order by interface_id, flow_count desc limit 10000
"""

session = boto3.Session()
Expand Down
2 changes: 1 addition & 1 deletion templates/athena_integration_account_glue_table.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ Resources:
Handler: 'index.handler'
Timeout: 60
Runtime: nodejs18.x
Runtime: nodejs14.x
ReservedConcurrentExecutions: 1
Role: !GetAtt VpcFlowLogsTableIntegrationLambdaExecutorRole.Arn

Expand Down
Loading

0 comments on commit d1f54ab

Please sign in to comment.