Skip to content

Commit

Permalink
Merge pull request #96 from CarletonURocketry/eh/linaccel_angvel
Browse files Browse the repository at this point in the history
Add datablock parsing for linear acceleration and angular velocity
  • Loading branch information
linguini1 authored May 16, 2024
2 parents 1e00205 + 2121412 commit 31288bd
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 18 deletions.
100 changes: 99 additions & 1 deletion modules/telemetry/v1/data_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ def parse(block_subtype: DataBlockSubtype, payload: bytes) -> DataBlock:
DataBlockSubtype.TEMPERATURE: TemperatureDB,
DataBlockSubtype.PRESSURE: PressureDB,
DataBlockSubtype.HUMIDITY: HumidityDB,
DataBlockSubtype.ACCELERATION: LinearAccelerationDB,
DataBlockSubtype.ANGULAR_VELOCITY: AngularVelocityDB,
}

subtype = SUBTYPE_CLASSES.get(block_subtype)
Expand Down Expand Up @@ -234,7 +236,7 @@ def __init__(self, mission_time: int, pressure: int) -> None:
Constructs a pressure data block.
Args:
mission_time: The mission time the altitude was measured at in milliseconds since launch.
mission_time: The mission time the pressure was measured at in milliseconds since launch.
pressure: The pressure in millibars.
"""
Expand Down Expand Up @@ -308,6 +310,98 @@ def __iter__(self):
yield "percentage", round(self.humidity / 100)


class LinearAccelerationDB(DataBlock):
"""Represents a linear acceleration data block"""

def __init__(self, mission_time: int, x_axis: int, y_axis: int, z_axis: int) -> None:
"""
Constructs a linear acceleration data block.
Args:
mission_time: The mission time the linear acceleration was measured in milliseconds since launch.
x_axis: The acceleration about the x axis in meters per second squared.
y_axis: The acceleration about the y axis in meters per second squared.
z_axis: The acceleration about the z axis in meters per second squared.
"""
super().__init__(mission_time)
self.x_axis: int = x_axis
self.y_axis: int = y_axis
self.z_axis: int = z_axis

@classmethod
def from_bytes(cls, payload: bytes) -> Self:
"""
Constructs a linear acceleration data block from bytes.
Returns:
A linear acceleration data block.
"""
parts = struct.unpack("<Ihhhh", payload)
return cls(parts[0], parts[1] / 100, parts[2] / 100, parts[3] / 100)

def __len__(self) -> int:
"""
Get the length of a linear acceleration data block in bytes
Returns:
The length of a linear acceleration data block in bytes not including the block header.
"""
return 10

def __str__(self):
return f"""{self.__class__.__name__} -> time: {self.mission_time} ms, x-axis: {self.x_axis} m/s^2, y-axis:
{self.y_axis} m/s^2, z-axis: {self.z_axis} m/s^2"""

def __iter__(self):
yield "mission_time", self.mission_time
yield "acceleration", {"x_axis": self.x_axis, "y_axis": self.y_axis, "z_axis": self.z_axis}


class AngularVelocityDB(DataBlock):
"""Represents an angular velocity data block"""

def __init__(self, mission_time: int, x_axis: int, y_axis: int, z_axis: int) -> None:
"""
Constructus an angular velocity data block.
Args:
mission_time: The mission time the angular velocity was measured in milliseconds since launch.
x_axis: The velocity about the x axis in degrees per second.
y_axis: The velocity about the y axis in degrees per second.
z_axis: The velocity about the z axis in degrees per second.
"""
super().__init__(mission_time)
self.x_axis: int = x_axis
self.y_axis: int = y_axis
self.z_axis: int = z_axis

@classmethod
def from_bytes(cls, payload: bytes) -> Self:
"""
Constructs an angular velocity data block from bytes.
Returns:
An angular velocity data block.
"""
parts = struct.unpack("<Ihhhh", payload)
return cls(parts[0], parts[1] / 10, parts[2] / 10, parts[3] / 10)

def __len__(self) -> int:
"""
Get the length of an angular velocity data block in bytes
Returns:
The length of an angular velocity data block in bytes not including the block header.
"""
return 10

def __str__(self):
return f"""{self.__class__.__name__} -> time: {self.mission_time} ms, x-axis: {self.x_axis} dps, y-axis:
{self.y_axis} dps, z-axis: {self.z_axis} dps"""

def __iter__(self):
yield "mission_time", self.mission_time
yield "velocity", {"x_axis": self.x_axis, "y_axis": self.y_axis, "z_axis": self.z_axis}


def parse_data_block(type: DataBlockSubtype, payload: bytes) -> DataBlock:
"""
Parses a bytes payload into the correct data block type.
Expand All @@ -331,5 +425,9 @@ def parse_data_block(type: DataBlockSubtype, payload: bytes) -> DataBlock:
return PressureDB.from_bytes(payload)
case DataBlockSubtype.HUMIDITY:
return HumidityDB.from_bytes(payload)
case DataBlockSubtype.ACCELERATION:
return LinearAccelerationDB.from_bytes(payload)
case DataBlockSubtype.ANGULAR_VELOCITY:
return AngularVelocityDB.from_bytes(payload)
case _:
raise NotImplementedError
51 changes: 49 additions & 2 deletions tests/parsing/test_block_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
__author__ = "Elias Hawa"

