Skip to content

Commit

Permalink
Made taggers return db keys
Browse files Browse the repository at this point in the history
  • Loading branch information
ooprathamm committed Mar 20, 2024
1 parent 4daed60 commit 2caf018
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 40 deletions.
16 changes: 10 additions & 6 deletions floss/qs/db/expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,29 @@ class ExpertStringDatabase:
def __len__(self) -> int:
    """Return the total number of rules held across all three rule tables."""
    rule_tables = (self.string_rules, self.substring_rules, self.regex_rules)
    return sum(len(table) for table in rule_tables)

def query(self, s: str) -> Tuple[Set[str], List["ExpertRule"]]:
    """
    Match a string against every expert rule.

    Args:
        s: the string to look up.

    Returns:
        A pair ``(tags, rules)``: the set of tags of all matching rules, and
        the matching rules themselves in match order (exact-string rule
        first, then substring rules, then regex rules).
    """
    matched: List["ExpertRule"] = []

    # exact match: a single keyed lookup
    if s in self.string_rules:
        matched.append(self.string_rules[s])

    # note that this is O(m * n)
    # #strings * #rules
    matched.extend(rule for rule in self.substring_rules if rule.value in s)

    # note that this is O(m * n)
    # #strings * #rules
    matched.extend(rule for rule, regex in self.regex_rules if regex.search(s))

    # tags are derived from the matched rules so the two results cannot drift apart
    return {rule.tag for rule in matched}, matched

@classmethod
def from_file(cls, path: pathlib.Path) -> "ExpertStringDatabase":
Expand Down
10 changes: 7 additions & 3 deletions floss/qs/db/gp.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ def __contains__(self, other: bytes | str) -> bool:
if isinstance(other, bytes):
return other in self.string_hashes
elif isinstance(other, str):
m = hashlib.md5()
m.update(other.encode("utf-8"))
return m.digest()[:8] in self.string_hashes
return self.get_hash(other) in self.string_hashes
else:
raise ValueError("other must be bytes or str")

@staticmethod
def get_hash(string: str) -> bytes:
    """
    Compute the database key for a string.

    Args:
        string: the text to hash; it is encoded as UTF-8 first.

    Returns:
        The first 8 bytes of the MD5 digest of the encoded string.
    """
    # MD5 serves only as a compact lookup key here, so declare it
    # non-security use; otherwise FIPS-restricted builds refuse to construct it.
    digest = hashlib.md5(string.encode("utf-8"), usedforsecurity=False).digest()
    return digest[:8]

@classmethod
def from_file(cls, path: pathlib.Path) -> "StringHashDatabase":
string_hashes: Set[bytes] = set()
Expand Down
73 changes: 42 additions & 31 deletions floss/qs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dataclasses import field, dataclass

import pefile
import msgspec
import colorama
import lancelot
import rich.traceback
Expand All @@ -29,9 +30,9 @@
import floss.qs.db.oss
import floss.qs.db.expert
import floss.qs.db.winapi
from floss.qs.db.gp import StringHashDatabase, StringGlobalPrevalenceDatabase
from floss.qs.db.oss import OpenSourceStringDatabase
from floss.qs.db.expert import ExpertStringDatabase
from floss.qs.db.gp import StringGlobalPrevalence, StringHashDatabase, StringGlobalPrevalenceDatabase
from floss.qs.db.oss import OpenSourceString, OpenSourceStringDatabase
from floss.qs.db.expert import ExpertRule, ExpertStringDatabase
from floss.qs.db.winapi import WindowsApiStringDatabase

logger = logging.getLogger("quantumstrand")
Expand Down Expand Up @@ -112,12 +113,14 @@ class ExtractedString:

# A tag is a plain string label attached to an extracted string (e.g. "#common").
Tag = str

# Database record types that may be stored as metadata on a tagged string.
MetadataType = Union[StringGlobalPrevalence, OpenSourceString, ExpertRule]

@dataclass
class TaggedString:
string: ExtractedString
tags: Set[Tag]
structure: str = ""
meta: Sequence[MetadataType] = field(default_factory=list)

