Skip to content

Commit

Permalink
Refactor PdfTreeVerifier and IndeterminateNode classes
Browse files Browse the repository at this point in the history
  • Loading branch information
ashariyar committed Oct 16, 2022
1 parent eea132d commit 108a75c
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 106 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# NEXT RELEASE

### 1.12.2
* Refactor `PdfTreeVerifier` and `IndeterminateNode` out of `Pdfalyzer` class

### 1.12.1
* Check for any explicit `/Kids` relationships when placing indeterminate nodes
* All other things being equal prefer a single `/Page` or `/Pages` referrer as the parent
Expand Down
96 changes: 96 additions & 0 deletions pdfalyzer/decorators/pdf_tree_verifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Verify that the PDF tree is complete/contains all the nodes in the PDF file.
"""
from PyPDF2.errors import PdfReadError
from PyPDF2.generic import IndirectObject, NameObject, NumberObject
from rich.markup import escape
from yaralyzer.output.rich_console import console
from yaralyzer.util.logging import log

from pdfalyzer.util.adobe_strings import *


class PdfTreeVerifier:
def __init__(self, pdfalyzer: 'Pdfalyzer') -> None:
self.pdfalyzer = pdfalyzer

def verify_all_nodes_encountered_are_in_tree(self) -> None:
"""Make sure every node we can see is reachable from the root of the tree"""
missing_nodes = [
node for idnum, node in self.pdfalyzer.nodes_encountered.items()
if self.pdfalyzer.find_node_by_idnum(idnum) is None
]

if len(missing_nodes) > 0:
msg = f"Nodes were traversed but never placed: {escape(str(missing_nodes))}\n" + \
"For link nodes like /First, /Next, /Prev, and /Last this might be no big deal - depends " + \
"on the PDF. But for other node typtes this could indicate missing data in the tree."
console.print(msg)
log.warning(msg)

def verify_unencountered_are_untraversable(self) -> None:
"""Make sure any PDF object IDs we can't find in tree are /ObjStm or /Xref nodes"""
if self.pdfalyzer.pdf_size is None:
log.warning(f"{SIZE} not found in PDF trailer; cannot verify all nodes are in tree")
return
if self.pdfalyzer.max_generation > 0:
log.warning(f"Methodd doesn't check revisions but this doc is generation {self.pdfalyzer.max_generation}")

# We expect to see all ordinals up to the number of nodes /Trailer claims exist as obj. IDs.
missing_node_ids = [i for i in range(1, self.pdfalyzer.pdf_size) if self.pdfalyzer.find_node_by_idnum(i) is None]

for idnum in missing_node_ids:
ref = IndirectObject(idnum, self.pdfalyzer.max_generation, self.pdfalyzer.pdf_reader)

try:
obj = ref.get_object()
except PdfReadError as e:
if 'Invalid Elementary Object' in str(e):
log.warning(f"Couldn't verify elementary obj with id {idnum} is properly in tree")
continue
log.error(str(e))
console.print_exception()
obj = None
raise e

if obj is None:
log.error(f"Cannot find ref {ref} in PDF!")
continue
elif isinstance(obj, (NumberObject, NameObject)):
log.info(f"Obj {idnum} is a {type(obj)} w/value {obj}; if relationshipd by /Length etc. this is a nonissue but maybe worth doublechecking")
continue
elif not isinstance(obj, dict):
log.error(f"Obj {idnum} ({obj}) of type {type(obj)} isn't dict, cannot determine if it should be in tree")
continue
elif TYPE not in obj:
msg = f"Obj {idnum} has no {TYPE} and is not in tree. Either a loose node w/no data or an error in pdfalyzer."
msg += f"\nHere's the contents for you to assess:\n{obj}"
log.warning(msg)
continue

obj_type = obj[TYPE]

if obj_type == OBJECT_STREAM:
log.debug(f"Object with id {idnum} not found in tree because it's an {OBJECT_STREAM}")
elif obj[TYPE] == XREF:
placeable = XREF_STREAM in self.pdfalyzer.pdf_reader.trailer

for k, v in self.pdfalyzer.pdf_reader.trailer.items():
xref_val_for_key = obj.get(k)

