Refactor GRR file downloads (#891)
* GRR module updates

* More updates

* Linter appeasement

* linter appeasement

* Linter appeasement

* Linter appeasement

* Unittest fix
ramo-j authored Jul 4, 2024
1 parent 0bbb055 commit 857bc35
Showing 4 changed files with 192 additions and 263 deletions.
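At its core, this commit swaps the module's hand-rolled flow polling (the removed _AwaitFlow) and server-side ZIP archives (the removed _DownloadArchive) for the GRR API client's built-in waiting plus per-file blob downloads. Below is a minimal sketch of the new call pattern, assuming a reachable GRR server; the endpoint, credentials, client ID, and flow ID are placeholders, not values from this commit.

# Sketch only: assumes a running GRR server and valid credentials.
from grr_api_client import api

grr_api = api.InitHttp(api_endpoint='http://localhost:8000',  # placeholder endpoint
                       auth=('admin', 'admin'))               # placeholder credentials
client = grr_api.Client('C.0123456789abcdef').Get()           # placeholder client ID

flow = client.Flow('F:ABCDEF12')  # placeholder flow ID
flow.WaitUntilDone()              # replaces the removed _AwaitFlow polling loop

# Results are read straight off the completed flow and handed to
# _DownloadBlobs, instead of requesting a server-side ZIP archive.
payloads = [result.payload for result in flow.Get().ListResults()]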
175 changes: 44 additions & 131 deletions dftimewolf/lib/collectors/grr_hosts.py
@@ -3,10 +3,9 @@
 
 import datetime
 import os
+import pathlib
 import re
-import tempfile
-import time
-import zipfile
+import stat
 from concurrent.futures import ThreadPoolExecutor
 from typing import List, Optional, Tuple, Type

@@ -245,61 +244,6 @@ def _LaunchFlow(self, client: Client, name: str, args: str) -> str:
 
     return flow_id
 
-  # TODO: change object to more specific GRR type information.
-  def _AwaitFlow(self, client: Client, flow_id: str) -> None:
-    """Waits for a specific GRR flow to complete.
-
-    Args:
-      client (object): GRR Client object in which to await the flow.
-      flow_id (str): GRR identifier of the flow to await.
-
-    Raises:
-      DFTimewolfError: If a Flow error was encountered.
-    """
-    self.logger.info(f'{flow_id:s}: Waiting to finish')
-    if self.skip_offline_clients:
-      self.logger.debug("Client will be skipped if offline.")
-
-    while True:
-      try:
-        status = client.Flow(flow_id).Get().data
-      except grr_errors.UnknownError:
-        msg = (f'Unknown error retrieving flow {flow_id} for host '
-               f'{client.data.os_info.fqdn.lower()}')
-        self.ModuleError(msg, critical=True)
-
-      if status.state == flows_pb2.FlowContext.ERROR:
-        # TODO(jbn): If one artifact fails, what happens? Test.
-        message = status.context.backtrace
-        if 'ArtifactNotRegisteredError' in status.context.backtrace:
-          message = status.context.backtrace.split('\n')[-2]
-        self.ModuleError(
-            f'{flow_id:s}: FAILED! Message from GRR:\n{message:s}',
-            critical=True)
-
-      if status.state == 4: # Flow crashed, no enum in flows_pb2
-        self.ModuleError(f'{flow_id:s}: Crashed', critical=False)
-        break
-
-      if status.state == flows_pb2.FlowContext.TERMINATED:
-        self.logger.info(f'{flow_id:s}: Complete')
-        break
-
-      time.sleep(self._CHECK_FLOW_INTERVAL_SEC)
-      if not self.skip_offline_clients:
-        continue
-
-      client_last_seen = datetime.datetime.fromtimestamp(
-          client.data.last_seen_at / 1000000, datetime.timezone.utc)
-      now = datetime.datetime.now(datetime.timezone.utc)
-      if (now - client_last_seen).total_seconds() > self._MAX_OFFLINE_TIME_SEC:
-        self.logger.warning(
-            'Client {0:s} has been offline for more than {1:.1f} minutes'
-            ', skipping...'.format(
-                client.client_id, self._MAX_OFFLINE_TIME_SEC / 60))
-        self._skipped_flows.append((client.client_id, flow_id))
-        break
-
   def _CheckSkippedFlows(self) -> None:
     if not self._skipped_flows:
       return
@@ -317,51 +261,49 @@ def _CheckSkippedFlows(self) -> None:
           client_id, flow_id, self.reason
       ))
 
-  def _DownloadArchive(
-      self, grr_flow: Client.Flow, flow_output_dir: str
-  ) -> None:
-    """Request an archive of files from GRR, download and extract it.
-
-    This does not work on larger files, use _DownloadBlobs instead.
-
-    Args:
-      grr_flow: GRR Flow object to download files from.
-      flow_output_dir: Directory to extract the downloaded files.
-    """
-    output_file_path = os.path.join(self.output_path, f"{grr_flow.flow_id}.zip")
-    file_archive = grr_flow.GetFilesArchive()
-    file_archive.WriteToFile(output_file_path)
-    with zipfile.ZipFile(output_file_path) as archive:
-      archive.extractall(path=flow_output_dir)
-    os.remove(output_file_path)
-
   def _DownloadBlobs(
       self,
       client: Client,
-      pathspecs: List["jobs_pb2.PathSpec"],
+      payloads: List["jobs_pb2.PathSpec"],
       flow_output_dir: str,
   ) -> None:
-    """Download an individual collected file from GRR to the local filesystem.
+    """Download individual collected files from GRR to the local filesystem.
 
     Args:
      client: GRR Client object to download blobs from.
-      pathspecs: List of pathspecs to download blobs from.
+      payloads: List of pathspecs to download blobs from.
      flow_output_dir: Directory to store the downloaded files.
    """
-    for pathspec in pathspecs:
-      if pathspec.nested_path.pathtype == pathspec.nested_path.NTFS:
-        vfspath = f"fs/ntfs{pathspec.path}{pathspec.nested_path.path}"
+    for payload in payloads:
+      if hasattr(payload, 'stat'):
+        stats = payload.stat
+      elif hasattr(payload, 'stat_entry'):
+        stats = payload.stat_entry
+      else:
+        raise RuntimeError('Unsupported file collection attempted')
+      if stat.S_ISDIR(stats.st_mode):
+        continue
+      if (stats.pathspec.nested_path.pathtype ==
+          jobs_pb2.PathSpec.NTFS):
+        vfspath = (
+            f"fs/ntfs{stats.pathspec.path}{stats.pathspec.nested_path.path}")
       else:
-        vfspath = re.sub("^([a-zA-Z]:)?/(.*)$", "fs/os/\\1/\\2", pathspec.path)
+        vfspath = re.sub("^([a-zA-Z]:)?/(.*)$", "fs/os/\\1/\\2",
+                         stats.pathspec.path)
       filename = os.path.basename(vfspath)
       base_dir = os.path.join(flow_output_dir, os.path.dirname(vfspath))
       os.makedirs(base_dir, exist_ok=True)
 
       f = client.File(vfspath)
       self.logger.debug(f"Downloading blob {filename} from {vfspath}")
       try:
-        with open(os.path.join(base_dir, filename), "wb") as out:
-          f.GetBlobWithOffset(0).WriteToStream(out)
+        path = os.path.join(base_dir, filename)
+        if stats.st_size:
+          with open(path, "wb") as out:
+            self.logger.debug(f'File: {filename}')
+            f.GetBlob().WriteToStream(out)
+        else:
+          pathlib.Path(path).touch()
       except grr_errors.ResourceNotFoundError as e:
         self.logger.warning(
            f"Failed to download blob {filename} from {vfspath}: {e}"
@@ -405,51 +347,22 @@ def _DownloadFiles(self, client: Client, flow_id: str) -> Optional[str]:
     Returns:
       str: path containing the downloaded files.
     """
-    grr_flow = client.Flow(flow_id)
+    flow_handle = client.Flow(flow_id).Get()
 
     fqdn = client.data.os_info.fqdn.lower()
     flow_output_dir = os.path.join(self.output_path, fqdn, flow_id)
     os.makedirs(flow_output_dir, exist_ok=True)
 
-    flow_name = grr_flow.Get().data.name
+    flow_name = flow_handle.data.name
     if flow_name == "TimelineFlow":
       self.logger.debug("Downloading timeline from GRR")
-      self._DownloadTimeline(client, grr_flow, flow_output_dir)
+      self._DownloadTimeline(client, flow_handle, flow_output_dir)
       return flow_output_dir
 
-    results = grr_flow.ListResults()
-    pathspecs = []
-    large_files = []
-    collect_browser_flow = False
-    for result in results:
-      stat_entry = result.payload
-      if flow_name == "CollectBrowserHistory":
-        stat_entry = result.payload.stat_entry
-        collect_browser_flow = True
-      if stat_entry.st_size > self._LARGE_FILE_SIZE_THRESHOLD:
-        size_gb = stat_entry.st_size / 1024 / 1024 / 1024
-        self.logger.warning(
-            "Large file detected:"
-            f" {stat_entry.pathspec.path} ({size_gb:.2f} GB)"
-        )
-        large_files.append(size_gb)
-      pathspecs.append(stat_entry.pathspec)
-
-    if large_files:
-      self.logger.warning(
-          f'Large files detected ({", ".join(large_files)} GB), downloading'
-          ' blobs instead of archive.'
-      )
-      self._DownloadBlobs(client, pathspecs, flow_output_dir)
-    elif collect_browser_flow:
-      self.logger.debug(
-          "CollectBrowserHistory flow detected, downloading blobs instead of"
-          " archive..."
-      )
-      self._DownloadBlobs(client, pathspecs, flow_output_dir)
-    else:
-      self.logger.debug("Downloading file archive from GRR")
-      self._DownloadArchive(grr_flow, flow_output_dir)
+    payloads = []
+    for r in flow_handle.ListResults():
+      payloads.append(r.payload)
+    self._DownloadBlobs(client, payloads, flow_output_dir)
 
     return flow_output_dir

@@ -617,10 +530,10 @@ def Process(self, container: containers.Host
       self.logger.info(
           f'Launched flow {flow_id} on {client.client_id} ({grr_hostname})')
 
-      self._AwaitFlow(client, flow_id)
+      grr_flow = client.Flow(flow_id)
+      grr_flow.WaitUntilDone()
 
       # Get latest flow data from GRR server.
-      grr_flow = client.Flow(flow_id).Get()
+      grr_flow = grr_flow.Get()
       results = list(grr_flow.ListResults())
       yara_hits_df = self._YaraHitsToDataFrame(client, results)

@@ -870,7 +783,8 @@ def Process(self, container: containers.Host
       msg = f'Flow could not be launched on {client.client_id:s}.'
       msg += f'\nArtifactCollectorFlow args: {flow_args!s}'
       self.ModuleError(msg, critical=True)
-    self._AwaitFlow(client, flow_id)
+    client.Flow(flow_id).WaitUntilDone()
+
     collected_flow_data = self._DownloadFiles(client, flow_id)
 
     if collected_flow_data:
@@ -1001,7 +915,7 @@ def Process(self, container: containers.Host
         pathtype=path_type,
         action=flow_action)
     flow_id = self._LaunchFlow(client, 'FileFinder', flow_args)
-    self._AwaitFlow(client, flow_id)
+    client.Flow(flow_id).WaitUntilDone()
     collected_flow_data = self._DownloadFiles(client, flow_id)
     if collected_flow_data:
       self.PublishMessage(f'{flow_id}: Downloaded: {collected_flow_data}')
@@ -1165,7 +1079,7 @@ def _ProcessQuery(
 
     try:
       flow_id = self._LaunchFlow(client, 'OsqueryFlow', flow_args)
-      self._AwaitFlow(client, flow_id)
+      client.Flow(flow_id).WaitUntilDone()
    except DFTimewolfError as error:
      self.ModuleError(
          f'Error raised while launching/awaiting flow: {error.message}')
@@ -1356,7 +1270,7 @@ def Process(self, container: containers.GrrFlow
     # We don't need clients to be online to grab the flows.
     client = self._GetClientBySelector(
         container.hostname, discard_inactive=False)
-    self._AwaitFlow(client, container.flow_id)
+    client.Flow(container.flow_id).WaitUntilDone()
     self._CheckSkippedFlows()
     collected_flow_data = self._DownloadFiles(client, container.flow_id)
     if collected_flow_data:
@@ -1471,10 +1385,9 @@ def Process(self, container: containers.Host
 
     timeline_args = timeline_pb2.TimelineArgs(root=root_path,)
     flow_id = self._LaunchFlow(client, 'TimelineFlow', timeline_args)
-    self._AwaitFlow(client, flow_id)
-    temp_directory = tempfile.mkdtemp()
+    client.Flow(flow_id).WaitUntilDone()
     collected_timeline = self._DownloadTimeline(
-        client, client.Flow(flow_id), temp_directory
+        client, client.Flow(flow_id), self.output_path
     )
     self.PublishMessage(f"{flow_id}: Downloaded: {collected_timeline}")
     cont = containers.File(
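For readers skimming the diff: the per-file path that now replaces the archive download boils down to the sketch below. It mirrors the new _DownloadBlobs logic above under stated assumptions: download_payloads is an illustrative standalone name, client is a grr_api_client Client, and payloads come from a completed flow's results.

import os
import pathlib
import re
import stat


def download_payloads(client, payloads, output_dir):
  """Sketch of the per-file blob download used by _DownloadBlobs above."""
  for payload in payloads:
    # Flow result payloads expose the stat entry under different field
    # names; mirror the hasattr() dispatch from the commit.
    if hasattr(payload, 'stat'):
      stats = payload.stat
    elif hasattr(payload, 'stat_entry'):
      stats = payload.stat_entry
    else:
      raise RuntimeError('Unsupported file collection attempted')
    if stat.S_ISDIR(stats.st_mode):  # directories carry no blob
      continue
    # Map the client-side pathspec onto GRR's VFS namespace.
    ps = stats.pathspec
    if ps.nested_path.pathtype == ps.nested_path.NTFS:
      vfspath = f'fs/ntfs{ps.path}{ps.nested_path.path}'
    else:
      # 'C:/Windows/foo' -> 'fs/os/C:/Windows/foo'; '/etc/hosts' -> 'fs/os//etc/hosts'
      vfspath = re.sub('^([a-zA-Z]:)?/(.*)$', 'fs/os/\\1/\\2', ps.path)
    base_dir = os.path.join(output_dir, os.path.dirname(vfspath))
    os.makedirs(base_dir, exist_ok=True)
    local_path = os.path.join(base_dir, os.path.basename(vfspath))
    if stats.st_size:
      with open(local_path, 'wb') as out:
        client.File(vfspath).GetBlob().WriteToStream(out)
    else:
      # Zero-byte files have no blob to fetch; just create them locally.
      pathlib.Path(local_path).touch()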
15 changes: 7 additions & 8 deletions poetry.lock

Some generated files are not rendered by default.