import pytest
from modules.telemetry.v1.data_block import PressureDB
from modules.telemetry.v1.data_block import TemperatureDB
from modules.telemetry.v1.data_block import PressureDB, TemperatureDB, LinearAccelerationDB, AngularVelocityDB


@pytest.fixture
Expand All @@ -26,6 +25,34 @@ def temperature_data_content() -> bytes:
return b"\x00\x00\x00\x00\xf0\x55\x00\x00"


@pytest.fixture
def linear_acceleration_data_content() -> bytes:
"""
Returns a linear acceleration sensor reading with the following attributes
mission time: 0ms
x axis acceleration: 3cm/s^2
y axis acceleration: -4cm/s^2
z axis acceleration: 1032cm/s^2
Note that LinearAccelerationDB from_bytes method should convert the axis values
from cm/s^2 to m/s^2
"""
return b"\x00\x00\x00\x00\x03\x00\xfc\xff\x08\x04\x00\x00"


@pytest.fixture
def angular_velocity_data_content() -> bytes:
"""
Returns an angular velocity sensor reading with the following attributes
mission time: 0ms
x axis velocity: 60 tenths of a degree per second
y axis velocity: 110 tenths of a degree per second
z axis velocity -30 tenths of a degree per second
Note that the AngularVelocityDb from_bytes method should convert the axis values
from tenths of a degree per second to degrees per second
"""
return b"\x00\x00\x00\x00\x06\x00\x0b\x00\xfd\xff\x00\x00"


def test_pressure_data_block(pressure_data_content: bytes) -> None:
"""Test that the pressure data block is parsed correctly."""
pdb = PressureDB.from_bytes(pressure_data_content)
Expand All @@ -40,3 +67,23 @@ def test_temperature_data_block(temperature_data_content: bytes) -> None:

assert tdb.mission_time == 0
assert tdb.temperature == 22000


def test_linear_acceleration_data_block(linear_acceleration_data_content: bytes) -> None:
"""Test that the linear acceleration is parsed correctly."""
lin_acc = LinearAccelerationDB.from_bytes(linear_acceleration_data_content)

assert lin_acc.mission_time == 0
assert lin_acc.x_axis == 0.03
assert lin_acc.y_axis == -0.04
assert lin_acc.z_axis == 10.32


def test_angular_velocity_data_block(angular_velocity_data_content: bytes) -> None:
"""Test that the angular velocity is parsed correctly."""
ang_vel = AngularVelocityDB.from_bytes(angular_velocity_data_content)

assert ang_vel.mission_time == 0
assert ang_vel.x_axis == 0.6
assert ang_vel.y_axis == 1.1
assert ang_vel.z_axis == -0.3
28 changes: 13 additions & 15 deletions tests/parsing/test_full_telemetry_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from modules.telemetry.telemetry_utils import parse_radio_block, from_approved_callsign
from modules.misc.config import load_config

# Fixtures and tests to ensure that parse_radio_block works as expected


@pytest.fixture
def pkt_version() -> int:
Expand Down Expand Up @@ -41,21 +43,20 @@ def test_radio_block(pkt_version: int, block_header: BlockHeader, hex_block_cont
"""
prb = parse_radio_block(pkt_version, block_header, hex_block_contents)
assert prb is not None
if prb is not None:
assert prb.block_header.length == 12
assert prb.block_header.message_type == 0
assert prb.block_header.message_subtype == 2
assert prb.block_header.destination == 0
assert prb.block_name == "temperature"
assert prb.block_contents["mission_time"] == 0
assert prb.block_header.length == 12
assert prb.block_header.message_type == 0
assert prb.block_header.message_subtype == 2
assert prb.block_header.destination == 0
assert prb.block_name == "temperature"
assert prb.block_contents["mission_time"] == 0


# fixtures
# Fixtures and tests to ensure that parse_radio_block handles errors as expected


@pytest.fixture
def not_implemented_datablock_subtype() -> BlockHeader:
return BlockHeader.from_hex("02000400")
return BlockHeader.from_hex("02000600")


def test_invalid_datablock_subtype(pkt_version: int, hex_block_contents: str):
Expand All @@ -80,7 +81,7 @@ def test_not_implemented_error(

config = load_config("config.json")

# Fixtures
# Fixtures and tests to ensure that from_approved_callsign works as expected


@pytest.fixture
Expand All @@ -103,9 +104,6 @@ def non_approved_callsign() -> PacketHeader:
return pkt_hdr


# Tests


# Test valid header
def test_is_approved_pkt_hdr(
valid_packet_header: PacketHeader, approved_callsigns: dict[str, str], caplog: LogCaptureFixture
Expand All @@ -131,12 +129,12 @@ def test_is_unauthorized_callsign(
def test_is_invalid_hdr(approved_callsigns: dict[str, str]) -> None:
hdr = "564133494e490000000c000137000000"
with pytest.raises(UnsupportedEncodingVersionError, match="Unsupported encoding version: 0"):
from_approved_callsign(PacketHeader.from_hex(hdr), approved_callsigns)
PacketHeader.from_hex(hdr)


# Test an invalid header: non approved callsign and incorrect version number
def test_is_invalid_hdr2(approved_callsigns: dict[str, str]) -> None:
hdr = "52415454204D4F53530c0b0137000000"

with pytest.raises(UnsupportedEncodingVersionError, match="Unsupported encoding version: 11"):
from_approved_callsign(PacketHeader.from_hex(hdr), approved_callsigns)
PacketHeader.from_hex(hdr)

0 comments on commit 31288bd

Please sign in to comment.