@property
def offset(self) -> int:
Expand Down Expand Up @@ -198,7 +201,8 @@ def from_layout(cls, layout: 'Layout') -> 'ResultDocument':
result = cls(layout.slice, layout.name,
[TaggedString(string.string,
string.tags,
string.structure)
string.structure,
string.meta)
for string in layout.strings],
)
if isinstance(layout, PELayout):
Expand Down Expand Up @@ -484,71 +488,72 @@ def get_reloc_offsets(slice: Slice, pe: pefile.PE) -> Set[int]:

def check_is_xor(xor_key: Union[int, None]):
    """
    Tag a string that carries an integer XOR key.

    Args:
        xor_key: the XOR key recorded for the string, or None.

    Returns:
        A pair ``(tags, metadata)``: ``("#decoded",)`` when `xor_key` is an
        int, otherwise an empty tuple. The metadata list is always empty
        because no database is consulted.
    """
    tags = ("#decoded",) if isinstance(xor_key, int) else ()
    return tags, []


def check_is_reloc(reloc_offsets: Set[int], string: "ExtractedString"):
    """
    Tag a string whose address range overlaps a known relocation offset.

    Args:
        reloc_offsets: offsets known to hold relocations.
        string: the extracted string; its `slice.range` is iterated for addresses.

    Returns:
        A pair ``(tags, metadata)``: ``("#reloc",)`` when any address of the
        string is in `reloc_offsets`, otherwise an empty tuple. The metadata
        list is always empty because no database is consulted.
    """
    # any() stops at the first overlapping address, like the early return it replaces
    if any(addr in reloc_offsets for addr in string.slice.range):
        return ("#reloc",), []

    return (), []


def check_is_code(code_offsets: Set[int], string: "ExtractedString"):
    """
    Tag a string whose address range overlaps a known code offset.

    Args:
        code_offsets: offsets known to hold code.
        string: the extracted string; its `slice.range` is iterated for addresses.

    Returns:
        A pair ``(tags, metadata)``: ``("#code",)`` when any address of the
        string is in `code_offsets`, otherwise an empty tuple. The metadata
        list is always empty because no database is consulted.
    """
    # any() stops at the first overlapping address, like the early return it replaces
    if any(addr in code_offsets for addr in string.slice.range):
        return ("#code",), []

    return (), []


def query_code_string_database(db: "StringGlobalPrevalenceDatabase", string: str):
    """
    Tag a string that is present in a code-string prevalence database.

    Args:
        db: the database to query.
        string: the text to look up.

    Returns:
        A pair ``(tags, metadata)``: ``("#code-junk",)`` plus whatever
        `db.query` returned when that result is truthy, otherwise ``((), [])``.
    """
    # query once and reuse the result for both the test and the returned metadata
    hits = db.query(string)
    if hits:
        return ("#code-junk",), hits

    return (), []


def query_global_prevalence_database(db: "StringGlobalPrevalenceDatabase", string: str):
    """
    Tag a string that is present in a global prevalence database.

    Args:
        db: the database to query.
        string: the text to look up.

    Returns:
        A pair ``(tags, metadata)``: ``("#common",)`` plus whatever
        `db.query` returned when that result is truthy, otherwise ``((), [])``.
    """
    # query once and reuse the result for both the test and the returned metadata
    hits = db.query(string)
    if hits:
        return ("#common",), hits

    return (), []


def query_global_prevalence_hash_database(db: "StringHashDatabase", string: str):
    """
    Tag a string whose hash is present in a global prevalence hash database.

    Args:
        db: the hash database; it must support `string in db` and `get_hash`.
        string: the text to look up.

    Returns:
        A pair ``(tags, metadata)``: ``("#common",)`` plus a one-element list
        holding ``{"hash": <hex digest>}`` on a hit, otherwise ``((), [])``.
    """
    if string in db:
        # hex() gives a stable textual key; str() on bytes would emit the
        # Python repr ("b'\\x..'") into the serialized output.
        return ("#common",), [{"hash": db.get_hash(string).hex()}]

    return (), []


