Skip to content

Commit

Permalink
Update CrowdStrike pagination when there are more than 10,000 hosts (#157)
Browse files Browse the repository at this point in the history
* Update CrowdStrike pagination when there are more than 10,000 hosts

Signed-off-by: Eric Larsen <[email protected]>

* Adding additional debug logging to connector

Signed-off-by: Eric Larsen <[email protected]>

---------

Signed-off-by: Eric Larsen <[email protected]>
Co-authored-by: Eric Larsen <[email protected]>
Co-authored-by: DerekRushton <[email protected]>
  • Loading branch information
3 people authored Jun 28, 2024
1 parent 14824a2 commit 57c0f1d
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 6 deletions.
10 changes: 10 additions & 0 deletions connectors/crowdstrike/connector/data_handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

import datetime

from car_framework.context import context
Expand Down Expand Up @@ -52,6 +53,15 @@ def group_host_sensor_apps(applications):
group_apps[agent_id].append(app)
return group_apps

def _hashable_fingerprint(value):
    """Return a hashable stand-in for ``value`` used only to bucket items.

    The fingerprint is deliberately lossy: two values that compare equal get
    the same fingerprint, but two different values may share one as well.
    Callers must therefore still confirm equality with ``==`` inside a bucket.
    """
    if isinstance(value, dict):
        # Order-insensitive, like dict equality itself.
        return frozenset(
            (key, _hashable_fingerprint(val)) for key, val in value.items()
        )
    if isinstance(value, (list, tuple)):
        return tuple(_hashable_fingerprint(val) for val in value)
    if isinstance(value, (set, frozenset)):
        return frozenset(value)
    try:
        hash(value)
    except TypeError:
        # Unknown unhashable type: put all of them into one shared bucket and
        # let the equality check inside that bucket sort them out.
        return None
    return value


def remove_duplicates(items):
    """Remove any duplicate hosts from the array of hosts.

    The first occurrence of every item is kept and the original order is
    preserved. Items are considered duplicates when they compare equal
    with ``==``.

    The host list passed in here has grown past the Discover API's
    10,000-record window, so a plain ``item not in unique_items`` scan costs
    O(n^2) comparisons. Items are instead grouped by a hashable fingerprint
    and the equality check is only run against items sharing that fingerprint.

    args:
        items(list): items to de-duplicate (host dicts in practice)
    returns:
        unique_items(list): new list without duplicates; ``items`` is not modified
    """
    context().logger.debug("Removing duplicates from list with length: %s", len(items))
    unique_items = []
    buckets = {}
    for item in items:
        bucket = buckets.setdefault(_hashable_fingerprint(item), [])
        # Fingerprints may collide, so equality is still the final arbiter.
        if item not in bucket:
            bucket.append(item)
            unique_items.append(item)
    context().logger.debug("Returning %s unique items", len(unique_items))
    return unique_items

class DataHandler(BaseDataHandler):

Expand Down
87 changes: 81 additions & 6 deletions connectors/crowdstrike/connector/server_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from car_framework.context import context
from car_framework.util import DatasourceFailure
from connector.data_handler import epoch_to_datetime_conv, deep_get
from connector.data_handler import epoch_to_datetime_conv, deep_get, remove_duplicates
from falconpy import Discover
from falconpy import SpotlightVulnerabilities

Expand Down Expand Up @@ -42,13 +42,19 @@ def get_hosts(self, last_model_state_id=None):
returns:
hosts(list): host list
"""
context().logger.info("Beginning import of hosts from the CrowdStrike Falcon environment.")
hosts = []
try:
discover = Discover(client_id=self.client_id, client_secret=self.client_secret, base_url=self.base_url)
offset = 0
result_filter = ""
sort = "last_seen_timestamp|asc"
if last_model_state_id:
result_filter = f"last_seen_timestamp:>'{epoch_to_datetime_conv(last_model_state_id)}'"
host_lookup = discover.query_hosts(filter=result_filter, limit=DISCOVER_PAGE_SIZE)
context().logger.debug("Beginning hosts query using filter: %s", result_filter)
host_lookup = discover.query_hosts(filter=result_filter, limit=DISCOVER_PAGE_SIZE, sort=sort)
total = host_lookup['body']['meta']['pagination']['total']
context().logger.debug("Completed hosts query. Total hosts = %s", total)
is_paged_result = True
while is_paged_result:
if host_lookup["status_code"] == 200:
Expand All @@ -59,13 +65,41 @@ def get_hosts(self, last_model_state_id=None):
error_detail = host_lookup["body"]["errors"]
for err in error_detail:
raise DatasourceFailure(err["message"], err["code"])
if len(hosts) < host_lookup['body']['meta']['pagination']['total']:
offset = len(hosts)
host_lookup = discover.query_hosts(filter=result_filter, limit=DISCOVER_PAGE_SIZE, offset=offset)

if len(host_detail["body"]["resources"]) == DISCOVER_PAGE_SIZE:
'''CrowdStrike's Discover endpoint has the following limitation: limit + offset <= 10,000
For that reason we need to do additional queries to get all hosts. We do this by making
additional queries and filter on the last_seen_timestamp. This also means we have the
potential of collecting duplicate hosts. So we make sure to remove these duplicates.'''
if DISCOVER_PAGE_SIZE + offset > 10000:
offset = 0
last_added = deep_get(hosts[-1], ['last_seen_timestamp'])
last_added_filter = "last_seen_timestamp:>='" + last_added + "'"
if result_filter == "":
updated_filter = last_added_filter
else:
updated_filter = result_filter + " + " + last_added_filter

context().logger.debug("The limit + offset is greater then 10000. Making additional hosts query using filter: %s", updated_filter)
host_lookup = discover.query_hosts(filter=updated_filter, limit=DISCOVER_PAGE_SIZE, sort=sort)
context().logger.debug("Hosts query using updated filter is complete.")
else:
offset = len(hosts)
context().logger.debug("Beginning hosts query using offset of %s", offset)
host_lookup = discover.query_hosts(filter=result_filter, limit=DISCOVER_PAGE_SIZE, offset=offset, sort=sort)
context().logger.debug("Hosts query using offset complete")
else:
is_paged_result = False

if len(hosts) > total:
# Need to remove potential duplicates due to the additional grouping we do if the total number of hosts is >10,000
context().logger.info("Removing duplicate hosts added during import.")
hosts = remove_duplicates(hosts)

except Exception as ex:
raise DatasourceFailure(ex)
raise DatasourceFailure(ex)

context().logger.info("Imported %s hosts from the CrowdStrike Falcon environment.", len(hosts))
return hosts

def get_applications(self, app_filter=None, last_model_state_id=None):
Expand All @@ -76,6 +110,9 @@ def get_applications(self, app_filter=None, last_model_state_id=None):
returns:
applications(list): application list
"""
if app_filter is None:
# Adding this log message into an if statement because it can be called thousands of times adding noise to the logs when the app_filter is being used.
context().logger.info("Beginning import of applications from the CrowdStrike Falcon environment.")
applications = []
try:
discover = Discover(client_id=self.client_id, client_secret=self.client_secret, base_url=self.base_url)
Expand All @@ -86,7 +123,10 @@ def get_applications(self, app_filter=None, last_model_state_id=None):
installed_app_filter = f"last_updated_timestamp:>'{epoch_to_datetime_conv(last_model_state_id)}'"
used_app_filter = f"last_used_timestamp:>'{epoch_to_datetime_conv(last_model_state_id)}'"
result_filter = f"{installed_app_filter},{used_app_filter}"
context().logger.debug("Beginning applications query using filter: %s", result_filter)
app_lookup = discover.query_applications(filter=result_filter, limit=DISCOVER_PAGE_SIZE)
total = app_lookup['body']['meta']['pagination']['total']
context().logger.debug("Completed applications query. Total applications = %s", total)
is_paged_result = True
while is_paged_result:
if app_lookup["status_code"] == 200:
Expand All @@ -99,12 +139,19 @@ def get_applications(self, app_filter=None, last_model_state_id=None):
raise DatasourceFailure(err["message"], err["code"])
if len(applications) < app_lookup['body']['meta']['pagination']['total']:
offset = len(applications)
context().logger.debug("Beginning applications query using offset of %s", offset)
app_lookup = discover.query_applications(filter=result_filter, limit=DISCOVER_PAGE_SIZE,
offset=offset)
context().logger.debug("Applications query using offset complete")
else:
is_paged_result = False
except Exception as ex:
raise DatasourceFailure(ex)

if app_filter is None:
# Adding this log message into an if statement because it can be called thousands of times adding noise to the logs when the app_filter is being used.
context().logger.info("Imported %s applications from the CrowdStrike Falcon environment.", len(applications))

return applications

def get_accounts(self, last_model_state_id=None):
Expand All @@ -115,6 +162,7 @@ def get_accounts(self, last_model_state_id=None):
returns:
accounts(list): account list
"""
context().logger.info("Beginning import of accounts from the CrowdStrike Falcon environment.")
accounts = []
try:
discover = Discover(client_id=self.client_id, client_secret=self.client_secret, base_url=self.base_url)
Expand All @@ -123,7 +171,10 @@ def get_accounts(self, last_model_state_id=None):
result_filter = f"first_seen_timestamp:>'{epoch_to_datetime_conv(last_model_state_id)}'," \
f"last_successful_login_timestamp:>'{epoch_to_datetime_conv(last_model_state_id)}'"

context().logger.debug("Beginning accounts query using filter: %s", result_filter)
account_lookup = discover.query_accounts(filter=result_filter, limit=DISCOVER_PAGE_SIZE)
total = account_lookup['body']['meta']['pagination']['total']
context().logger.debug("Completed accounts query. Total accounts = %s", total)
is_paged_result = True
while is_paged_result:
if account_lookup["status_code"] == 200:
Expand All @@ -136,12 +187,17 @@ def get_accounts(self, last_model_state_id=None):
raise DatasourceFailure(err["message"], err["code"])
if len(accounts) < account_lookup['body']['meta']['pagination']['total']:
offset = len(accounts)
context().logger.debug("Beginning accounts query using offset of %s", offset)
account_lookup = discover.query_accounts(filter=result_filter, limit=DISCOVER_PAGE_SIZE,
offset=offset)
context().logger.debug("Accounts query using offset complete")
else:
is_paged_result = False
except Exception as ex:
raise DatasourceFailure(ex)

context().logger.info("Imported %s accounts from the CrowdStrike Falcon environment.", len(accounts))

return accounts

def get_logins(self, user_accounts):
Expand All @@ -152,14 +208,18 @@ def get_logins(self, user_accounts):
returns:
account_login(list): account login event list
"""
context().logger.info("Beginning import of user logins from the CrowdStrike Falcon environment.")
user_logins = []
try:
discover = Discover(client_id=self.client_id, client_secret=self.client_secret, base_url=self.base_url)
account_ids = [account['id'] for account in user_accounts if account]
for i in range(0, len(account_ids), 100):
res_filter = "', account_id:'".join(account_ids[i:i+100])
context().logger.debug("Beginning logins query using filter: %s", res_filter)
login_lookup = discover.query_logins(filter=f"account_id:'{res_filter}'", sort='login_timestamp.desc',
limit=DISCOVER_PAGE_SIZE)
total = login_lookup['body']['meta']['pagination']['total']
context().logger.debug("Completed logins query. Total logins = %s", total)
is_paged_result = True
login_events = []
user_ids = []
Expand All @@ -185,6 +245,9 @@ def get_logins(self, user_accounts):
is_paged_result = False
except Exception as ex:
raise DatasourceFailure(ex)

context().logger.info("Imported %s user logins from the CrowdStrike Falcon environment.", len(user_logins))

return user_logins

def get_vulnerabilities(self, last_model_state_id=None):
Expand All @@ -195,6 +258,7 @@ def get_vulnerabilities(self, last_model_state_id=None):
returns:
vulnerabilities(list): vulnerability list
"""
context().logger.info("Beginning import of vulnerability information from the CrowdStrike Falcon environment.")
vulnerabilities = []
try:
spotlight = SpotlightVulnerabilities(client_id=self.client_id, client_secret=self.client_secret,
Expand All @@ -205,8 +269,11 @@ def get_vulnerabilities(self, last_model_state_id=None):
result_filter = f"updated_timestamp:>'{epoch_to_datetime_conv(last_model_state_id)}'+" \
f"(created_timestamp:>'{epoch_to_datetime_conv(last_model_state_id)}'," \
f"status:['open', 'closed', 'reopen'])"
context().logger.debug("Beginning vulnerabilities query using filter: %s", result_filter)
vuln_lookup = spotlight.query_vulnerabilities_combined(filter=result_filter, facet=facet,
limit=SPOTLIGHT_PAGE_SIZE)
total = vuln_lookup['body']['meta']['pagination']['total']
context().logger.debug("Completed vulnerabilities query. Total vulnerabilities = %s", total)
after = 'true'
while after:
if vuln_lookup["status_code"] == 200:
Expand All @@ -218,14 +285,20 @@ def get_vulnerabilities(self, last_model_state_id=None):
raise DatasourceFailure(err["message"], err["code"])
if 'after' in vuln_lookup['body']['meta']['pagination']:
after = vuln_lookup['body']['meta']['pagination']['after']
context().logger.debug("Beginning vulnerabilities query using after of %s", after)
vuln_lookup = spotlight.query_vulnerabilities_combined(filter=result_filter, facet=facet,
limit=SPOTLIGHT_PAGE_SIZE, after=after)
context().logger.debug("Vulnerabilities query using after complete")
except Exception as ex:
raise DatasourceFailure(ex)

context().logger.info("Imported %s vulnerabilities from the CrowdStrike Falcon environment.", len(vulnerabilities))

return vulnerabilities

def get_vulnerable_applications(self, vulnerabilities, agent_application_map):
"""add application id details to the vulnerability apps sections"""
context().logger.info("Adding application details to the application vulnerabities section")
for vuln in vulnerabilities:
agent_id = vuln['aid']
for vuln_app in vuln['apps']:
Expand Down Expand Up @@ -261,3 +334,5 @@ def get_vulnerable_applications(self, vulnerabilities, agent_application_map):
for app in app_details:
app_id = app['id']
vuln_app['app_id'] = app_id

context().logger.info("Adding application details to the application vulnerabities section is complete.")

0 comments on commit 57c0f1d

Please sign in to comment.