From 6ddc15cd44838cbd659484517f9d0de2a227c396 Mon Sep 17 00:00:00 2001 From: paul bethge Date: Sat, 27 Apr 2024 04:13:28 +0800 Subject: [PATCH] add control point commands, read properties and rizer example (#38) * fix control point opcodes * add control point commands * add target setting features * add parse_target_setting_features * add rizer service and example * make sure parameters are integer or list * update example for target setting features and simulation parameters * fix linting * add pylint exception --- examples/fitness_machine_service_example.py | 106 ++++++---- examples/rizer_example.py | 34 +++ pycycling/fitness_machine_service.py | 195 +++++++++++++++++- pycycling/ftms_parsers/control_point.py | 81 +++++++- .../ftms_parsers/fitness_machine_feature.py | 69 +++++++ pycycling/rizer.py | 41 ++++ pycycling/sterzo.py | 2 +- 7 files changed, 473 insertions(+), 55 deletions(-) create mode 100644 examples/rizer_example.py create mode 100644 pycycling/rizer.py diff --git a/examples/fitness_machine_service_example.py b/examples/fitness_machine_service_example.py index 6d39bf3..fdaa09e 100644 --- a/examples/fitness_machine_service_example.py +++ b/examples/fitness_machine_service_example.py @@ -6,38 +6,49 @@ async def run(address): async with BleakClient(address, timeout=10) as client: ftms = FitnessMachineService(client) - # Print all 'read' characteristics + # Print 'read' characteristics - supported_resistance_level_range = ( - await ftms.get_supported_resistance_level_range() - ) + ### Fitness Machine Features + fitness_machine_features, target_setting_features = await ftms.get_fitness_machine_feature() + fitness_machine_features = fitness_machine_features._asdict() + target_setting_features = target_setting_features._asdict() - print("Supported resistance level range:") - print(supported_resistance_level_range) - print() - - max_resistance = supported_resistance_level_range.maximum_resistance + def print_features(features): + for key, value in features.items(): + print(f"{key}: {value}") + print() + + print("Fitness machine feature:") + print_features(fitness_machine_features) - supported_power_range = await ftms.get_supported_power_range() + print("Target setting features:") + print_features(target_setting_features) - print("Supported power range:") - print(supported_power_range) - print() + print(target_setting_features) - max_power = supported_power_range.maximum_power - fitness_machine_feature = await ftms.get_fitness_machine_feature() + if target_setting_features["resistance_target_setting_supported"]: + supported_resistance_level_range = ( + await ftms.get_supported_resistance_level_range() + ) + print("Supported resistance level range:") + print(supported_resistance_level_range) + print() + max_resistance = supported_resistance_level_range.maximum_resistance - print("Fitness machine feature:") - print(fitness_machine_feature) - print() + if target_setting_features["power_target_setting_supported"]: + supported_power_range = await ftms.get_supported_power_range() + print("Supported power range:") + print(supported_power_range) + print() + max_power = supported_power_range.maximum_power + # Start receiving and printing 'notify' characteristics def print_indoor_bike_data(data): print("Received indoor bike data:") print(data) print() - # Start receiving and printing 'notify' characteristics ftms.set_indoor_bike_data_handler(print_indoor_bike_data) await ftms.enable_indoor_bike_data_notify() @@ -73,36 +84,49 @@ def print_control_point_response(message): # 3. (recommended) 'write' a reset command await ftms.reset() - print( - "Setting target resistance level to 25 percent of maximum resistance level..." - ) - await ftms.set_target_resistance_level(max_resistance * 0.25) + # Set target resistance level + if target_setting_features["resistance_target_setting_supported"]: + print("Setting target resistance level to 25 percent of maximum resistance level...") + await ftms.set_target_resistance_level(max_resistance * 0.25) - await asyncio.sleep(5) + await asyncio.sleep(5) - print( - "Increasing target resistance level to 50 percent of maximum resistance level..." - ) - await ftms.set_target_resistance_level(max_resistance * 0.5) + print("Increasing target resistance level to 50 percent of maximum resistance level...") + await ftms.set_target_resistance_level(max_resistance * 0.5) - await asyncio.sleep(5) + await asyncio.sleep(5) - # Reset target resistance level - print("Resetting target resistance level...") + # Reset target resistance level + print("Resetting target resistance level...") - await ftms.reset() + await ftms.reset() - power_level = 4 / 100 * max_power - print(f"Increasing target power to 4 percent of maximum power ({power_level}W).") - print("The trainer will automatically adjust resistance based on your leg speed.") - print(f"Try pedaling above {power_level}W to feel decreasing resistance, and vice versa.") - await ftms.set_target_power(power_level) + # Set target power + if target_setting_features["power_target_setting_supported"]: + power_level = 4 / 100 * max_power + print(f"Increasing target power to 4 percent of maximum power ({power_level}W).") + print("The trainer will automatically adjust resistance based on your leg speed.") + print(f"Try pedaling above {power_level}W to feel decreasing resistance, and vice versa.") + await ftms.set_target_power(power_level) - await asyncio.sleep(30) + await asyncio.sleep(30) - # Reset - print("Resetting target power...") - await ftms.reset() + # Reset + print("Resetting target power...") + await ftms.reset() + + # Set simulation parameters + if target_setting_features["indoor_bike_simulation_parameters_supported"]: + print("Setting indoor bike simulation parameters to 0") + await ftms.set_simulation_parameters(0, 0, 0, 0) + await asyncio.sleep(5) + print("Setting indoor bike simulation grade to 10%") + print("if connected to a compatible machine (like elite rizer), this should set its grade to +10%") + await ftms.set_simulation_parameters(0, 1000, 0, 0) + await asyncio.sleep(5) + + print("Resetting indoor bike simulation parameters...") + await ftms.reset() if __name__ == "__main__": diff --git a/examples/rizer_example.py b/examples/rizer_example.py new file mode 100644 index 0000000..60d5e26 --- /dev/null +++ b/examples/rizer_example.py @@ -0,0 +1,34 @@ +import asyncio +from bleak import BleakClient + +from pycycling.rizer import Rizer + +""" make sure the rizer is connected to a fitness machine otherwise the steering angle will not be transmitted """ + + +async def run(address): + async with BleakClient(address) as client: + def steering_handler(steering_angle): + print(steering_angle) + + await client.is_connected() + rizer = Rizer(client) + rizer.set_steering_measurement_callback(steering_handler) + await rizer.enable_steering_measurement_notifications() + await rizer.set_transmission_rate(0) # 8 Hz + await asyncio.sleep(4) + await rizer.set_transmission_rate(1) # 16 Hz + await asyncio.sleep(4) + await rizer.set_transmission_rate(2) # 32 Hz + await asyncio.sleep(4) + # recalibrate the rizer + #await rizer.set_center() + +if __name__ == "__main__": + import os + + os.environ["PYTHONASYNCIODEBUG"] = str(1) + + device_address = "YOUR RIZER ADDRESS HERE" + loop = asyncio.get_event_loop() + loop.run_until_complete(run(device_address)) diff --git a/pycycling/fitness_machine_service.py b/pycycling/fitness_machine_service.py index 3c91b3f..dadc414 100644 --- a/pycycling/fitness_machine_service.py +++ b/pycycling/fitness_machine_service.py @@ -22,12 +22,13 @@ .. literalinclude:: ../examples/fitness_machine_service_example.py """ + from collections import namedtuple from pycycling.ftms_parsers import ( parse_fitness_machine_status, parse_indoor_bike_data, - parse_fitness_machine_feature, + parse_all_features, parse_training_status, parse_control_point_response, form_ftms_control_command, @@ -60,7 +61,9 @@ ) -def _parse_supported_resistance_level_range(message: bytearray) -> SupportedResistanceLevelRange: +def _parse_supported_resistance_level_range( + message: bytearray, +) -> SupportedResistanceLevelRange: minimum_resistance = int.from_bytes(message[0:2], "little") maximum_resistance = int.from_bytes(message[2:4], "little") minimum_increment = int.from_bytes(message[4:6], "little") @@ -91,7 +94,9 @@ def __init__(self, client): self._training_status_callback = None # === READ Characteristics === - async def get_supported_resistance_level_range(self) -> SupportedResistanceLevelRange: + async def get_supported_resistance_level_range( + self, + ) -> SupportedResistanceLevelRange: message = await self._client.read_gatt_char( ftms_supported_resistance_level_range_characteristic_id ) @@ -107,7 +112,7 @@ async def get_fitness_machine_feature(self) -> FitnessMachineFeature: message = await self._client.read_gatt_char( ftms_fitness_machine_feature_characteristic_id ) - return parse_fitness_machine_feature(message) + return parse_all_features(message) # === NOTIFY Characteristics === # ====== Indoor Bike Data ====== @@ -189,6 +194,7 @@ def _control_point_response_handler( if self._control_point_response_callback is not None: self._control_point_response_callback(parse_control_point_response(data)) + # ====== Control Point Commands ====== async def request_control(self) -> None: message = form_ftms_control_command(FTMSControlPointOpCode.REQUEST_CONTROL) await self._client.write_gatt_char( @@ -201,17 +207,194 @@ async def reset(self) -> None: ftms_fitness_machine_control_point_characteristic_id, message, True ) + async def set_target_speed(self, speed: int) -> None: + if speed < 0: + raise ValueError("Speed must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGET_SPEED, speed + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_target_incline(self, inclination: int) -> None: + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGET_INCLINE, inclination + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + async def set_target_resistance_level(self, level: int) -> None: + if level < 0: + raise ValueError("Resistance level must be non-negative") message = form_ftms_control_command( - FTMSControlPointOpCode.SET_TARGET_RESISTANCE_LEVEL, int(level) + FTMSControlPointOpCode.SET_TARGET_RESISTANCE_LEVEL, level ) await self._client.write_gatt_char( ftms_fitness_machine_control_point_characteristic_id, message, True ) async def set_target_power(self, power: int) -> None: + if power < 0: + raise ValueError("Power must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGET_POWER, power + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_target_heart_rate(self, heart_rate: int) -> None: + if heart_rate < 0: + raise ValueError("Heart rate must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGET_HEART_RATE, heart_rate + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def start_or_resume(self) -> None: + message = form_ftms_control_command(FTMSControlPointOpCode.START_OR_RESUME) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def stop_or_pause(self, pause: bool) -> None: + message = form_ftms_control_command( + FTMSControlPointOpCode.STOP_OR_PAUSE, 0x02 if pause else 0x01 + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_targeted_expended_energy(self, energy: int) -> None: + if energy < 0: + raise ValueError("Energy must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGETED_EXPENDED_ENERGY, energy + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_targeted_number_of_steps(self, steps: int) -> None: + if steps < 0: + raise ValueError("Steps must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGETED_NUMBER_OF_STEPS, steps + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_targeted_number_of_strides(self, strides: int) -> None: + if strides < 0: + raise ValueError("Strides must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGETED_NUMBER_OF_STRIDES, strides + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_targeted_distance(self, distance: int) -> None: + if distance < 0: + raise ValueError("Distance must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGETED_DISTANCE, distance + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_targeted_training_time(self, time: int) -> None: + if time < 0: + raise ValueError("Time must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGETED_TRAINING_TIME, time + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_targeted_time_in_two_heart_rate_zones(self, times: list) -> None: + if len(times) != 2: + raise ValueError("Times must be a list of 2 elements") + if times[0] < 0 or times[1] < 0: + raise ValueError("Times must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGETED_TIME_IN_TWO_HEART_RATE_ZONES, times + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_targeted_time_in_three_heart_rate_zones(self, times: list) -> None: + if len(times) != 3: + raise ValueError("Times must be a list of 3 elements") + if times[0] < 0 or times[1] < 0 or times[2] < 0: + raise ValueError("Times must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGETED_TIME_IN_THREE_HEART_RATE_ZONES, times + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_targeted_time_in_five_heart_rate_zones(self, times: list) -> None: + if len(times) != 5: + raise ValueError("Times must be a list of 5 elements") + if times[0] < 0 or times[1] < 0 or times[2] < 0 or times[3] < 0 or times[4] < 0: + raise ValueError("Times must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_TARGETED_TIME_IN_FIVE_HEART_RATE_ZONES, times + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_simulation_parameters( + self, wind_speed: int, grade: int, crr: int, cw: int + ) -> None: + if crr < 0: + raise ValueError("Crr must be non-negative") + if cw < 0: + raise ValueError("Cw must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_INDOOR_BIKE_SIMULATION_PARAMETERS, + [wind_speed, grade, crr, cw], + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_wheel_circumference(self, circumference: int) -> None: + if circumference < 0: + raise ValueError("Circumference must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_WHEEL_CIRCUMFERENCE, circumference + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_spin_down_control(self, control: int) -> None: + if control < 0: + raise ValueError("Control must be non-negative") + message = form_ftms_control_command( + FTMSControlPointOpCode.SET_SPIN_DOWN_CONTROL, control + ) + await self._client.write_gatt_char( + ftms_fitness_machine_control_point_characteristic_id, message, True + ) + + async def set_targeted_cadence(self, cadence: int) -> None: + if cadence < 0: + raise ValueError("Cadence must be non-negative") message = form_ftms_control_command( - FTMSControlPointOpCode.SET_TARGET_POWER, int(power) + FTMSControlPointOpCode.SET_TARGETED_CADENCE, cadence ) await self._client.write_gatt_char( ftms_fitness_machine_control_point_characteristic_id, message, True diff --git a/pycycling/ftms_parsers/control_point.py b/pycycling/ftms_parsers/control_point.py index ac9d596..d3f9052 100644 --- a/pycycling/ftms_parsers/control_point.py +++ b/pycycling/ftms_parsers/control_point.py @@ -9,7 +9,6 @@ class FTMSControlPointResponseResultCode(Enum): OPERATION_FAILED = 0x04 CONTROL_NOT_PERMITTED = 0x05 - class FTMSControlPointOpCode(Enum): REQUEST_CONTROL = 0x00 RESET = 0x01 @@ -17,12 +16,31 @@ class FTMSControlPointOpCode(Enum): SET_TARGET_INCLINE = 0x03 SET_TARGET_RESISTANCE_LEVEL = 0x04 SET_TARGET_POWER = 0x05 - START_OR_RESUME = 0x06 - STOP_OR_PAUSE = 0x07 + SET_TARGET_HEART_RATE = 0x06 + START_OR_RESUME = 0x07 + STOP_OR_PAUSE = 0x08 + SET_TARGETED_EXPENDED_ENERGY = 0x09 + SET_TARGETED_NUMBER_OF_STEPS = 0x0A + SET_TARGETED_NUMBER_OF_STRIDES = 0x0B + SET_TARGETED_DISTANCE = 0x0C + SET_TARGETED_TRAINING_TIME = 0x0D + SET_TARGETED_TIME_IN_TWO_HEART_RATE_ZONES = 0x0E + SET_TARGETED_TIME_IN_THREE_HEART_RATE_ZONES = 0x0F + SET_TARGETED_TIME_IN_FIVE_HEART_RATE_ZONES = 0x10 + SET_INDOOR_BIKE_SIMULATION_PARAMETERS = 0x11 + SET_WHEEL_CIRCUMFERENCE = 0x12 + SET_SPIN_DOWN_CONTROL = 0x13 + SET_TARGETED_CADENCE = 0x14 RESPONSE_CODE = 0x80 - def form_ftms_control_command(opcode: FTMSControlPointOpCode, parameter: int = 0): + """ + Form a FTMS control command message + :param opcode: FTMSControlPointOpCode + :param parameter: scalar or list of scalar + :return: bytearray + """ + parameter = parameter if isinstance(parameter, list) else (int)(parameter) if opcode == FTMSControlPointOpCode.REQUEST_CONTROL: return b"\x00" elif opcode == FTMSControlPointOpCode.RESET: @@ -39,13 +57,62 @@ def form_ftms_control_command(opcode: FTMSControlPointOpCode, parameter: int = 0 elif opcode == FTMSControlPointOpCode.SET_TARGET_POWER: # parameter: sint16, 1W return b"\x05" + parameter.to_bytes(2, "little", signed=True) + elif opcode == FTMSControlPointOpCode.SET_TARGET_HEART_RATE: + # parameter: uint8, 1bpm + return b"\x06" + parameter.to_bytes(1, "little", signed=False) elif opcode == FTMSControlPointOpCode.START_OR_RESUME: - # parameter: 01=stop, 02=pause - return b"\x06" + return b"\x07" elif opcode == FTMSControlPointOpCode.STOP_OR_PAUSE: - return b"\x07" + parameter.to_bytes(1, "little", signed=False) + # parameter: 01=stop, 02=pause + return b"\x08" + parameter.to_bytes(1, "little", signed=False) elif opcode == FTMSControlPointOpCode.RESPONSE_CODE: return b"\x80" + elif opcode == FTMSControlPointOpCode.SET_TARGETED_EXPENDED_ENERGY: + # parameter: uint16, 1calories + return b"\x09" + parameter.to_bytes(2, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_TARGETED_NUMBER_OF_STEPS: + # parameter: uint16, 1 + return b"\x0A" + parameter.to_bytes(2, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_TARGETED_NUMBER_OF_STRIDES: + # parameter: uint16, 1 + return b"\x0B" + parameter.to_bytes(2, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_TARGETED_DISTANCE: + # parameter: uint24, 1m + return b"\x0C" + parameter.to_bytes(3, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_TARGETED_TRAINING_TIME: + # parameter: uint16, 1s + return b"\x0D" + parameter.to_bytes(2, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_TARGETED_TIME_IN_TWO_HEART_RATE_ZONES: + # parameter: list of 2 uint16, 1s + return b"\x0E" + parameter[0].to_bytes(2, "little", signed=False) \ + + parameter[1].to_bytes(2, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_TARGETED_TIME_IN_THREE_HEART_RATE_ZONES: + # parameter: list of 3 uint16, 1s + return b"\x0F" + parameter[0].to_bytes(2, "little", signed=False) \ + + parameter[1].to_bytes(2, "little", signed=False) \ + + parameter[2].to_bytes(2, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_TARGETED_TIME_IN_FIVE_HEART_RATE_ZONES: + # parameter: list of 5 uint16, 1s + return b"\x10" + parameter[0].to_bytes(2, "little", signed=False) \ + + parameter[1].to_bytes(2, "little", signed=False) \ + + parameter[2].to_bytes(2, "little", signed=False) \ + + parameter[3].to_bytes(2, "little", signed=False) \ + + parameter[4].to_bytes(2, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_INDOOR_BIKE_SIMULATION_PARAMETERS: + # parameter: list of int16 0.001mps, int16 0.01%, uint8 0.0001, uint8 0.01kg/m + return b"\x11" + parameter[0].to_bytes(2, "little", signed=True) \ + + parameter[1].to_bytes(2, "little", signed=True) \ + + parameter[2].to_bytes(1, "little", signed=False) \ + + parameter[3].to_bytes(1, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_WHEEL_CIRCUMFERENCE: + # parameter: uint16, 0.1mm + return b"\x12" + parameter.to_bytes(2, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_SPIN_DOWN_CONTROL: + # parameter: 01=start, 02=ignore + return b"\x13" + parameter.to_bytes(1, "little", signed=False) + elif opcode == FTMSControlPointOpCode.SET_TARGETED_CADENCE: + # parameter: uint16, 1rpm + return b"\x14" + parameter.to_bytes(1, "little", signed=False) else: raise ValueError("Invalid opcode") diff --git a/pycycling/ftms_parsers/fitness_machine_feature.py b/pycycling/ftms_parsers/fitness_machine_feature.py index 77b1d25..5d48740 100644 --- a/pycycling/ftms_parsers/fitness_machine_feature.py +++ b/pycycling/ftms_parsers/fitness_machine_feature.py @@ -24,6 +24,31 @@ ) +TargetSettingFeatures = namedtuple( + "TargetSettingFeatures", + [ + "speed_target_setting_supported", + "inclination_target_setting_supported", + "resistance_target_setting_supported", + "power_target_setting_supported", + "heart_rate_target_setting_supported", + "targeted_expended_energy_configuration_supported", + "targeted_step_number_configuration_supported", + "targeted_stride_number_configuration_supported", + "targeted_distance_configuration_supported", + "targeted_training_time_configuration_supported", + "targeted_time_in_two_heart_rate_zones_configuration_supported", + "targeted_time_in_three_heart_rate_zones_configuration_supported", + "targeted_time_in_five_heart_rate_zones_configuration_supported", + "indoor_bike_simulation_parameters_supported", + "wheel_circumference_configuration_supported", + "spin_down_control_supported", + "targeted_cadence_configuration_supported", + ], +) + + + def parse_fitness_machine_feature(message: bytearray) -> FitnessMachineFeature: """Bit flags are set across two message""" avg_speed_supported = bool(message[0] & 0b00000001) @@ -62,3 +87,47 @@ def parse_fitness_machine_feature(message: bytearray) -> FitnessMachineFeature: force_on_belt_and_power_output_supported, user_data_retention_supported, ) + + +def parse_target_setting_features(message: bytearray) -> TargetSettingFeatures: + speed_target_setting_supported = bool(message[0] & 0b00000001) + inclination_target_setting_supported = bool(message[0] & 0b00000010) + resistance_target_setting_supported = bool(message[0] & 0b00000100) + power_target_setting_supported = bool(message[0] & 0b00001000) + heart_rate_target_setting_supported = bool(message[0] & 0b00010000) + targeted_expended_energy_configuration_supported = bool(message[0] & 0b00100000) + targeted_step_number_configuration_supported = bool(message[0] & 0b01000000) + targeted_stride_number_configuration_supported = bool(message[0] & 0b10000000) + + targeted_distance_configuration_supported = bool(message[1] & 0b00000001) + targeted_training_time_configuration_supported = bool(message[1] & 0b00000010) + targeted_time_in_two_heart_rate_zones_configuration_supported = bool(message[1] & 0b00000100) + targeted_time_in_three_heart_rate_zones_configuration_supported = bool(message[1] & 0b00001000) + targeted_time_in_five_heart_rate_zones_configuration_supported = bool(message[1] & 0b00010000) + indoor_bike_simulation_parameters_supported = bool(message[1] & 0b00100000) + wheel_circumference_configuration_supported = bool(message[1] & 0b01000000) + spin_down_control_supported = bool(message[1] & 0b10000000) + + targeted_cadence_configuration_supported = bool(message[2] & 0b00000001) + return TargetSettingFeatures( + speed_target_setting_supported, + inclination_target_setting_supported, + resistance_target_setting_supported, + power_target_setting_supported, + heart_rate_target_setting_supported, + targeted_expended_energy_configuration_supported, + targeted_step_number_configuration_supported, + targeted_stride_number_configuration_supported, + targeted_distance_configuration_supported, + targeted_training_time_configuration_supported, + targeted_time_in_two_heart_rate_zones_configuration_supported, + targeted_time_in_three_heart_rate_zones_configuration_supported, + targeted_time_in_five_heart_rate_zones_configuration_supported, + indoor_bike_simulation_parameters_supported, + wheel_circumference_configuration_supported, + spin_down_control_supported, + targeted_cadence_configuration_supported, + ) + +def parse_all_features(message: bytearray): + return parse_fitness_machine_feature(message[0:4]), parse_target_setting_features(message[4:8]) diff --git a/pycycling/rizer.py b/pycycling/rizer.py new file mode 100644 index 0000000..f555215 --- /dev/null +++ b/pycycling/rizer.py @@ -0,0 +1,41 @@ +import struct + +rizer_measurement_id = "347b0030-7635-408b-8918-8ff3949ce592" +rizer_control_point_id = "347b0031-7635-408b-8918-8ff3949ce592" + + +class Rizer: + def __init__(self, client): + self._client = client + self._steering_measurement_callback = None + self._latest_challenge = None + + async def enable_steering_measurement_notifications(self): + await self._client.start_notify( + rizer_measurement_id, self._steering_measurement_notification_handler + ) + + async def disable_steering_measurement_notifications(self): + await self._client.stop_notify(rizer_measurement_id) + + def set_steering_measurement_callback(self, callback): + self._steering_measurement_callback = callback + + def _steering_measurement_notification_handler( + self, sender, data + ): # pylint: disable=unused-argument + [steering_angle] = struct.unpack(" 2: + raise ValueError( + "Invalid rate: choose 0, 1, or 2 for 8, 16, or 32 Hz respectively" + ) + byte_array = b"\x02" + rate.to_bytes(1, "little", signed=False) + await self._client.write_gatt_char(rizer_control_point_id, byte_array) + + async def set_center(self): + """sets the zero position of the rizer""" + await self._client.write_gatt_char(rizer_control_point_id, b"\x01") diff --git a/pycycling/sterzo.py b/pycycling/sterzo.py index 02bc6a6..81f6c92 100644 --- a/pycycling/sterzo.py +++ b/pycycling/sterzo.py @@ -28,7 +28,7 @@ async def _activate_steering_measurements(self): if sys.version_info >= (3, 11): challenge_file = importlib.resources.files(pycycling.data).joinpath('sterzo-challenge-codes.dat').open('rb') else: # legacy support < 3.9 - challenge_file = importlib.resources.open_binary(pycycling.data, 'sterzo-challenge-codes.dat') + challenge_file = importlib.resources.open_binary(pycycling.data, 'sterzo-challenge-codes.dat') # pylint: disable=deprecated-method with challenge_file: challenge_file.seek(self._latest_challenge * 2, 1)