Skip to content

Commit

Permalink
Merge pull request #34 from ElectronicCats/cat_threat
Browse files Browse the repository at this point in the history
feat: Add Thread documentation and Firmware name support
  • Loading branch information
JahazielLem authored Dec 27, 2024
2 parents db22743 + 5ad1a8f commit 4136116
Show file tree
Hide file tree
Showing 16 changed files with 832 additions and 447 deletions.
50 changes: 47 additions & 3 deletions pycatsniffer_bv3/Modules/Protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def __init__(
spacing: float = 0,
channel_range: list = [0, 0],
pcap_header: int = 147,
common_names: list = [],
profile: str = "Default",
):
self.phy_index = phy_index
self.name = name
Expand All @@ -22,6 +24,19 @@ def __init__(
self.spacing = spacing
self.channel_range = channel_range
self.pcap_header = pcap_header
self.common_names = common_names
self.profile = profile

def find_by_name(self, name: str):
if name in self.common_names:
return self
return None

def get_profile(self):
return self.profile

def get_common_name_str(self):
return ", ".join(self.common_names)

def get_phy_index(self):
return self.phy_index
Expand Down Expand Up @@ -159,18 +174,35 @@ def __str__(self):
spacing=2,
channel_range=[(37, 2402), (38, 2426), (39, 2480)],
pcap_header=147,
common_names=["ble", "bluetooth", "bluetoothle"],
)