if k in [XREF_STREAM, PREV]:
continue
elif k == SIZE:
if xref_val_for_key is None or v != (xref_val_for_key + 1):
log.info(f"{XREF} has {SIZE} of {xref_val_for_key}, trailer has {SIZE} of {v}")
placeable = False

continue
elif k not in obj or v != obj.get(k):
log.info(f"Trailer has {k} -> {v} but {XREF} obj has {obj.get(k)} at that key")
placeable = False

if placeable:
self.pdfalyzer.pdf_tree.add_child(self.pdfalyzer._build_or_find_node(ref, XREF_STREAM))
else:
log.warning(f"{XREF} Obj {idnum} not found in tree!")
2 changes: 0 additions & 2 deletions pdfalyzer/output/pdfalyzer_presenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,12 @@ def print_tree(self) -> None:
else:
console.print(Text(pre) + node.__rich__())

self.pdfalyzer._verify_all_nodes_encountered_are_in_tree()
console.print("\n\n")

def print_rich_table_tree(self) -> None:
"""Print the rich view of the PDF tree."""
print_section_header(f'Rich tree view of {self.pdfalyzer.pdf_basename}')
console.print(generate_rich_tree(self.pdfalyzer.pdf_tree))
self.pdfalyzer._verify_all_nodes_encountered_are_in_tree()

def print_summary(self) -> None:
print_section_header(f'PDF Node Summary for {self.pdfalyzer.pdf_basename}')
Expand Down
129 changes: 26 additions & 103 deletions pdfalyzer/pdfalyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,17 @@
from anytree import LevelOrderIter, SymlinkNode
from anytree.search import findall, findall_by_attr
from PyPDF2 import PdfReader
from PyPDF2.errors import PdfReadError
from PyPDF2.generic import IndirectObject, NameObject, NumberObject
from rich.markup import escape
from PyPDF2.generic import IndirectObject
from yaralyzer.helpers.bytes_helper import get_bytes_info
from yaralyzer.helpers.file_helper import load_binary_data
from yaralyzer.output.rich_console import console
from yaralyzer.util.logging import log, set_log_level
from yaralyzer.util.logging import log

from pdfalyzer.decorators.document_model_printer import print_with_header
from pdfalyzer.decorators.pdf_tree_verifier import PdfTreeVerifier
from pdfalyzer.decorators.indeterminate_node import IndeterminateNode
from pdfalyzer.decorators.pdf_tree_node import PdfTreeNode
from pdfalyzer.font_info import FontInfo
from pdfalyzer.helpers.string_helper import all_strings_are_same_ignoring_numbers, has_a_common_substring
from pdfalyzer.pdf_object_relationship import PdfObjectRelationship
from pdfalyzer.util.adobe_strings import *
from pdfalyzer.util.exceptions import PdfWalkError
Expand All @@ -48,19 +46,22 @@ def __init__(self, pdf_path: str):
self.max_generation = 0 # PDF revisions are "generations"; this is the max generation encountered

# Bootstrap the root of the tree with the trailer. PDFs are always read trailer first.
# Extract trailer. Technically the trailer has no PDF obj. ID but we set it to the /Size of the PDF
# Technically the trailer has no PDF Object ID but we set it to the /Size of the PDF.
trailer = self.pdf_reader.trailer
self.pdf_size = trailer.get(SIZE)
trailer_id = self.pdf_size if self.pdf_size is not None else TRAILER_FALLBACK_ID
self.pdf_tree = PdfTreeNode(trailer, TRAILER, trailer_id)
self.nodes_encountered[self.pdf_tree.idnum] = self.pdf_tree
self.walk_node(self.pdf_tree) # Build tree by recursively following relationships between nodes

# Build tree by recursively following relationships between nodes
self.walk_node(self.pdf_tree)

# After scanning all objects we place nodes whose position was uncertain, extract fonts, and verify
self._resolve_indeterminate_nodes()
self._extract_font_infos()
self._verify_all_nodes_encountered_are_in_tree()
self._verify_unencountered_are_untraversable()
self.verifier = PdfTreeVerifier(self)
self.verifier.verify_all_nodes_encountered_are_in_tree()
self.verifier.verify_unencountered_are_untraversable()

