Skip to content

Commit

Permalink
feat: Add dissector API
Browse files Browse the repository at this point in the history
  • Loading branch information
rumpelsepp committed Sep 20, 2024
1 parent 2d1935d commit b279457
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 39 deletions.
48 changes: 48 additions & 0 deletions src/gallia/dissect/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Literal, NotRequired, TypeAlias, TypedDict

from gallia.services.uds.core.service import UDSRequest, UDSResponse


@dataclass
class Field:
name: str
raw_data: bytes
dissected_data: str


class BaseDissector(ABC):
PROTO: str = ""

def __init_subclass__(
cls,
/,
proto: str,
**kwargs: Any,
) -> None:
super().__init_subclass__(**kwargs)
cls.PROTO = proto

@abstractmethod
def dissect(self, data: bytes, iotype: str | None = None) -> list[Field]: ...


class UDSDissector(BaseDissector, proto="uds"):
def dissect(self, data: bytes, iotype: str | None = None) -> list[Field]:
if data[0] & 0b01000000:
dissected_data = repr(UDSResponse.parse_dynamic(data))
name = "uds response"
else:
dissected_data = repr(UDSRequest.parse_dynamic(data))
name = "uds request"

return [Field(name=name, raw_data=data, dissected_data=dissected_data)]


# TODO: Can the PROTO attribute be used?
registry: dict[str, BaseDissector] = {"uds": UDSDissector()}
20 changes: 19 additions & 1 deletion src/gallia/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from pathlib import Path
from queue import Queue
from types import TracebackType
from typing import TYPE_CHECKING, Any, BinaryIO, Self, TextIO, TypeAlias, cast
from typing import TYPE_CHECKING, Any, BinaryIO, Literal, Self, TextIO, TypeAlias, cast

