diff --git a/modules/telemetry/v1/data_block.py b/modules/telemetry/v1/data_block.py index 0fa1a1d..f1cc8a1 100644 --- a/modules/telemetry/v1/data_block.py +++ b/modules/telemetry/v1/data_block.py @@ -105,6 +105,8 @@ def parse(block_subtype: DataBlockSubtype, payload: bytes) -> DataBlock: DataBlockSubtype.TEMPERATURE: TemperatureDB, DataBlockSubtype.PRESSURE: PressureDB, DataBlockSubtype.HUMIDITY: HumidityDB, + DataBlockSubtype.ACCELERATION: LinearAccelerationDB, + DataBlockSubtype.ANGULAR_VELOCITY: AngularVelocityDB, } subtype = SUBTYPE_CLASSES.get(block_subtype) @@ -234,7 +236,7 @@ def __init__(self, mission_time: int, pressure: int) -> None: Constructs a pressure data block. Args: - mission_time: The mission time the altitude was measured at in milliseconds since launch. + mission_time: The mission time the pressure was measured at in milliseconds since launch. pressure: The pressure in millibars. """ @@ -308,6 +310,98 @@ def __iter__(self): yield "percentage", round(self.humidity / 100) +class LinearAccelerationDB(DataBlock): + """Represents a linear acceleration data block""" + + def __init__(self, mission_time: int, x_axis: int, y_axis: int, z_axis: int) -> None: + """ + Constructs a linear acceleration data block. + + Args: + mission_time: The mission time the linear acceleration was measured in milliseconds since launch. + x_axis: The acceleration about the x axis in meters per second squared. + y_axis: The acceleration about the y axis in meters per second squared. + z_axis: The acceleration about the z axis in meters per second squared. + + """ + super().__init__(mission_time) + self.x_axis: int = x_axis + self.y_axis: int = y_axis + self.z_axis: int = z_axis + + @classmethod + def from_bytes(cls, payload: bytes) -> Self: + """ + Constructs a linear acceleration data block from bytes. + Returns: + A linear acceleration data block. + """ + parts = struct.unpack(" int: + """ + Get the length of a linear acceleration data block in bytes + Returns: + The length of a linear acceleration data block in bytes not including the block header. + """ + return 10 + + def __str__(self): + return f"""{self.__class__.__name__} -> time: {self.mission_time} ms, x-axis: {self.x_axis} m/s^2, y-axis: + {self.y_axis} m/s^2, z-axis: {self.z_axis} m/s^2""" + + def __iter__(self): + yield "mission_time", self.mission_time + yield "acceleration", {"x_axis": self.x_axis, "y_axis": self.y_axis, "z_axis": self.z_axis} + + +class AngularVelocityDB(DataBlock): + """Represents an angular velocity data block""" + + def __init__(self, mission_time: int, x_axis: int, y_axis: int, z_axis: int) -> None: + """ + Constructus an angular velocity data block. + + Args: + mission_time: The mission time the angular velocity was measured in milliseconds since launch. + x_axis: The velocity about the x axis in degrees per second. + y_axis: The velocity about the y axis in degrees per second. + z_axis: The velocity about the z axis in degrees per second. + + """ + super().__init__(mission_time) + self.x_axis: int = x_axis + self.y_axis: int = y_axis + self.z_axis: int = z_axis + + @classmethod + def from_bytes(cls, payload: bytes) -> Self: + """ + Constructs an angular velocity data block from bytes. + Returns: + An angular velocity data block. + """ + parts = struct.unpack(" int: + """ + Get the length of an angular velocity data block in bytes + Returns: + The length of an angular velocity data block in bytes not including the block header. + """ + return 10 + + def __str__(self): + return f"""{self.__class__.__name__} -> time: {self.mission_time} ms, x-axis: {self.x_axis} dps, y-axis: + {self.y_axis} dps, z-axis: {self.z_axis} dps""" + + def __iter__(self): + yield "mission_time", self.mission_time + yield "velocity", {"x_axis": self.x_axis, "y_axis": self.y_axis, "z_axis": self.z_axis} + + def parse_data_block(type: DataBlockSubtype, payload: bytes) -> DataBlock: """ Parses a bytes payload into the correct data block type. @@ -331,5 +425,9 @@ def parse_data_block(type: DataBlockSubtype, payload: bytes) -> DataBlock: return PressureDB.from_bytes(payload) case DataBlockSubtype.HUMIDITY: return HumidityDB.from_bytes(payload) + case DataBlockSubtype.ACCELERATION: + return LinearAccelerationDB.from_bytes(payload) + case DataBlockSubtype.ANGULAR_VELOCITY: + return AngularVelocityDB.from_bytes(payload) case _: raise NotImplementedError diff --git a/tests/parsing/test_block_data.py b/tests/parsing/test_block_data.py index aca6b55..b7bb885 100644 --- a/tests/parsing/test_block_data.py +++ b/tests/parsing/test_block_data.py @@ -2,8 +2,7 @@ __author__ = "Elias Hawa" import pytest -from modules.telemetry.v1.data_block import PressureDB -from modules.telemetry.v1.data_block import TemperatureDB +from modules.telemetry.v1.data_block import PressureDB, TemperatureDB, LinearAccelerationDB, AngularVelocityDB @pytest.fixture @@ -26,6 +25,34 @@ def temperature_data_content() -> bytes: return b"\x00\x00\x00\x00\xf0\x55\x00\x00" +@pytest.fixture +def linear_acceleration_data_content() -> bytes: + """ + Returns a linear acceleration sensor reading with the following attributes + mission time: 0ms + x axis acceleration: 3cm/s^2 + y axis acceleration: -4cm/s^2 + z axis acceleration: 1032cm/s^2 + Note that LinearAccelerationDB from_bytes method should convert the axis values + from cm/s^2 to m/s^2 + """ + return b"\x00\x00\x00\x00\x03\x00\xfc\xff\x08\x04\x00\x00" + + +@pytest.fixture +def angular_velocity_data_content() -> bytes: + """ + Returns an angular velocity sensor reading with the following attributes + mission time: 0ms + x axis velocity: 60 tenths of a degree per second + y axis velocity: 110 tenths of a degree per second + z axis velocity -30 tenths of a degree per second + Note that the AngularVelocityDb from_bytes method should convert the axis values + from tenths of a degree per second to degrees per second + """ + return b"\x00\x00\x00\x00\x06\x00\x0b\x00\xfd\xff\x00\x00" + + def test_pressure_data_block(pressure_data_content: bytes) -> None: """Test that the pressure data block is parsed correctly.""" pdb = PressureDB.from_bytes(pressure_data_content) @@ -40,3 +67,23 @@ def test_temperature_data_block(temperature_data_content: bytes) -> None: assert tdb.mission_time == 0 assert tdb.temperature == 22000 + + +def test_linear_acceleration_data_block(linear_acceleration_data_content: bytes) -> None: + """Test that the linear acceleration is parsed correctly.""" + lin_acc = LinearAccelerationDB.from_bytes(linear_acceleration_data_content) + + assert lin_acc.mission_time == 0 + assert lin_acc.x_axis == 0.03 + assert lin_acc.y_axis == -0.04 + assert lin_acc.z_axis == 10.32 + + +def test_angular_velocity_data_block(angular_velocity_data_content: bytes) -> None: + """Test that the angular velocity is parsed correctly.""" + ang_vel = AngularVelocityDB.from_bytes(angular_velocity_data_content) + + assert ang_vel.mission_time == 0 + assert ang_vel.x_axis == 0.6 + assert ang_vel.y_axis == 1.1 + assert ang_vel.z_axis == -0.3 diff --git a/tests/parsing/test_full_telemetry_parsing.py b/tests/parsing/test_full_telemetry_parsing.py index 77d8307..d39d04f 100644 --- a/tests/parsing/test_full_telemetry_parsing.py +++ b/tests/parsing/test_full_telemetry_parsing.py @@ -10,6 +10,8 @@ from modules.telemetry.telemetry_utils import parse_radio_block, from_approved_callsign from modules.misc.config import load_config +# Fixtures and tests to ensure that parse_radio_block works as expected + @pytest.fixture def pkt_version() -> int: @@ -41,21 +43,20 @@ def test_radio_block(pkt_version: int, block_header: BlockHeader, hex_block_cont """ prb = parse_radio_block(pkt_version, block_header, hex_block_contents) assert prb is not None - if prb is not None: - assert prb.block_header.length == 12 - assert prb.block_header.message_type == 0 - assert prb.block_header.message_subtype == 2 - assert prb.block_header.destination == 0 - assert prb.block_name == "temperature" - assert prb.block_contents["mission_time"] == 0 + assert prb.block_header.length == 12 + assert prb.block_header.message_type == 0 + assert prb.block_header.message_subtype == 2 + assert prb.block_header.destination == 0 + assert prb.block_name == "temperature" + assert prb.block_contents["mission_time"] == 0 -# fixtures +# Fixtures and tests to ensure that parse_radio_block handles errors as expected @pytest.fixture def not_implemented_datablock_subtype() -> BlockHeader: - return BlockHeader.from_hex("02000400") + return BlockHeader.from_hex("02000600") def test_invalid_datablock_subtype(pkt_version: int, hex_block_contents: str): @@ -80,7 +81,7 @@ def test_not_implemented_error( config = load_config("config.json") -# Fixtures +# Fixtures and tests to ensure that from_approved_callsign works as expected @pytest.fixture @@ -103,9 +104,6 @@ def non_approved_callsign() -> PacketHeader: return pkt_hdr -# Tests - - # Test valid header def test_is_approved_pkt_hdr( valid_packet_header: PacketHeader, approved_callsigns: dict[str, str], caplog: LogCaptureFixture @@ -131,7 +129,7 @@ def test_is_unauthorized_callsign( def test_is_invalid_hdr(approved_callsigns: dict[str, str]) -> None: hdr = "564133494e490000000c000137000000" with pytest.raises(UnsupportedEncodingVersionError, match="Unsupported encoding version: 0"): - from_approved_callsign(PacketHeader.from_hex(hdr), approved_callsigns) + PacketHeader.from_hex(hdr) # Test an invalid header: non approved callsign and incorrect version number @@ -139,4 +137,4 @@ def test_is_invalid_hdr2(approved_callsigns: dict[str, str]) -> None: hdr = "52415454204D4F53530c0b0137000000" with pytest.raises(UnsupportedEncodingVersionError, match="Unsupported encoding version: 11"): - from_approved_callsign(PacketHeader.from_hex(hdr), approved_callsigns) + PacketHeader.from_hex(hdr)