# Create SymlinkNodes for relationships between PDF objects that are not parent/child relationships.
# (Do this last because it has the side effect of making a lot more nodes)
Expand Down Expand Up @@ -94,11 +95,6 @@ def find_node_by_idnum(self, idnum) -> Optional[PdfTreeNode]:
else:
raise PdfWalkError(f"Too many nodes had id {idnum}: {nodes}")

def stream_nodes(self) -> List[PdfTreeNode]:
"""List of actual nodes (not SymlinkNodes) containing streams sorted by PDF object ID"""
stream_filter = lambda node: node.contains_stream() and not isinstance(node, SymlinkNode)
return sorted(findall(self.pdf_tree, stream_filter), key=lambda r: r.idnum)

def is_in_tree(self, search_for_node: PdfTreeNode) -> bool:
"""Returns true if search_for_node is in the tree already."""
for node in self.node_iterator():
Expand All @@ -111,6 +107,11 @@ def node_iterator(self) -> Iterator[PdfTreeNode]:
"""Iterate over nodes, grouping them by distance from the root."""
return LevelOrderIter(self.pdf_tree)

def stream_nodes(self) -> List[PdfTreeNode]:
"""List of actual nodes (not SymlinkNodes) containing streams sorted by PDF object ID"""
stream_filter = lambda node: node.contains_stream() and not isinstance(node, SymlinkNode)
return sorted(findall(self.pdf_tree, stream_filter), key=lambda r: r.idnum)

def _process_relationship(self, relationship: PdfObjectRelationship) -> Optional[PdfTreeNode]:
"""
Place the relationship 'node' in the tree. Returns an optional node that should be
Expand Down Expand Up @@ -147,32 +148,35 @@ def _process_relationship(self, relationship: PdfObjectRelationship) -> Optional
if relationship.to_obj.idnum in self.indeterminate_ids:
log.info(f" Found {relationship} => {to_node} was marked indeterminate but now placed")
self.indeterminate_ids.remove(relationship.to_obj.idnum)

# If the relationship is indeterminate or we've seen the PDF object before, add it as
# a non-tree relationship for now. An attempt to place the node will be made at the end.
elif relationship.is_indeterminate or relationship.is_link or was_seen_before:
# If the relationship is indeterminate or we've seen the PDF object before, add it as
# a non-tree relationship for now. An attempt to place the node will be made at the end.
to_node.add_non_tree_relationship(relationship)

# If we already encountered 'to_node' then skip adding it to the queue of nodes to walk
if was_seen_before:
if relationship.to_obj.idnum not in self.indeterminate_ids and to_node.parent is None:
raise PdfWalkError(f"{relationship} - ref has no parent and is not indeterminate")
else:
log.debug(f" Already saw {relationship}; not scanning next")
return None

# Indeterminate relationships need to wait until everything has been scanned to be placed
if relationship.is_indeterminate or (relationship.is_link and not self.is_in_tree(to_node)):
elif relationship.is_indeterminate or (relationship.is_link and not self.is_in_tree(to_node)):
log.info(f' Indeterminate ref {relationship}')
self.indeterminate_ids.add(to_node.idnum)
# Link nodes like /Dest are usually just links between nodes
elif relationship.is_link:
log.debug(f" Link ref {relationship}") # Link nodes like /Dest are usually just links between nodes
log.debug(f" Link ref {relationship}")

# If no other conditions are met make from_node the parent of to_node
else:
# If no other conditions are met, add the relationship as a child
from_node.add_child(to_node)

return to_node

def _resolve_indeterminate_nodes(self) -> None:
"""Place all unplaced nodes in the tree."""
"""Place all indeterminate nodes in the tree."""
#set_log_level('INFO')
indeterminate_nodes = [self.nodes_encountered[idnum] for idnum in self.indeterminate_ids]
indeterminate_nodes_string = "\n ".join([f"{node}" for node in indeterminate_nodes])
Expand All @@ -198,7 +202,7 @@ def _extract_font_infos(self) -> None:
]

def _build_or_find_node(self, relationship: IndirectObject, relationship_key: str) -> PdfTreeNode:
"""If node exists in self.nodes_encountered return it, otherwise build a node and store it."""
"""If node in self.nodes_encountered already then return it, otherwise build a node and store it."""
if relationship.idnum in self.nodes_encountered:
return self.nodes_encountered[relationship.idnum]

Expand All @@ -211,84 +215,3 @@ def _print_nodes_encountered(self) -> None:
"""Debug method that displays which nodes have already been walked"""
for i in sorted(self.nodes_encountered.keys()):
console.print(f'{i}: {self.nodes_encountered[i]}')

def _verify_all_nodes_encountered_are_in_tree(self) -> None:
"""Make sure every node we can see is reachable from the root of the tree"""
missing_nodes = [
node for idnum, node in self.nodes_encountered.items()
if self.find_node_by_idnum(idnum) is None
]

if len(missing_nodes) > 0:
msg = f"Nodes were traversed but never placed: {escape(str(missing_nodes))}\n" + \
"For link nodes like /First, /Next, /Prev, and /Last this might be no big deal - depends " + \
"on the PDF. But for other node typtes this could indicate missing data in the tree."
console.print(msg)
log.warning(msg)

def _verify_unencountered_are_untraversable(self) -> None:
"""Make sure any PDF object IDs we can't find in tree are /ObjStm or /Xref nodes"""
if self.pdf_size is None:
log.warning(f"{SIZE} not found in PDF trailer; cannot verify all nodes are in tree")
return
if self.max_generation > 0:
log.warning(f"_verify_unencountered_are_untraversable() only checking generation {self.max_generation}")