PROTOCOL_IEEE = Protocol(
PROTOCOL_ZIGBEE = Protocol(
phy_index=bytearray([0x12]),
name="IEEE 802.15.4 QPSK",
name="Zigbee",
phy_label="2405 MHz - Freq Band",
base_frequency=2405.0,
spacing=5,
channel_range=[
(channel, (2405.0 + (5 * (channel - 11)))) for channel in range(11, 27)
],
pcap_header=147,
common_names=["zigbee", "zig", "zb"],
profile="Zigbee",
)

PROTOCOL_THREAT = Protocol(
phy_index=bytearray([0x12]),
name="Threat",
phy_label="2405 MHz - Freq Band",
base_frequency=2405.0,
spacing=5,
channel_range=[
(channel, (2405.0 + (5 * (channel - 11)))) for channel in range(11, 27)
],
pcap_header=147,
common_names=["threat"],
profile="Threat",
)


Expand Down Expand Up @@ -207,7 +239,8 @@ def set_badwidth(self, badwidth_index: int) -> None:

class PROTOCOLSLIST(Definitions.BaseEnum):
PROTOCOL_BLE: Protocol = PROTOCOL_BLE
PROTOCOL_IEEE: Protocol = PROTOCOL_IEEE
PROTOCOL_ZIGBEE: Protocol = PROTOCOL_ZIGBEE
PROTOCOL_THREAT: Protocol = PROTOCOL_THREAT
PROTOCOL_LORA: Protocol = PROTOCOL_LORA

@classmethod
Expand All @@ -229,5 +262,16 @@ def get_str_list_channels(cls, protocol_index: int):
str_list += f"[{channel[0]}] {channel[1]}\n"
return str_list

@classmethod
def get_protocol_by_name(cls, name: str):
for protocol in cls:
if protocol.name == name:
return protocol
else:
get_protocol = protocol.value.find_by_name(name)
if get_protocol:
return get_protocol
return None

def __str__(self):
return super().__str__()
22 changes: 14 additions & 8 deletions pycatsniffer_bv3/Modules/SnifferCollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
from .Pcap import Pcap
from .Packets import GeneralUARTPacket, IEEEUARTPacket, BLEUARTPacket, LoraUARTPacket
from .Definitions import DEFAULT_TIMEOUT_JOIN
from .Protocols import PROTOCOL_BLE, PROTOCOL_IEEE, PROTOCOL_LORA, PROTOCOLSLIST
from .Protocols import (
PROTOCOL_BLE,
PROTOCOL_ZIGBEE,
PROTOCOL_LORA,
PROTOCOL_THREAT,
PROTOCOLSLIST,
)
from .Utils import LOG_ERROR, LOG_WARNING, LOG_INFO


Expand Down Expand Up @@ -121,17 +127,16 @@ def open_board_uart(self):
def set_channel_hopping(self, hopping: bool):
self.hopping_channel = hopping

def set_protocol_phy(self, phy: int):
def set_protocol_phy(self, phy: str):
"""Set the phy protocol for the sniffer"""
get_protocol = PROTOCOLSLIST.get_list_protocols()[int(phy)].value
get_protocol = PROTOCOLSLIST.get_protocol_by_name(phy)
self.protocol = get_protocol
self.protocol_linktype = get_protocol.get_pcap_header()

def set_protocol_channel(self, channel: int):
"""Set the protocol channel for the sniffer"""
get_channel = self.protocol.get_channel_range_bytes(channel)
self.protocol_freq_channel = get_channel[0]
self.logger.info(f"Channel: {self.protocol_freq_channel}")

def close_board_uart(self):
self.board_uart.close()
Expand Down Expand Up @@ -196,7 +201,10 @@ def handle_sniffer_data(self):
if self.protocol == PROTOCOL_BLE:
protocol = b"\x03"
phy = bytes.fromhex("05")
elif self.protocol == PROTOCOL_IEEE:
elif (
self.protocol == PROTOCOL_ZIGBEE
or self.protocol == PROTOCOL_THREAT
):
protocol = b"\x02"
phy = bytes.fromhex("03")

Expand Down Expand Up @@ -272,7 +280,6 @@ def dissector(self, packet: bytes) -> bytes:
"""Dissector the packet"""
if self.is_catsniffer == 2:
data_packet = LoraUARTPacket(packet)
self.logger.debug(f"RECV -> {data_packet.get_payload_hex()}")
return data_packet

general_packet = GeneralUARTPacket(packet)
Expand All @@ -284,10 +291,9 @@ def dissector(self, packet: bytes) -> bytes:
if self.protocol == PROTOCOL_BLE:
data_packet = BLEUARTPacket(general_packet.packet_bytes)
packet = data_packet
elif self.protocol == PROTOCOL_IEEE:
elif self.protocol == PROTOCOL_ZIGBEE or self.protocol == PROTOCOL_THREAT:
ieee_packet = IEEEUARTPacket(general_packet.packet_bytes)
packet = ieee_packet
self.logger.debug(f"RECV -> {packet.get_payload_hex()}")
else:
self.logger.error("Protocol not supported yet")
LOG_WARNING("Protocol not supported yet")
Expand Down
13 changes: 11 additions & 2 deletions pycatsniffer_bv3/Modules/UART.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@
import serial
import serial.tools.list_ports
import threading
import time
import sys
import re
from .Definitions import START_OF_FRAME, END_OF_FRAME
from .Utils import LOG_ERROR, LOG_WARNING

if platform.system() == "Windows":
DEFAULT_COMPORT = "COM1"
elif platform.system() == "Darwin":
DEFAULT_COMPORT = "/dev/tty.usbmodem0001"
else:
DEFAULT_COMPORT = "/dev/ttyACM0"

DEFAULT_SERIAL_BAUDRATE = 921600
CATSNIFFER_VID = 11914
CATSNIFFER_PID = 192


class UART(threading.Thread):
Expand Down Expand Up @@ -107,3 +109,10 @@ def recv(self):
return self.recv_catsniffer()
else:
return self.recv_boards()

def find_catsniffer_serial_port(self):
ports = serial.tools.list_ports.comports()
for port in ports:
if port.vid == CATSNIFFER_VID and port.pid == CATSNIFFER_PID:
return port.device
return DEFAULT_COMPORT
9 changes: 8 additions & 1 deletion pycatsniffer_bv3/Modules/Wireshark.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@


class Wireshark(threading.Thread):
def __init__(self, fifo_name: str = DEFAULT_FILENAME):
def __init__(self, fifo_name: str = DEFAULT_FILENAME, profile: str = "default"):
super().__init__()
self.fifo_name = fifo_name
self.running = True
self.type_worker = "wireshark"
self.wireshark_process = None
self.profile = profile

def run(self):
if platform.system() == "Windows":
Expand All @@ -22,6 +23,8 @@ def run(self):
"-k",
"-i",
f"\\\\.\\pipe\\{self.fifo_name}",
"-C",
self.profile,
]
)

Expand All @@ -33,6 +36,8 @@ def run(self):
"-k",
"-i",
f"/tmp/{self.fifo_name}",
"-C",
self.profile,
]
)
elif platform.system() == "Darwin":
Expand All @@ -42,6 +47,8 @@ def run(self):
"-k",
"-i",
f"/tmp/{self.fifo_name}",
"-C",
self.profile,
]
)
else:
Expand Down
Loading

0 comments on commit 4136116

Please sign in to comment.