diff --git a/.gitignore b/.gitignore index 2448b17..677bb16 100644 --- a/.gitignore +++ b/.gitignore @@ -186,3 +186,6 @@ OD.* # setuptools-scm */_version.py + +# xml +*.xsd diff --git a/oresat_configs/__init__.py b/oresat_configs/__init__.py index d7fefa2..0dadb90 100644 --- a/oresat_configs/__init__.py +++ b/oresat_configs/__init__.py @@ -11,7 +11,7 @@ "pyyaml missing/installed without libyaml bindings. See oresat-configs README.md for more" ) from e -from dataclasses import dataclass +import os from typing import Union from ._yaml_to_od import ( @@ -25,6 +25,7 @@ from .beacon_config import BeaconConfig from .card_info import Card, cards_from_csv from .constants import Consts, NodeId, OreSatId, __version__ +from .edl_cmd_defs import EdlCommandDefinition, EdlCommandDefinitions, EdlCommandField __all__ = ["Card", "Consts", "NodeId", "OreSatId", "__version__"] @@ -57,3 +58,19 @@ def __init__(self, mission: Union[OreSatId, Consts, str]): self.beacon_def = _gen_c3_beacon_defs(c3_od, beacon_config) self.fram_def = _gen_c3_fram_defs(c3_od, self.configs["c3"]) self.fw_base_od = _gen_fw_base_od(mission, FW_COMMON_CONFIG_PATH) + + # edl commands definitions + node_ids = {} + opd_addrs = {} + for name in self.configs: + card = self.cards[name] + if card.node_id != 0: + node_ids[name] = card.node_id + if card.opd_address != 0: + opd_addrs[name] = card.opd_address + custom_enums = { + "node_id": node_ids, + "opd_addr": opd_addrs, + } + edl_file_path = f"{os.path.dirname(os.path.abspath(__file__))}/edl_cmd_defs.yaml" + self.edl_cmd_defs = EdlCommandDefinitions(edl_file_path, custom_enums) diff --git a/oresat_configs/base/c3.yaml b/oresat_configs/base/c3.yaml index eb9d75c..0461969 100644 --- a/oresat_configs/base/c3.yaml +++ b/oresat_configs/base/c3.yaml @@ -118,7 +118,7 @@ objects: - subindex: 0x8 name: good_test_threshold data_type: uint16 - description: | + description: >- when testing the antennas, any voltage above this threshold is considered good access_type: const default: 100 diff --git a/oresat_configs/base/solar.yaml b/oresat_configs/base/solar.yaml index 6d8f340..392ae71 100644 --- a/oresat_configs/base/solar.yaml +++ b/oresat_configs/base/solar.yaml @@ -127,10 +127,9 @@ objects: - index: 0x4003 name: mppt_alg data_type: uint8 - description: | - mppt (maximum power point tracking) algorithm - - 0: perturb and observse - access_type: rw + description: mppt (maximum power point tracking) algorithm + value_descriptions: + perturb_and_observse: 0 - index: 0x4004 name: lt1618_iadj diff --git a/oresat_configs/edl_cmd_defs.py b/oresat_configs/edl_cmd_defs.py new file mode 100644 index 0000000..6760003 --- /dev/null +++ b/oresat_configs/edl_cmd_defs.py @@ -0,0 +1,227 @@ +"""Used to parse edl_cmd_defs.yaml that defines all EDL command requests and responses.""" + +import struct +from dataclasses import dataclass, field +from typing import Any, Union + +from dacite import from_dict +from yaml import CLoader, load + +_COMMAND_DATA_FMT = { + "bool": "?", + "int8": "b", + "int16": "h", + "int32": "i", + "int64": "q", + "uint8": "B", + "uint16": "H", + "uint32": "I", + "uint64": "Q", + "float32": "f", + "float64": "d", +} + + +@dataclass +class EdlCommandField: + """A field in EDL command request or response packet.""" + + name: str + """ + str: Unique name (scope of the fields in the command, not all fields in all commands) for the + EDL command field. + """ + data_type: str + """ + str: Data type of field. + + Can be: + - signed integers: `"int8"`, `"int16"`, `"int32"`, `"int64"` + - unsigned integers: `"uint8"`, `"uint16"`, `"uint32"`, `"uint64"` + - floats: `"float32"`, `"float64"` + - string: `"str"` (NOTE: `fix_size` or `max_size` must be set.) + - binary: `"bytes"` (NOTE: `fix_size` or `size_prefix` must be set.) + """ + description: str = "" + """str: A short description of the EDL command field.""" + enums: dict[str, int] = field(default_factory=dict) + """dict[str, int]: Enum values for "uintX" or "bool" types.""" + max_size: int = 0 + """ + int: Max size in bytes for variable "str" data types. String must end with a '\0'. + This takes precedence over fix_size. + """ + size_prefix: str = "" + """ + str: Data type of leading prefix bytes used to determind the size of a "bytes" field. Set to a + "uintX" data type. This takes precedence over fix_size. + """ + fixed_size: int = 0 + """ + int: Fixed size in bytes for "bytes" or "str" data types. Value that are not the full size + will be filled with "\0" at the end as padding. + """ + unit: str = "" + "str: Optional unit for the field" + + +@dataclass +class EdlCommandDefinition: + """A EDL command.""" + + uid: int + """int: Unique id to identify the EDL command.""" + name: str + """str: A unique snake_case name for the EDL command.""" + description: str = "" + """str: A short description of the EDL command.""" + request: list[EdlCommandField] = field(default_factory=list) + """list[EdlCommandDefinition]: List of request fields for the EDL command.""" + response: list[EdlCommandField] = field(default_factory=list) + """list[EdlCommandDefinition]: List of response fields for the EDL command.""" + + def _dynamic_len(self, fields: list[EdlCommandField]) -> bool: + return True in [f.size_prefix != "" for f in fields] + + def _decode(self, raw: bytes, fields: list[EdlCommandField]) -> tuple[Any]: + + # fixed size packet - quick decode + if not self._dynamic_len(fields): + fmt = "".join([_COMMAND_DATA_FMT[f.data_type] for f in fields]) + return struct.unpack(fmt, raw) + + # dynamic size packet - slower decode + data: dict[str, Any] = {} + offset = 0 + for f in fields: + if f.data_type in _COMMAND_DATA_FMT: + data_type_size = struct.calcsize(_COMMAND_DATA_FMT[f.data_type]) + tmp = raw[offset : offset + data_type_size] + fmt = _COMMAND_DATA_FMT[f.data_type] + data[f.name] = struct.unpack(fmt, tmp)[0] + elif f.data_type == "bytes": + if f.size_prefix != "": # dynamic length in bits + size_prefix = int(f.size_prefix[4:]) // 8 + data_type_size_raw = raw[offset : offset + size_prefix] + data_type_size = int.from_bytes(data_type_size_raw, "little") // 8 + offset += size_prefix + else: # fix_size + data_type_size = f.fixed_size + data[f.name] = raw[offset : offset + data_type_size] + elif f.data_type == "str": + if f.max_size != "": # dynamic length that ends with "\0" + data_type_size = raw[offset:].find(b"\0") + else: # fix_size + data_type_size = f.fixed_size + data[f.name] = raw[offset : offset + data_type_size].decode() + else: + raise ValueError(f"invalid edl field {f.name}") + offset += data_type_size + + return tuple(data.values()) + + def _encode(self, values: tuple[Any], fields: list[EdlCommandField]) -> bytes: + + if not isinstance(values, (tuple, list)): + values = (values,) + + if len(values) != len(fields): + raise ValueError( + f"invalid number of values for packet: got {len(values)} expected {len(fields)}" + ) + + # fixed size packet - quick encode + if not self._dynamic_len(fields): + fmt = "".join([_COMMAND_DATA_FMT[f.data_type] for f in fields]) + return struct.pack(fmt, *values) + + # dynamic size packet - slower encode + raw = b"" + for f, v in zip(fields, values): + if f.data_type in _COMMAND_DATA_FMT: + fmt = _COMMAND_DATA_FMT[f.data_type] + raw += struct.pack(fmt, v) + elif f.data_type == "bytes": + value = v + if f.size_prefix != "": # dynamic length in bits + fmt = _COMMAND_DATA_FMT[f.size_prefix] + raw += struct.pack(fmt, len(v) * 8) + else: # fixed length + value += b"\x00" * (f.fixed_size - len(value)) + raw += value + elif f.data_type == "str": + value = v.encode() + if f.max_size != "": # dynamic length that ends with "\0" + value += b"\0" + else: # fixed length + value += b"\0" * (f.fixed_size - len(value)) + raw += value + else: + raise ValueError(f"invalid data type {f.data_type} for edl field {f.name}") + return raw + + def decode_request(self, raw: bytes) -> tuple[Any]: + """Decode a EDL request payload.""" + return self._decode(raw, self.request) + + def encode_request(self, values: tuple[Any]) -> bytes: + """Encode a EDL request payload.""" + return self._encode(values, self.request) + + def decode_response(self, raw: bytes) -> tuple[Any]: + """Decode a EDL response payload.""" + return self._decode(raw, self.response) + + def encode_response(self, values: tuple[Any]) -> bytes: + """Encode a EDL reponse payload.""" + return self._encode(values, self.response) + + +class EdlCommandDefinitions: + """ + A custom dictionary-like class to store EDL commands that can use the EDL command uid and EDL + command name as keys. + """ + + def __init__(self, file_path: str, custom_enums: dict[str, dict[str, int]] = {}): + self._names: dict[str, EdlCommandDefinition] = {} + self._uids: dict[int, EdlCommandDefinition] = {} + + _raw = {} + with open(file_path, "r") as f: + _raw = load(f, Loader=CLoader) + + for command_raw in _raw.get("commands", []): + command = from_dict(data_class=EdlCommandDefinition, data=command_raw) + command.description = command.description.replace("\n", "") + for req in command.request: + req.description = req.description.replace("\n", "") + if req.name in custom_enums: + req.enums = custom_enums[req.name] + for res in command.response: + res.description = res.description.replace("\n", "") + if res.name in custom_enums: + res.enums = custom_enums[res.name] + self._uids[command.uid] = command + self._names[command.name] = command + + def __getitem__(self, value: Union[int, str]) -> EdlCommandDefinition: + return self._uids.get(value) or self._names.get(value) # type: ignore + + def __len__(self) -> int: + return len(self._uids) + + def __iter__(self): + return iter(self._uids) + + def values(self): + """Get dictionary values.""" + return self._uids.values() + + def names(self): + """Get command names.""" + return self._names.keys() + + def uid(self): + """Get command unique ids.""" + return self._uids.keys() diff --git a/oresat_configs/edl_cmd_defs.yaml b/oresat_configs/edl_cmd_defs.yaml new file mode 100644 index 0000000..e1d78c2 --- /dev/null +++ b/oresat_configs/edl_cmd_defs.yaml @@ -0,0 +1,235 @@ +# fields with name opd_addr or node_id, will have their enums generated +commands: + - uid: 0x0 + name: tx_control + description: enable / disable tx + request: + - name: enable + data_type: bool + description: true to enable tx or false to disable tx + enums: + disable: 0 + enable: 1 + response: + - name: status + data_type: bool + description: tx status + enums: + disabled: 0 + enabled: 1 + + - uid: 0x1 + name: c3_soft_reset + description: soft reset the c3 (reboot c3 daemon) + + - uid: 0x2 + name: c3_hard_reset + description: hard reset the c3 (reboot system) + + - uid: 0x3 + name: c3_factory_reset + description: factory reset the c3 (clear FRAM, reset RTC, and reboot system) + + - uid: 0x4 + name: co_node_enable + description: enable a CANopen node + request: + - name: node_id + data_type: uint8 + description: node id of the CANopen node to enable / disable + - name: enable + data_type: bool + description: true to enable or false to disable + enums: + disable: 0 + enable: 1 + response: + - name: status + data_type: uint8 + description: node status + + - uid: 0x5 + name: co_node_status + description: get the status of a CANopen node + request: + - name: node_id + data_type: uint8 + description: node id of node to get the status for + response: + - name: status + data_type: uint8 + description: node status + + - uid: 0x6 + name: co_sdo_write + description: write a value to a node's OD over the CAN bus using a CANopen SDO message + request: + - name: node_id + data_type: uint8 + description: the id of The CANopen node to write to + - name: index + data_type: uint16 + description: the OD index to write to + - name: subindex + data_type: uint8 + description: the OD subindex to write to + - name: buffer + data_type: bytes + description: sdo data buffer to write + size_prefix: "uint32" + response: + - name: error_code + data_type: uint32 + description: SDO error code (0 is no error) + + - uid: 0x7 + name: co_sync + description: send a CANopen SYNC message on the CAN bus + response: + - name: sent + data_type: bool + description: the CANopen SYNC message was sent successfully + + - uid: 0x8 + name: opd_sysenable + description: enable the OPD subsystem + request: + - name: enable + data_type: bool + description: true to enable or false to disable + enums: + disable: 0 + enable: 1 + response: + - name: status + data_type: bool + description: OPD subsystem enable status + enums: + disabled: 0 + enabled: 1 + + - uid: 0x9 + name: opd_scan + description: scan for all nodes on the OPD + response: + - name: nodes_found + data_type: uint8 + description: the number of nodes found + + - uid: 0xA + name: opd_probe + description: probe for a node on the OPD + request: + - name: opd_addr + data_type: uint8 + description: the id of the OPD node to probe for + response: + - name: found + data_type: bool + description: true if the node was found or false if not + enums: + not_found: 0 + found: 1 + + - uid: 0xB + name: opd_enable + description: enable / disable a node on the OPD + request: + - name: opd_addr + data_type: uint8 + description: the id of the OPD node to enable / disable + - name: enable + data_type: bool + description: true to enable or false to disable + enums: + disable: 0 + enable: 1 + response: + - name: status + data_type: uint8 + description: OPD node status + + - uid: 0xC + name: opd_reset + description: reset a node on the OPD + request: + - name: opd_addr + data_type: uint8 + description: The id of the OPD node to reset + response: + - name: status + data_type: uint8 + description: OPD node status + + - uid: 0xD + name: opd_status + description: get the status of a node on the OPD + request: + - name: opd_addr + data_type: uint8 + description: the id of the OPD node to get the status of + response: + - name: status + data_type: uint8 + description: OPD node status + + - uid: 0xE + name: rtc_set_time + description: set the RTC time + request: + - name: time + data_type: uint32 + description: the Unix time to set the rtc to + unit: s + + - uid: 0xF + name: time_sync + description: >- + c3 will send OreSat's time sync TPDO over the CAN bus (all nodes that are powered on and care + about time will sync to it) + response: + - name: sent + data_type: bool + description: time sync was sent + + - uid: 0x10 + name: beacon_ping + description: c3 will response with a beacon regardless of tx state + + - uid: 0x11 + name: ping + description: a basic ping to the c3 + request: + - name: value + data_type: uint32 + description: a value to return + response: + - name: value + data_type: uint32 + description: the parameter value + + - uid: 0x12 + name: rx_test + description: empty command for c3 Rx testing + + - uid: 0x13 + name: co_sdo_read + description: read a value from a node's OD over the CAN bus using a CANopen SDO message + request: + - name: node_id + data_type: uint8 + description: the id of The CANopen node to write to + - name: index + data_type: uint16 + description: the OD index to write to + - name: subindex + data_type: uint8 + description: the OD subindex to write to + response: + - name: error_code + data_type: uint32 + description: SDO error code (0 is no error) + - name: buffer + data_type: bytes + description: data buffer + size_prefix: "uint32" diff --git a/oresat_configs/scripts/gen_xtce.py b/oresat_configs/scripts/gen_xtce.py index 81e16e0..f6aa7d4 100644 --- a/oresat_configs/scripts/gen_xtce.py +++ b/oresat_configs/scripts/gen_xtce.py @@ -1,4 +1,12 @@ -"""Generate XTCE for the beacon.""" +""" +Generate XTCE for the beacon. + +To validate generated XTCE +- download the xsd: + curl -O https://www.omg.org/spec/XTCE/20180204/SpaceSystem.xsd +- run xmllint: + xmllint --noout --schema SpaceSystem.xsd *.xtce +""" import xml.etree.ElementTree as ET from argparse import ArgumentParser, Namespace @@ -7,7 +15,7 @@ import canopen -from .. import Consts, OreSatConfig +from .. import Consts, EdlCommandField, OreSatConfig, __version__ GEN_XTCE = "generate beacon xtce file" @@ -53,9 +61,10 @@ def register_subparser(subparsers: Any) -> None: canopen.objectdictionary.UNSIGNED16: "uint16", canopen.objectdictionary.UNSIGNED32: "uint32", canopen.objectdictionary.UNSIGNED64: "uint64", - canopen.objectdictionary.VISIBLE_STRING: "string", - canopen.objectdictionary.REAL32: "float", - canopen.objectdictionary.REAL64: "double", + canopen.objectdictionary.VISIBLE_STRING: "str", + canopen.objectdictionary.OCTET_STRING: "bytes", + canopen.objectdictionary.REAL32: "float32", + canopen.objectdictionary.REAL64: "float64", } DT_LEN = { @@ -69,6 +78,7 @@ def register_subparser(subparsers: Any) -> None: canopen.objectdictionary.UNSIGNED32: 32, canopen.objectdictionary.UNSIGNED64: 64, canopen.objectdictionary.VISIBLE_STRING: 0, + canopen.objectdictionary.OCTET_STRING: 0, canopen.objectdictionary.REAL32: 32, canopen.objectdictionary.REAL64: 64, } @@ -133,7 +143,7 @@ def write_xtce(config: OreSatConfig, dir_path: str = ".") -> None: attrib={ "validationStatus": "Working", "classification": "NotClassified", - "version": f'{config.od_db["c3"]["beacon"]["revision"].value}.0', + "version": f"{__version__}", "date": datetime.now().strftime("%Y-%m-%d"), }, ) @@ -142,247 +152,664 @@ def write_xtce(config: OreSatConfig, dir_path: str = ".") -> None: author.text = "PSAS (Portland State Aerospace Society)" tm_meta = ET.SubElement(root, "TelemetryMetaData") - tm_meta_para = ET.SubElement(tm_meta, "ParameterTypeSet") + para_type_set = ET.SubElement(tm_meta, "ParameterTypeSet") + + para_set = ET.SubElement(tm_meta, "ParameterSet") - # hard-code the unitless uint32 type for the crc32 - uint32_type = ET.SubElement( - tm_meta_para, - "IntegerParameterType", + cont_set = ET.SubElement(tm_meta, "ContainerSet") + seq_cont = ET.SubElement( + cont_set, + "SequenceContainer", attrib={ - "name": "uint32_type", + "name": "beacon", }, ) - ET.SubElement(uint32_type, "UnitSet") - bin_data_enc = ET.SubElement( - uint32_type, - "IntegerDataEncoding", - attrib={ - "bitOrder": "leastSignificantBitFirst", - "encoding": "unsigned", - "sizeInBits": "32", - }, + beacon_entry_list = ET.SubElement(seq_cont, "EntryList") + ET.SubElement( + beacon_entry_list, + "ParameterRefEntry", + attrib={"parameterRef": "ax25_header"}, ) - # hard-code the 128b type for the AX.25 parameter - uint128_type = ET.SubElement( - tm_meta_para, - "BinaryParameterType", + # hard-code data type + _add_parameter_type(para_type_set, "b128_type", "bytes", default=b"\x00" * 16) + _add_parameter_type(para_type_set, "unix_time_type", "unix_time") + _add_parameter_type(para_type_set, "uint32_type", "uint32") + para_types = ["unix_time_type", "b128_type", "uint32_type"] + + # beacon headers + _add_parameter(para_set, "ax25_header", "b128_type", "AX.25 Header") + _add_parameter_ref(beacon_entry_list, "ax25_header") + + for obj in config.beacon_def: + para_name = make_obj_name(obj) + para_type_name = make_dt_name(obj) + if para_type_name not in para_types: + para_types.append(para_type_name) + + data_type = CANOPEN_TO_XTCE_DT[obj.data_type] + value_descriptions = {name: value for value, name in obj.value_descriptions.items()} + _add_parameter_type( + root=para_type_set, + name=para_type_name, + data_type=data_type, + description=obj.description, + unit=obj.unit, + factor=obj.factor, + default=obj.default, + value_descriptions=value_descriptions, + ) + + _add_parameter(para_set, para_name, para_type_name, obj.description) + _add_parameter_ref(beacon_entry_list, para_name) + + # beacon tails + _add_parameter(para_set, "crc32", "uint32_type", "beacon crc32") + _add_parameter_ref(beacon_entry_list, "crc32") + + # OreSat0 was before oresat-configs and had different commands + if config.mission != Consts.ORESAT0: + _add_edl(config, root, cont_set, para_type_set, para_set, para_types) + + tree = ET.ElementTree(root) + ET.indent(tree, space=" ", level=0) + file_name = f"{config.mission.filename()}.xtce" + tree.write(f"{dir_path}/{file_name}", encoding="utf-8", xml_declaration=True) + + +def _add_edl( + config: OreSatConfig, + root: ET.Element, + cont_set: ET.Element, + para_type_set: ET.Element, + para_set: ET.Element, + para_types: list[str], +): + cmd_meta_data = ET.SubElement(root, "CommandMetaData") + arg_type_set = ET.SubElement(cmd_meta_data, "ArgumentTypeSet") + + # custom argument types + node_ids = {} + for name in config.od_db: + if config.cards[name].node_id != 0: + node_ids[config.cards[name].nice_name] = config.cards[name].node_id + _add_argument_type( + arg_type_set, EdlCommandField("node_id", "uint8", enums=node_ids), "node_id_type" + ) + opd_addrs = {} + for name in config.od_db: + if config.cards[name].opd_address != 0: + opd_addrs[config.cards[name].nice_name] = config.cards[name].opd_address + _add_argument_type( + arg_type_set, EdlCommandField("opd_addr", "uint8", enums=opd_addrs), "opd_addr_type" + ) + + arg_types = ["opd_addr_type", "node_id_type"] + + _add_parameter_type( + para_type_set, + "edl_command_code_type", + "uint8", + value_descriptions={cmd.name: cmd.uid for cmd in config.edl_cmd_defs.values()}, + ) + _add_parameter(para_set, "edl_command_code", "edl_command_code_type") + res_seq_cont = ET.SubElement( + cont_set, + "SequenceContainer", attrib={ - "name": "b128_type", - "shortDescription": "128 bitfield", + "name": "edl_responses", }, ) - ET.SubElement(uint128_type, "UnitSet") - bin_data_enc = ET.SubElement( - uint128_type, "BinaryDataEncoding", attrib={"bitOrder": "leastSignificantBitFirst"} + res_entry_list = ET.SubElement(res_seq_cont, "EntryList") + ET.SubElement( + res_entry_list, + "ParameterRefEntry", + attrib={"parameterRef": "edl_command_code"}, ) - bin_data_enc_size = ET.SubElement( - bin_data_enc, - "SizeInBits", + meta_cmd_set = ET.SubElement(cmd_meta_data, "MetaCommandSet") + + # uslp containers + meta_cmd = ET.SubElement( + meta_cmd_set, "MetaCommand", attrib={"name": "uslp_header", "abstract": "true"} ) - bin_data_enc_size_fixed = ET.SubElement( - bin_data_enc_size, - "FixedValue", + uslp_header_cont = ET.SubElement(meta_cmd, "CommandContainer", attrib={"name": "uslp_header"}) + uslp_header_entry_list = ET.SubElement(uslp_header_cont, "EntryList") + + # fill uslp transfer frame container + uslp_fields = [ + # uslp primary header + (EdlCommandField("version_number", "uint4"), 0xC), + (EdlCommandField("spacecraft_id", "uint16"), 0x4F53), + (EdlCommandField("src_dest", "bool"), 0), + (EdlCommandField("virtual_channel_id", "uint6"), 0), + (EdlCommandField("map_id", "uint4"), 0), + (EdlCommandField("eof_flag", "bool"), 0), + (EdlCommandField("frame_length", "uint16"), 0), + (EdlCommandField("bypass_sequence_control_flag", "bool"), 0), + (EdlCommandField("protocol_control_command_flag", "bool"), 0), + (EdlCommandField("reserved", "uint2"), 0), + (EdlCommandField("operation_control_field_flag", "bool"), 0), + (EdlCommandField("vc_frame_count_length", "uint3"), 0), + # uslp transfer frame insert zone + (EdlCommandField("sequence_number", "uint32"), 0), + # uslp data field header + (EdlCommandField("tfdz_contruction_rules", "uint3"), 0x7), + (EdlCommandField("protocol_id", "uint5"), 0x5), + ] + + uslp_seq_cont = ET.SubElement( + cont_set, + "SequenceContainer", + attrib={"name": "uslp_header", "abstract": "true"}, ) - bin_data_enc_size_fixed.text = "128" + uslp_entry_list = ET.SubElement(uslp_seq_cont, "EntryList") + + _add_parameter_type(para_type_set, "bytes_hmac_type", "bytes", default="00" * 32) + _add_parameter(para_set, "hmac", "bytes_hmac_type") + _add_parameter(para_set, "uslp_fecf", "uint32_type") + + for subpacket, fixed_value in uslp_fields: + size = "1" if subpacket.data_type == "bool" else subpacket.data_type[4:] + if int(size) > 16 and int(size) <= 32: + value = f"{fixed_value:08X}" + elif int(size) > 8 and int(size) <= 16: + value = f"{fixed_value:04X}" + else: + value = f"{fixed_value:02X}" - # hard-code the unix time type - para_type = ET.SubElement( - tm_meta_para, - "AbsoluteTimeParameterType", - attrib={ - "name": "unix_time", - "shortDescription": "Unix coarse timestamp", - }, + para_name = f"uslp_{subpacket.name}" + + # add uslp subpacket to telemetery + para_type_name = f"{subpacket.data_type}_type" + if para_type_name not in para_types: + para_types.append(para_type_name) + _add_parameter_type(para_type_set, para_type_name, subpacket.data_type) + _add_parameter(para_set, para_name, para_type_name) + _add_parameter_ref(uslp_entry_list, para_name) + + # add uslp subpacket to telecommand + ET.SubElement( + uslp_header_entry_list, + "FixedValueEntry", + attrib={"name": para_name, "binaryValue": value, "sizeInBits": size}, + ) + uslp_entry_list.append(ET.Comment("child containers go here")) + para_ref_entry = _add_parameter_ref(uslp_entry_list, "hmac") + loc_in_cont = ET.SubElement( + para_ref_entry, "LocationInContainerInBits", attrib={"referenceLocation": "nextEntry"} ) - enc = ET.SubElement(para_type, "Encoding") - ET.SubElement( - enc, - "IntegerDataEncoding", - attrib={ - "byteOrder": "leastSignificantByteFirst", - "sizeInBits": "32", - }, + fixed_value_str = ET.SubElement(loc_in_cont, "FixedValue") + fixed_value_str.text = str(32 * 8 + 16) + para_ref_entry = _add_parameter_ref(uslp_entry_list, "uslp_fecf") + loc_in_cont = ET.SubElement( + para_ref_entry, "LocationInContainerInBits", attrib={"referenceLocation": "containerEnd"} ) - ref_time = ET.SubElement(para_type, "ReferenceTime") - epoch = ET.SubElement(ref_time, "Epoch") - epoch.text = "1970-01-01T00:00:00.000" + fixed_value_str = ET.SubElement(loc_in_cont, "FixedValue") + fixed_value_str.text = "16" + + for cmd in config.edl_cmd_defs.values(): + # add command + meta_cmd = ET.SubElement(meta_cmd_set, "MetaCommand", attrib={"name": cmd.name}) + if cmd.description: + meta_cmd.attrib["shortDescription"] = cmd.description + if cmd.request: + # this must be added before CommandContainer, if it exist + arg_list = ET.SubElement(meta_cmd, "ArgumentList") + cmd_cont = ET.SubElement( + meta_cmd, "CommandContainer", attrib={"name": f"{cmd.name}_request"} + ) + cmd_entry_list = ET.SubElement(cmd_cont, "EntryList") - para_types = ["unix_time", "b128_type", "uint32_type"] - for obj in config.beacon_def: - name = make_dt_name(obj) - if name in para_types: - continue - para_types.append(name) - - if obj.data_type == canopen.objectdictionary.BOOLEAN: - para_type = ET.SubElement( - tm_meta_para, - "BooleanParameterType", - attrib={ - "name": name, - "zeroStringValue": "0", - "oneStringValue": "1", - }, - ) - elif obj.data_type in canopen.objectdictionary.UNSIGNED_TYPES and obj.value_descriptions: - para_type = ET.SubElement( - tm_meta_para, - "EnumeratedParameterType", - attrib={ - "name": name, - }, - ) - enum_list = ET.SubElement(para_type, "EnumerationList") - for value, name in obj.value_descriptions.items(): + ET.SubElement(cmd_cont, "BaseContainer", attrib={"containerRef": "uslp_header"}) + + ET.SubElement( + cmd_entry_list, + "FixedValueEntry", + attrib={ + "binaryValue": f"{cmd.uid:02X}", + "sizeInBits": "8", + }, + ) + + # add command argument(s) + if cmd.request: + for req_field in cmd.request: + if req_field.size_prefix != "": + data_type = req_field.size_prefix + type_name = f"{data_type}_type" + name = f"{req_field.name}_size" + if type_name not in arg_types: + arg_types.append(type_name) + _add_argument_type( + arg_type_set, EdlCommandField(name, data_type), type_name + ) + ET.SubElement( + arg_list, + "Argument", + attrib={ + "name": name, + "argumentTypeRef": type_name, + }, + ) + ET.SubElement( + cmd_entry_list, + "ArgumentRefEntry", + attrib={"argumentRef": name}, + ) + + if req_field.name in ["opd_addr", "node_id"]: + type_name = req_field.name + "_type" + else: + type_name = req_field.data_type + "_type" + + if type_name not in arg_types: + arg_types.append(type_name) + _add_argument_type(arg_type_set, req_field, type_name) ET.SubElement( - enum_list, - "Enumeration", + arg_list, + "Argument", attrib={ - "value": str(value), - "label": name, + "name": req_field.name, + "argumentTypeRef": type_name, }, ) - elif obj.data_type in canopen.objectdictionary.INTEGER_TYPES: - if obj.data_type in canopen.objectdictionary.UNSIGNED_TYPES: - signed = False - encoding = "unsigned" - else: - signed = True - encoding = "twosComplement" - - para_type = ET.SubElement( - tm_meta_para, - "IntegerParameterType", - attrib={ - "name": name, - "signed": str(signed).lower(), - }, - ) - - para_unit_set = ET.SubElement(para_type, "UnitSet") - if obj.unit: - para_unit = ET.SubElement( - para_unit_set, - "Unit", - attrib={ - "description": obj.unit, - }, + ET.SubElement( + cmd_entry_list, + "ArgumentRefEntry", + attrib={"argumentRef": req_field.name}, ) - para_unit.text = obj.unit - data_enc = ET.SubElement( - para_type, - "IntegerDataEncoding", + ET.SubElement( + cmd_entry_list, + "FixedValueEntry", + attrib={"name": "hmac", "binaryValue": "0" * 64, "sizeInBits": str(32 * 8)}, + ) + ET.SubElement( + cmd_entry_list, + "FixedValueEntry", + attrib={"name": "uslp_fecf", "binaryValue": "0000", "sizeInBits": "16"}, + ) + + # add command parameter(s) + if cmd.response: + container_name = f"{cmd.name}_response" + seq_cont = ET.SubElement( + cont_set, + "SequenceContainer", + attrib={"name": container_name}, + ) + entry_list = ET.SubElement(seq_cont, "EntryList") + ET.SubElement(seq_cont, "BaseContainer", attrib={"containerRef": "uslp_header"}) + for res_field in cmd.response: + para_name = f"{cmd.name}_{res_field.name}" + para_ref = "" + if res_field.size_prefix != "": + # add buffer size parameter + para_data_type = res_field.size_prefix + para_type_name = f"{para_name}_type" + if para_type_name not in para_types: + para_types.append(para_type_name) + _add_parameter_type( + para_type_set, + para_type_name, + para_data_type, + ) + para_ref = f"{para_name}_size" + _add_parameter(para_set, para_ref, para_type_name) + _add_parameter_ref(entry_list, para_ref) + + if res_field.unit: + para_type_name = f"{res_field.data_type}_{res_field.unit}_type" + else: + para_type_name = f"{res_field.data_type}_type" + + if para_type_name not in para_types: + para_types.append(para_type_name) + _add_parameter_type( + para_type_set, + para_type_name, + res_field.data_type, + unit=res_field.unit, + value_descriptions=res_field.enums, + size_prefix=res_field.size_prefix, + param_ref=para_ref, + ) + + _add_parameter(para_set, para_name, para_type_name, res_field.description) + _add_parameter_ref(entry_list, para_name) + + cont_ref_entry = ET.SubElement( + res_entry_list, + "ContainerRefEntry", attrib={ - "byteOrder": "leastSignificantByteFirst", - "encoding": encoding, - "sizeInBits": str(DT_LEN[obj.data_type]), + "containerRef": container_name, }, ) - if obj.factor != 1: - def_cal = ET.SubElement(data_enc, "DefaultCalibrator") - poly_cal = ET.SubElement(def_cal, "PolynomialCalibrator") - ET.SubElement( - poly_cal, - "Term", - attrib={ - "exponent": "1", - "coefficient": str(obj.factor), - }, - ) - elif obj.data_type == canopen.objectdictionary.VISIBLE_STRING: - para_type = ET.SubElement( - tm_meta_para, - "StringParameterType", + inc_cond = ET.SubElement(cont_ref_entry, "IncludeCondition") + ET.SubElement( + inc_cond, + "Comparison", attrib={ - "name": name, + "parameterRef": "edl_command_code", + "value": str(cmd.uid), }, ) - str_para_type = ET.SubElement( - para_type, - "StringDataEncoding", + + +def gen_xtce(args: Optional[Namespace] = None) -> None: + """Gen_dcf main.""" + if args is None: + args = build_parser(ArgumentParser()).parse_args() + + config = OreSatConfig(args.oresat) + write_xtce(config, args.dir_path) + + +def _add_parameter_type( + root, + name: str, + data_type: str, + description: str = "", + unit: str = "", + factor: float = 1, + default: Any = None, + value_descriptions: dict[str, int] = {}, + size_prefix: str = "", + param_ref: str = "", +): + + if data_type == "bool": + para_type = ET.SubElement( + root, + "BooleanParameterType", + attrib={ + "name": name, + "zeroStringValue": "0", + "oneStringValue": "1", + }, + ) + ET.SubElement( + para_type, + "IntegerDataEncoding", + attrib={ + "byteOrder": "leastSignificantByteFirst", + "encoding": "unsigned", + "sizeInBits": "8", + }, + ) + elif data_type.startswith("uint") and value_descriptions: # enums + para_type = ET.SubElement( + root, + "EnumeratedParameterType", + attrib={ + "name": name, + }, + ) + ET.SubElement( + para_type, + "IntegerDataEncoding", + attrib={ + "byteOrder": "leastSignificantByteFirst", + "encoding": "unsigned", + "sizeInBits": data_type[4:], + }, + ) + enum_list = ET.SubElement(para_type, "EnumerationList") + for e_name, e_value in value_descriptions.items(): + ET.SubElement( + enum_list, + "Enumeration", attrib={ - "encoding": "UTF-8", + "value": str(e_value), + "label": e_name, }, ) - size_in_bits = ET.SubElement(str_para_type, "SizeInBits") - fixed = ET.SubElement(size_in_bits, "Fixed") - fixed_value = ET.SubElement(fixed, "FixedValue") - fixed_value.text = str(len(obj.default) * 8) + elif data_type.startswith("int") or data_type.startswith("uint"): + if data_type.startswith("uint"): + signed = False + encoding = "unsigned" + size = data_type[4:] + else: + signed = True + encoding = "twosComplement" + size = data_type[3:] - para_set = ET.SubElement(tm_meta, "ParameterSet") + para_type = ET.SubElement( + root, + "IntegerParameterType", + attrib={ + "name": name, + "signed": str(signed).lower(), + }, + ) - # hard-code the AX.25 headers as a Binary128 type - ET.SubElement( - para_set, - "Parameter", - attrib={ - "name": "ax25_header", - "parameterTypeRef": "b128_type", - "shortDescription": "AX.25 Header", - }, - ) - for obj in config.beacon_def: + para_unit_set = ET.SubElement(para_type, "UnitSet") + if unit: + para_unit = ET.SubElement( + para_unit_set, + "Unit", + ) + para_unit.text = unit + + data_enc = ET.SubElement( + para_type, + "IntegerDataEncoding", + attrib={ + "byteOrder": "leastSignificantByteFirst", + "encoding": encoding, + "sizeInBits": size, + }, + ) + if factor != 1: + def_cal = ET.SubElement(data_enc, "DefaultCalibrator") + poly_cal = ET.SubElement(def_cal, "PolynomialCalibrator") + ET.SubElement( + poly_cal, + "Term", + attrib={ + "exponent": "1", + "coefficient": str(factor), + }, + ) + elif data_type == "str": + para_type = ET.SubElement( + root, + "StringParameterType", + attrib={ + "name": name, + }, + ) + str_data_enc = ET.SubElement( + para_type, + "StringDataEncoding", + attrib={ + "encoding": "UTF-8", + }, + ) + size_in_bits = ET.SubElement(str_data_enc, "SizeInBits") + fixed = ET.SubElement(size_in_bits, "Fixed") + fixed_value = ET.SubElement(fixed, "FixedValue") + fixed_value.text = str(len(default) * 8) + elif data_type == "bytes": + param_type = ET.SubElement( + root, + "BinaryParameterType", + attrib={ + "name": name, + }, + ) + if description: + param_type.attrib["shortDescription"] = description + ET.SubElement(param_type, "UnitSet") + bin_data_enc = ET.SubElement( + param_type, "BinaryDataEncoding", attrib={"bitOrder": "leastSignificantBitFirst"} + ) + size_in_bits = ET.SubElement( + bin_data_enc, + "SizeInBits", + ) + if size_prefix != "": + dyn_val = ET.SubElement( + size_in_bits, + "DynamicValue", + ) + ET.SubElement( + dyn_val, + "ParameterInstanceRef", + attrib={"parameterRef": param_ref}, + ) + else: + bin_data_enc_size_fixed = ET.SubElement( + size_in_bits, + "FixedValue", + ) + bin_data_enc_size_fixed.text = str(len(default)) + elif data_type == "unix_time": + para_type = ET.SubElement( + root, + "AbsoluteTimeParameterType", + attrib={ + "name": "unix_time_type", + "shortDescription": "Unix coarse timestamp", + }, + ) + enc = ET.SubElement(para_type, "Encoding") ET.SubElement( - para_set, - "Parameter", + enc, + "IntegerDataEncoding", attrib={ - "name": make_obj_name(obj), - "parameterTypeRef": make_dt_name(obj), - "shortDescription": obj.description, + "byteOrder": "leastSignificantByteFirst", + "sizeInBits": "32", }, ) - ET.SubElement( + ref_time = ET.SubElement(para_type, "ReferenceTime") + epoch = ET.SubElement(ref_time, "Epoch") + epoch.text = "1970-01-01T00:00:00.000" + else: + raise ValueError(f"data type {data_type} not implemented") + + +def _add_parameter(para_set: ET.Element, name: str, type_ref: str, description: str = ""): + para = ET.SubElement( para_set, "Parameter", attrib={ - "name": "crc32", - "parameterTypeRef": "uint32_type", - "shortDescription": "crc check for beacon", + "name": name, + "parameterTypeRef": type_ref, }, ) + if description: + para.attrib["shortDescription"] = description - cont_set = ET.SubElement(tm_meta, "ContainerSet") - seq_cont = ET.SubElement( - cont_set, - "SequenceContainer", - attrib={ - "name": "Beacon", - }, - ) - entry_list = ET.SubElement(seq_cont, "EntryList") - ET.SubElement( - entry_list, - "ParameterRefEntry", - attrib={"parameterRef": "ax25_header"}, - ) - for obj in config.beacon_def: - ET.SubElement( - entry_list, - "ParameterRefEntry", - attrib={ - "parameterRef": make_obj_name(obj), - }, - ) - ET.SubElement( + +def _add_parameter_ref(entry_list: ET.Element, name: str) -> ET.Element: + return ET.SubElement( entry_list, "ParameterRefEntry", attrib={ - "parameterRef": "crc32", + "parameterRef": name, }, ) - tree = ET.ElementTree(root) - ET.indent(tree, space=" ", level=0) - file_name = f"{config.mission.filename()}.xtce" - tree.write(f"{dir_path}/{file_name}", encoding="utf-8", xml_declaration=True) +def _add_argument_type(arg_type_set: ET.Element, req_field: EdlCommandField, type_name: str): + attrib = { + "name": type_name, + } -def gen_xtce(args: Optional[Namespace] = None) -> None: - """Gen_dcf main.""" - if args is None: - args = build_parser(ArgumentParser()).parse_args() + if req_field.data_type.startswith("int") or req_field.data_type.startswith("uint"): + if req_field.enums: + name = "EnumeratedArgumentType" + else: + name = "IntegerArgumentType" + elif req_field.data_type == "bool": + name = "BooleanArgumentType" + if req_field.enums: + attrib["zeroStringValue"] = list(req_field.enums.keys())[ + list(req_field.enums.values()).index(0) + ] + attrib["oneStringValue"] = list(req_field.enums.keys())[ + list(req_field.enums.values()).index(1) + ] + elif req_field.data_type in ["float32", "float64"]: + name = "FloatArgumentType" + elif req_field.data_type == "str": + name = "StringArgumentType" + elif req_field.data_type == "bytes": + name = "BinaryArgumentType" + else: + raise ValueError(f"invalid data type {req_field.data_type}") + + data_type = ET.SubElement(arg_type_set, name, attrib=attrib) + arg_unit_set = ET.SubElement(data_type, "UnitSet") + if req_field.unit: + unit = ET.SubElement( + arg_unit_set, + "Unit", + ) + unit.text = req_field.unit - config = OreSatConfig(args.oresat) - write_xtce(config, args.dir_path) + if req_field.data_type.startswith("int") or req_field.data_type.startswith("uint"): + size = req_field.data_type.split("int")[-1] + encoding = "twosComplement" if req_field.data_type.startswith("int") else "unsigned" + ET.SubElement( + data_type, + "IntegerDataEncoding", + attrib={"sizeInBits": size, "encoding": encoding}, + ) + + if req_field.enums: + enum_list = ET.SubElement(data_type, "EnumerationList") + for k, v in req_field.enums.items(): + ET.SubElement( + enum_list, + "Enumeration", + attrib={"value": str(v), "label": k}, + ) + elif req_field.data_type == "bool": + ET.SubElement( + data_type, + "IntegerDataEncoding", + attrib={"sizeInBits": "8", "encoding": "unsigned"}, + ) + elif req_field.data_type in ["float32", "float64"]: + ET.SubElement(data_type, "FloatDataEncoding") + elif req_field.data_type == "str": + str_data = ET.SubElement( + data_type, + "StringDataEncoding", + attrib={"encoding": "US-ASCII", "bitOrder": "mostSignificantBitFirst"}, + ) + if req_field.max_size > 0: + var = ET.SubElement( + str_data, + "Variable", + attrib={"maxSizeInBits": f"{req_field.max_size * 8}"}, + ) + dyn_val = ET.SubElement(var, "DynamicValue") + ET.SubElement( + dyn_val, + "ArgumentInstanceRef", + attrib={"argumentRef": f"{req_field.name}_size"}, + ) + elif req_field.fixed_size > 0: + size_bits = ET.SubElement(str_data, "SizeInBits") + fixed_value = ET.SubElement(size_bits, "FixedValue") + fixed_value.text = f"{req_field.fixed_size * 8}" + elif req_field.data_type == "bytes": + bytes_data = ET.SubElement( + data_type, + "BinaryDataEncoding", + attrib={"bitOrder": "mostSignificantBitFirst"}, + ) + size_bits = ET.SubElement(bytes_data, "SizeInBits") + if req_field.size_prefix != "": + dyn_val = ET.SubElement(size_bits, "DynamicValue") + ET.SubElement( + dyn_val, + "ArgumentInstanceRef", + attrib={"argumentRef": f"{req_field.name}_size"}, + ) + elif req_field.fixed_size > 0: + fixed_value = ET.SubElement(size_bits, "FixedValue") + fixed_value.text = f"{req_field.fixed_size * 8}" diff --git a/pyproject.toml b/pyproject.toml index cae8c8b..0c75e90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,7 +65,9 @@ linters = "pycodestyle,pyflakes,pylint,mccabe,mypy,radon" # E0401: Cannot find implementation or library stub for module named # R0902: Too many instance attributes # W0511: TODOs or FIXMEs -ignore = "E402,C901,C0103,E203,R0912,R0915,R901,R901,R0914,C0413,C0206,R1716,W1514,R1702,E0401,R0902,W0511" +# W0102: Dangerous default value for function argument +# R0913: Too many arguments for a function +ignore = "E402,C901,C0103,E203,R0912,R0915,R901,R901,R0914,C0413,C0206,R1716,W1514,R1702,E0401,R0902,W0511,W0102,R0913" max_line_length = 100 [[tool.pylama.files]] diff --git a/tests/test_edl.py b/tests/test_edl.py new file mode 100644 index 0000000..fc6e79a --- /dev/null +++ b/tests/test_edl.py @@ -0,0 +1,205 @@ +"""Unit tests for validating the edl yaml config file.""" + +import re +import unittest +from random import choice, randbytes, randint, uniform +from string import ascii_letters +from typing import Any + +from oresat_configs import OreSatConfig, OreSatId + +DATA_TYPES = [ + "bool", + "int8", + "int16", + "int32", + "int64", + "uint8", + "uint16", + "uint32", + "uint64", + "float32", + "float64", + "str", + "bytes", +] + + +def _gen_random_value(data_type: str, length: int = 0) -> Any: + """Generate a random value for non-custom data types.""" + + value: Any = None + if data_type == "bool": + value = bool(randint(0, 1)) + elif data_type.startswith("int"): + bits = int(data_type[3:]) + value = randint(-(2 ** (bits - 1)), 2 ** (bits - 1) - 1) + elif data_type.startswith("uint"): + bits = int(data_type[4:]) + value = randint(0, 2**bits - 1) + elif data_type.startswith("float"): + value = uniform(-1_000.0, 1_000.0) + elif data_type == "str": + value = "".join(choice(ascii_letters) for i in range(length)) + elif data_type == "bytes": + value = randbytes(length) + else: + raise ValueError(f"invalid data type {data_type}") + + return value + + +class ConfigTypes(unittest.TestCase): + """Tests for the edl yaml config file.""" + + def _test_snake_case(self, string: str): + """Test that a string is snake_case.""" + + regex_str = r"^[a-z][a-z0-9_]*[a-z0-9]*$" # snake_case with no leading/trailing num or "_" + self.assertIsNotNone(re.match(regex_str, string), f'"{string}" is not snake_case') + + def test_edl_cmd_defs(self): + """Validate edl commands configs.""" + + edl_cmd_defs = OreSatConfig(OreSatId.ORESAT0_5).edl_cmd_defs + + cmd_uids = [cmd.uid for cmd in edl_cmd_defs.values()] + self.assertEqual(len(cmd_uids), len(set(cmd_uids)), "command uids are not unique") + cmd_names = [cmd.name for cmd in edl_cmd_defs.values()] + self.assertEqual(len(cmd_names), len(set(cmd_names)), "command names are not unique") + + for cmd in edl_cmd_defs.values(): + req_names = [req.name for req in cmd.request] + self.assertEqual( + len(req_names), + len(set(req_names)), + f"command {cmd.name} request fields names are not unique", + ) + res_names = [res.name for res in cmd.response] + self.assertEqual( + len(res_names), + len(set(res_names)), + f"command {cmd.name} response fields names are not unique", + ) + + test_values = tuple() + for req in cmd.request: + self.assertIn(req.data_type, DATA_TYPES) + self._test_snake_case(req.name) + if req.name == "bytes": + self.assertTrue( + (req.fixed_size == 0) != (req.size_prefix == ""), + ( + f"command {cmd.name} request field {req.name} has both fixed_size " + "and size_prefix set" + ), + ) + self.assertIn( + req.size_prefix, + ["", "uint8", "uint16", "uint32"], + ( + f"command {cmd.name} request field {req.name} size_prefix " + 'uintX data type or ""' + ), + ) + elif req.name == "str": + self.assertFalse( + (req.fixed_size == 0) and (req.max_size == 0), + ( + f"command {cmd.name} request field {req.name} has nether fixed_size and" + " max_size set", + ), + ) + self.assertTrue( + (req.fixed_size == 0) != (req.max_size == 0), + ( + f"command {cmd.name} request field {req.name} has both fixed_size and " + "max_size set", + ), + ) + size = 0 + if req.data_type == "bytes": + if req.size_prefix != "": + size = randint(1, 100) # set the random size to be reasonable + size_prefix = (size * 8).to_bytes(int(req.size_prefix[4:]) // 8, "little") + data = _gen_random_value(req.data_type, size) + test_values += (size_prefix + data,) + else: + size = req.fixed_size + test_values += (_gen_random_value(req.data_type, size),) + elif req.data_type == "str": + size = req.fixed_size + if req.max_size != 0: + size = req.max_size + test_values += (_gen_random_value(req.data_type, size),) + else: + test_values += (_gen_random_value(req.data_type, size),) + if len(test_values) > 0: + raw = cmd.encode_request(test_values) + test_values2 = cmd.decode_request(raw) + self.assertTupleEqual( + test_values, + test_values2, + f"command {cmd.name} request encode -> decode does not match", + ) + + test_values = tuple() + for res in cmd.response: + self.assertIn(res.data_type, DATA_TYPES) + self._test_snake_case(res.name) + if res.name == "bytes": + self.assertTrue( + (res.fixed_size == 0) != (res.size_prefix == ""), + ( + f"command {cmd.name} request field {res.name} has both fixed_size " + "and size_prefix set" + ), + ) + self.assertIn( + res.size_prefix, + ["", "uint8", "uint16", "uint32"], + ( + f"command {cmd.name} request field {res.name} size_prefix " + 'uintX data type or ""' + ), + ) + elif res.name == "str": + self.assertFalse( + (res.fixed_size == 0) and (res.max_size == 0), + ( + f"command {cmd.name} response field {res.name} has nether fixed_size " + "and max_size set", + ), + ) + self.assertTrue( + (res.fixed_size == 0) != (res.max_size == 0), + ( + f"command {cmd.name} response field {res.name} has both fixed_size and " + "max_size set", + ), + ) + size = 0 + if res.data_type == "bytes": + if res.size_prefix != "": + size = randint(1, 100) # set the random size to be reasonable + size_prefix = (size * 8).to_bytes(int(res.size_prefix[4:]) // 8, "little") + data = _gen_random_value(res.data_type, size) + test_values += (size_prefix + data,) + else: + size = res.fixed_size + test_values += (_gen_random_value(res.data_type, size),) + elif res.data_type == "str": + size = res.fixed_size + if res.max_size != 0: + size = res.max_size + test_values += (_gen_random_value(res.data_type, size),) + else: + test_values += (_gen_random_value(res.data_type, size),) + if len(test_values) > 0: + raw = cmd.encode_response(test_values) + test_values2 = cmd.decode_response(raw) + self.assertTupleEqual( + test_values, + test_values2, + f"command {cmd.name} response encode -> decode does not match", + )