import msgspec
import zstandard
Expand Down Expand Up @@ -822,3 +822,21 @@ def result(

def get_logger(name: str) -> Logger:
return cast(Logger, logging.getLogger(name))


def log_io(
logger: Logger,
iotype: Literal["read", "write"],
proto: str,
data: bytes,
tags: list[str] | None = None,
trace: bool = False,
) -> None:
# tags without "=" are deprecated
t = [f"=io={iotype}", "encoding=hex", f"proto={proto}", iotype]
if tags is not None:
t += tags
if trace:
logger.trace(data.hex(), extra={"tags": t})
else:
logger.debug(data.hex(), extra={"tags": t})
34 changes: 5 additions & 29 deletions src/gallia/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import binascii
import io
from abc import ABC, abstractmethod
from typing import Any, Literal, Protocol, Self
from typing import Any, Protocol, Self
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

from gallia.log import Logger, get_logger
from gallia.log import get_logger, log_io
from gallia.transports.schemes import TransportScheme
from gallia.utils import join_host_port

Expand Down Expand Up @@ -145,24 +145,6 @@ def __init_subclass__(
cls.SCHEME = scheme
cls.BUFSIZE = bufsize

@staticmethod
def log_io(
logger: Logger,
iotype: Literal["read", "write"],
proto: str,
data: bytes,
tags: list[str] | None,
trace: bool = False,
) -> None:
# tags without "=" are deprecated
t = [f"type=io,{iotype}", "encoding=hex", f"proto={proto}", iotype]
if tags is not None:
t += tags
if trace:
logger.trace(data.hex(), extra={"tags": t})
else:
logger.debug(data.hex(), extra={"tags": t})

@classmethod
def check_scheme(cls, target: TargetURI) -> None:
"""Checks if the provided URI has the correct scheme."""
Expand Down Expand Up @@ -251,13 +233,10 @@ async def write(
timeout: float | None = None,
tags: list[str] | None = None,
) -> int:
t = tags + ["write"] if tags is not None else ["write"]

logger.trace(data.hex() + "0a", extra={"tags": t})

writer = self.get_writer()
writer.write(binascii.hexlify(data) + b"\n")
await asyncio.wait_for(writer.drain(), timeout)
log_io(logger, "write", "lines", data, trace=True)
return len(data)

async def read(
Expand All @@ -266,9 +245,6 @@ async def read(
tags: list[str] | None = None,
) -> bytes:
data = await asyncio.wait_for(self.get_reader().readline(), timeout)
d = data.decode().strip()

t = tags + ["read"] if tags is not None else ["read"]
logger.trace(d + "0a", extra={"tags": t})
log_io(logger, "read", "lines", data, trace=True)

return binascii.unhexlify(d)
return binascii.unhexlify(data.decode().strip())
7 changes: 5 additions & 2 deletions src/gallia/transports/hsfz.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from pydantic import BaseModel, field_validator

from gallia.log import get_logger
from gallia.transports.base import BaseTransport, TargetURI
from gallia.transports.base import BaseTransport, TargetURI, log_io
from gallia.utils import auto_int

logger = get_logger(__name__)
Expand Down Expand Up @@ -357,7 +357,9 @@ async def read(
timeout: float | None = None,
tags: list[str] | None = None,
) -> bytes:
return await asyncio.wait_for(self._conn.read_diag_request(), timeout)
data = await asyncio.wait_for(self._conn.read_diag_request(), timeout)
log_io(logger, "read", "hsfz", data, tags, trace=True)
return data

async def write(
self,
Expand All @@ -366,4 +368,5 @@ async def write(
tags: list[str] | None = None,
) -> int:
await asyncio.wait_for(self._conn.write_diag_request(data), timeout)
log_io(logger, "write", "hsfz", data, tags, trace=True)
return len(data)
6 changes: 3 additions & 3 deletions src/gallia/transports/isotp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from pydantic import BaseModel, field_validator

from gallia.log import get_logger
from gallia.transports.base import BaseTransport, TargetURI
from gallia.transports.base import BaseTransport, TargetURI, log_io
from gallia.utils import auto_int

logger = get_logger(__name__)
Expand Down Expand Up @@ -182,7 +182,7 @@ async def write(
timeout: float | None = None,
tags: list[str] | None = None,
) -> int:
self.log_io(logger, "write", "isotp", data, tags, trace=True)
log_io(logger, "write", "isotp", data, tags, trace=True)

loop = asyncio.get_running_loop()
await asyncio.wait_for(loop.sock_sendall(self._sock, data), timeout)
Expand All @@ -198,7 +198,7 @@ async def read(self, timeout: float | None = None, tags: list[str] | None = None
if e.errno == errno.EILSEQ:
raise BrokenPipeError(f"invalid consecutive frame numbers: {e}") from e
raise e
self.log_io(logger, "read", "isotp", data, tags, trace=True)
log_io(logger, "read", "isotp", data, tags, trace=True)
return data

async def close(self) -> None:
Expand Down
65 changes: 61 additions & 4 deletions src/hr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# SPDX-License-Identifier: Apache-2.0

import argparse
import binascii
import os
import signal
import sys
Expand All @@ -12,8 +13,51 @@

import msgspec

from gallia import exitcodes
from gallia.log import ColorMode, PenlogPriority, PenlogReader, resolve_color_mode
from gallia import dissect, exitcodes
from gallia.log import ColorMode, PenlogPriority, PenlogReader, PenlogRecord, resolve_color_mode


def extract_tag_info(raw_tag: str) -> str:
parts = raw_tag.split("=", maxsplit=2)
if len(parts) != 2:
raise ValueError()
return parts[1]


def get_io_info(tags: list[str]) -> tuple[str, str]:
found_iotype = False
found_proto = False
for tag in tags:
if tag.startswith("io"):
iotype = extract_tag_info(tag)
found_iotype = True
if tag.startswith("proto"):
proto = extract_tag_info(tag)
found_proto = True
if found_iotype and found_proto:
return iotype, proto
raise ValueError()


def dissect_record(record: PenlogRecord) -> str | None:
if record.tags is None:
return None

try:
iotype, proto = get_io_info(record.tags)
except ValueError:
return None

if proto not in dissect.registry:
return None

dissector = dissect.registry[proto]
data = binascii.unhexlify(record.data)

out = ""
for field in dissector.dissect(data, iotype):
out += f"{field.name}: {field.dissected_data}"
return out


def parse_args() -> argparse.Namespace:
Expand Down Expand Up @@ -50,7 +94,14 @@ def parse_args() -> argparse.Namespace:
"--lines",
type=int,
default=100,
help="print the last n lines",
help="argument for --head/--tail)",
)
parser.add_argument(
"-d",
"--dissect",
action=argparse.BooleanOptionalAction,
default=False,
help="dissect log messages with io and proto tags",
)
parser.add_argument(
"--color",
Expand Down Expand Up @@ -81,7 +132,13 @@ def _main() -> int:

for record in record_generator:
record.colored = colored
print(record, end="")
if args.dissect:
# TODO: Maybe a method of the record? record.dissect()?
if (dissected := dissect_record(record)) is not None:
print(f" {dissected}")
print(record, end="")
else:
print(record, end="")

return 0

Expand Down

0 comments on commit b279457

Please sign in to comment.