Skip to content

Commit

Permalink
Merge pull request #397 from ryanohoro/rem-fixture
Browse files Browse the repository at this point in the history
Add get_remote_fixture and get_remote_fixture_archive for tests
  • Loading branch information
phutelmyer authored Oct 20, 2023
2 parents 0ca89f8 + 5809d73 commit 534c8bb
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 2 deletions.
2 changes: 1 addition & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ The table below describes each scanner and its options. Each scanner has the hid
| ScanZlib | Decompresses gzip files | N/A
## Tests
As Strelka consists of many scanners and dependencies for those scanners. Pytests are particularly valuable for testing the ongoing functionality of Strelka and it's scanners. Tests allow users to write test cases that verify the correct behavior of Strelka scanners to ensure that the scanners remain reliable and accurate. Additionally, using pytests can help streamline the development process, allowing developers to focus on writing new features and improvements for the scanners. Strelka contains a set of standard test fixture files that represent the types of files Strelka ingests.
As Strelka consists of many scanners and dependencies for those scanners. Pytests are particularly valuable for testing the ongoing functionality of Strelka and it's scanners. Tests allow users to write test cases that verify the correct behavior of Strelka scanners to ensure that the scanners remain reliable and accurate. Additionally, using pytests can help streamline the development process, allowing developers to focus on writing new features and improvements for the scanners. Strelka contains a set of standard test fixture files that represent the types of files Strelka ingests. Test fixtures can also be loaded remotely with the helper functions `get_remote_fixture` and `get_remote_fixture_archive` for scanner tests that need malicious samples.

The best way to run Strelka's test suite is to build the docker containers. Some of Strelka's scanners have OS level dependencies which make them unsuitable for individual testing.

Expand Down
1 change: 1 addition & 0 deletions src/python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pillow-avif-plugin==1.3.1
pillow-heif==0.12.0
pgpdump3==1.5.2
py-tlsh==4.7.2
py7zr==0.20.2
pycdlib==1.14.0
pygments==2.15.0
pylzma==0.5.0
Expand Down
103 changes: 102 additions & 1 deletion src/python/strelka/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
import datetime
import gzip
import io
import os
import tarfile
import typing
from pathlib import Path
from zipfile import ZipFile

import magic
import py7zr
import requests

from strelka.strelka import File


def run_test_scan(
mocker, scan_class, fixture_path=None, options=None, backend_cfg=None
mocker,
scan_class,
fixture_fileobj: typing.IO = None,
fixture_path: str = None,
options=None,
backend_cfg=None,
):
if options is None:
options = {}
Expand All @@ -20,6 +35,8 @@ def run_test_scan(

if fixture_path:
data = Path(fixture_path).read_bytes()
elif fixture_fileobj:
data = fixture_fileobj.read()
else:
data = None

Expand All @@ -31,3 +48,87 @@ def run_test_scan(
)

return scanner.event


def get_remote_fixture(url: str, session: requests.Session = None) -> io.BytesIO:
"""Download a fixture from a URL"""

# Get a streamed version of the downloaded file
if session:
response = session.get(url, stream=True)
else:
response = requests.get(url, stream=True)

response.raw.decode_content = True

# Convert the raw file-like object to a real BytesIO object
bytesfile = io.BytesIO()
bytesfile.write(response.raw.read())
bytesfile.seek(0)

return bytesfile


def get_remote_fixture_archive(
url: str, session: requests.Session = None, password: str = None
) -> [dict[str, io.BytesIO]]:
"""Decompress zip, 7zip, gzip, tar+gzip remote fixtures with an optional password"""
bytesfile: io.BytesIO = get_remote_fixture(url, session)

mime: magic.Magic = magic.Magic(mime=True)
mime_type: str = mime.from_buffer(bytesfile.read())
bytesfile.seek(0)

allfiles: dict[str, typing.IO] = {}

if mime_type == "application/zip":
try:
with ZipFile(bytesfile) as archive:
for fileentry in archive.filelist:
if not fileentry.is_dir():
allfiles.update(
{
fileentry.filename: io.BytesIO(
archive.read(
fileentry.filename,
pwd=password.encode("utf-8")
if password
else None,
)
)
}
)
except Exception as e:
raise e

elif mime_type == "application/x-7z-compressed":
try:
with py7zr.SevenZipFile(bytesfile, password=password) as archive:
allfiles = archive.readall()
except Exception as e:
raise e

elif mime_type == "application/gzip":
try:
with gzip.open(bytesfile) as archive:
allfiles.update(
{os.path.basename(url).rstrip(".gz"): io.BytesIO(archive.read())}
)

except Exception as e:
raise e

elif mime_type == "application/x-tar":
try:
with tarfile.open(fileobj=bytesfile) as archive:
for member in archive.getmembers():
if member.isfile():
allfiles.update({member.name: archive.extractfile(member)})

except Exception as e:
raise e

else:
raise ValueError(f"Archive type {mime_type} not supported")

return allfiles

0 comments on commit 534c8bb

Please sign in to comment.