# We expect to see all ordinals up to the number of nodes /Trailer claims exist as obj. IDs.
missing_node_ids = [i for i in range(1, self.pdf_size) if self.find_node_by_idnum(i) is None]

for idnum in missing_node_ids:
ref = IndirectObject(idnum, self.max_generation, self.pdf_reader)

try:
obj = ref.get_object()
except PdfReadError as e:
if 'Invalid Elementary Object' in str(e):
log.warning(f"Couldn't verify elementary obj with id {idnum} is properly in tree")
continue
log.error(str(e))
console.print_exception()
obj = None
raise e

if obj is None:
log.error(f"Cannot find ref {ref} in PDF!")
continue
elif isinstance(obj, (NumberObject, NameObject)):
log.info(f"Obj {idnum} is a {type(obj)} w/value {obj}; if relationshipd by /Length etc. this is a nonissue but maybe worth doublechecking")
continue
elif not isinstance(obj, dict):
log.error(f"Obj {idnum} ({obj}) of type {type(obj)} isn't dict, cannot determine if it should be in tree")
continue
elif TYPE not in obj:
msg = f"Obj {idnum} has no {TYPE} and is not in tree. Either a loose node w/no data or an error in pdfalyzer."
msg += f"\nHere's the contents for you to assess:\n{obj}"
log.warning(msg)
continue

obj_type = obj[TYPE]

if obj_type == OBJECT_STREAM:
log.debug(f"Object with id {idnum} not found in tree because it's an {OBJECT_STREAM}")
elif obj[TYPE] == XREF:
placeable = XREF_STREAM in self.pdf_reader.trailer

for k, v in self.pdf_reader.trailer.items():
xref_val_for_key = obj.get(k)

if k in [XREF_STREAM, PREV]:
continue
elif k == SIZE:
if xref_val_for_key is None or v != (xref_val_for_key + 1):
log.info(f"{XREF} has {SIZE} of {xref_val_for_key}, trailer has {SIZE} of {v}")
placeable = False

continue
elif k not in obj or v != obj.get(k):
log.info(f"Trailer has {k} -> {v} but {XREF} obj has {obj.get(k)} at that key")
placeable = False

if placeable:
self.pdf_tree.add_child(self._build_or_find_node(ref, XREF_STREAM))
else:
log.warning(f"{XREF} Obj {idnum} not found in tree!")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pdfalyzer"
version = "1.12.1"
version = "1.12.2"
description = "A PDF analysis toolkit. Scan a PDF with relevant YARA rules, visualize its inner tree-like data structure in living color (lots of colors), force decodes of suspicious font binaries, and more."
authors = ["Michel de Cryptadamus <[email protected]>"]
license = "GPL-3.0-or-later"
Expand Down

0 comments on commit 108a75c

Please sign in to comment.