def query_library_string_database(db: "OpenSourceStringDatabase", string: str):
    """
    Tag a string that is known to come from an open source library.

    Args:
        db: the database; its `metadata_by_string` mapping is consulted.
        string: the text to look up.

    Returns:
        A pair ``(tags, metadata)``: a one-element tuple with the tag
        ``#<library_name>`` plus the database record itself on a hit,
        otherwise ``((), [])``.
    """
    record = db.metadata_by_string.get(string)
    if not record:
        return (), []

    # NOTE(review): on a hit this returns the bare record, while the miss path
    # and the other taggers return a list; confirm what the consumer expects.
    return (f"#{record.library_name}",), record


def query_expert_string_database(db: "ExpertStringDatabase", string: str):
    """
    Tag a string using the expert rule database.

    Args:
        db: the expert database to query.
        string: the text to look up.

    Returns:
        The ``(tags, rules)`` pair produced by `db.query`, unchanged.
    """
    # unpack so that a result that is not a two-element pair fails here
    tags, rules = db.query(string)
    return tags, rules


def query_winapi_name_database(db: "WindowsApiStringDatabase", string: str):
    """
    Tag a string that names a Windows DLL or API.

    Args:
        db: the database; its `dll_names` and `api_names` are consulted.
        string: the text to look up.

    Returns:
        A pair ``(tags, metadata)``: ``("#winapi",)`` when the lowercased
        string is in `db.dll_names` or the string as given is in
        `db.api_names`, otherwise an empty tuple. The metadata list is
        always empty.
    """
    # DLL names are compared lowercased; API names are compared as given
    if string.lower() in db.dll_names or string in db.api_names:
        return ("#winapi",), []

    return (), []


# A tagger inspects one extracted string and returns a `(tags, metadata)` pair.
# NOTE(review): the query_*/check_* functions in this file return a list as the
# second element in most paths, not a single record; confirm whether this
# alias should read Sequence[MetadataType].
Tagger = Callable[[ExtractedString], Tuple[Sequence[Tag], MetadataType]]


def load_databases() -> Sequence[Tagger]:
Expand All @@ -557,7 +562,7 @@ def load_databases() -> Sequence[Tagger]:
def query_database(db, queryfn, string: "ExtractedString"):
    """Run `queryfn` against `db` using the raw text of an extracted string."""
    return queryfn(db, string.string)

def make_tagger(db, queryfn) -> "Tagger":
    """
    Bind a database and its query function into a tagger.

    Returns:
        A callable that takes only the extracted string and forwards to
        `query_database` with `db` and `queryfn` already supplied.
    """
    return functools.partial(query_database, db, queryfn)

for db in floss.qs.db.winapi.get_default_databases():
Expand Down Expand Up @@ -728,16 +733,20 @@ def tag_strings(self, taggers: Sequence[Tagger]):
# this routine will transform them into TaggedStrings.
assert isinstance(string, ExtractedString)
tags: Set[Tag] = set()
metas: List[MetadataType] = list()

string_counts[string.string] += 1

if string_counts[string.string] > 1:
tags.add("#duplicate")

for tagger in taggers:
tags.update(tagger(string))
tag, meta = tagger(string)
tags.update(tag)
if meta != [] and meta not in metas:
metas.append(meta)

tagged_strings.append(TaggedString(string, tags))
tagged_strings.append(TaggedString(string, tags, meta=metas))
self.strings = tagged_strings

for child in self.children:
Expand Down Expand Up @@ -1144,11 +1153,13 @@ class QSJSONEncoder(json.JSONEncoder):
"""

def default(self, o):
if isinstance(o, set):
return list(o)
if isinstance(o, MetadataType):
return json.loads(msgspec.json.encode(o))
if dataclasses.is_dataclass(o):
if isinstance(o, Slice):
return o.range
elif isinstance(o, set):
return list(o)
else:
return dataclasses.asdict(o)

Expand Down

0 comments on commit 2caf018

Please sign in to comment.