Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix control point opcodes #37

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions examples/rizer_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import asyncio
from bleak import BleakClient

from pycycling.rizer import Rizer

""" make sure the rizer is connected to a fitness machine otherwise the steering angle will not be transmitted """


async def run(address):
async with BleakClient(address) as client:
def steering_handler(steering_angle):
print(steering_angle)

await client.is_connected()
rizer = Rizer(client)
rizer.set_steering_measurement_callback(steering_handler)
await rizer.enable_steering_measurement_notifications()
await rizer.set_transmission_rate(0) # 8 Hz
await asyncio.sleep(4)
await rizer.set_transmission_rate(1) # 16 Hz
await asyncio.sleep(4)
await rizer.set_transmission_rate(2) # 32 Hz
await asyncio.sleep(4)
# recalibrate the rizer
#await rizer.set_center()

if __name__ == "__main__":
import os

os.environ["PYTHONASYNCIODEBUG"] = str(1)

device_address = "YOUR RIZER ADDRESS HERE"
loop = asyncio.get_event_loop()
loop.run_until_complete(run(device_address))
183 changes: 179 additions & 4 deletions pycycling/fitness_machine_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from pycycling.ftms_parsers import (
parse_fitness_machine_status,
parse_indoor_bike_data,
parse_fitness_machine_feature,
parse_all_features,
parse_training_status,
parse_control_point_response,
form_ftms_control_command,
Expand Down Expand Up @@ -107,7 +107,7 @@ async def get_fitness_machine_feature(self) -> FitnessMachineFeature:
message = await self._client.read_gatt_char(
ftms_fitness_machine_feature_characteristic_id
)
return parse_fitness_machine_feature(message)
return parse_all_features(message)

# === NOTIFY Characteristics ===
# ====== Indoor Bike Data ======
Expand Down Expand Up @@ -189,6 +189,7 @@ def _control_point_response_handler(
if self._control_point_response_callback is not None:
self._control_point_response_callback(parse_control_point_response(data))

# ====== Control Point Commands ======
async def request_control(self) -> None:
message = form_ftms_control_command(FTMSControlPointOpCode.REQUEST_CONTROL)
await self._client.write_gatt_char(
Expand All @@ -201,18 +202,192 @@ async def reset(self) -> None:
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_target_speed(self, speed: int) -> None:
if speed < 0:
raise ValueError("Speed must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_SPEED, speed
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_target_incline(self, inclination: int) -> None:
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_INCLINE, inclination
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_target_resistance_level(self, level: int) -> None:
if level < 0:
raise ValueError("Resistance level must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_RESISTANCE_LEVEL, int(level)
FTMSControlPointOpCode.SET_TARGET_RESISTANCE_LEVEL, level
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_target_power(self, power: int) -> None:
if power < 0:
raise ValueError("Power must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_POWER, int(power)
FTMSControlPointOpCode.SET_TARGET_POWER, power
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_target_heart_rate(self, heart_rate: int) -> None:
if heart_rate < 0:
raise ValueError("Heart rate must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_HEART_RATE, heart_rate
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def start_or_resume(self) -> None:
message = form_ftms_control_command(FTMSControlPointOpCode.START_OR_RESUME)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def stop_or_pause(self, pause: bool) -> None:
message = form_ftms_control_command(
FTMSControlPointOpCode.STOP_OR_PAUSE, 0x02 if pause else 0x01
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_targeted_expended_energy(self, energy: int) -> None:
if energy < 0:
raise ValueError("Energy must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_EXPENDED_ENERGY, energy
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_targeted_number_of_steps(self, steps: int) -> None:
if steps < 0:
raise ValueError("Steps must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_NUMBER_OF_STEPS, steps
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_targeted_number_of_strides(self, strides: int) -> None:
if strides < 0:
raise ValueError("Strides must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_NUMBER_OF_STRIDES, strides
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_targeted_distance(self, distance: int) -> None:
if distance < 0:
raise ValueError("Distance must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_DISTANCE, distance
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_targeted_training_time(self, time: int) -> None:
if time < 0:
raise ValueError("Time must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_TRAINING_TIME, time
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_targeted_time_in_two_heart_rate_zones(self, times: list) -> None:
if len(times) != 2:
raise ValueError("Times must be a list of 2 elements")
if times[0] < 0 or times[1] < 0:
raise ValueError("Times must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_TIME_IN_TWO_HEART_RATE_ZONES, times
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_targeted_time_in_three_heart_rate_zones(self, times: list) -> None:
if len(times) != 3:
raise ValueError("Times must be a list of 3 elements")
if times[0] < 0 or times[1] < 0 or times[2] < 0:
raise ValueError("Times must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_TIME_IN_THREE_HEART_RATE_ZONES, times
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_targeted_time_in_five_heart_rate_zones(self, times: list) -> None:
if len(times) != 5:
raise ValueError("Times must be a list of 5 elements")
if times[0] < 0 or times[1] < 0 or times[2] < 0 or times[3] < 0 or times[4] < 0:
raise ValueError("Times must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_TIME_IN_FIVE_HEART_RATE_ZONES, times
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_simulation_parameters(self, wind_speed: int, grade: int, crr: int, cw: int) -> None:
if crr < 0:
raise ValueError("Crr must be non-negative")
if cw < 0:
raise ValueError("Cw must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_INDOOR_BIKE_SIMULATION_PARAMETERS, [wind_speed, grade, crr, cw]
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_wheel_circumference(self, circumference: int) -> None:
if circumference < 0:
raise ValueError("Circumference must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_WHEEL_CIRCUMFERENCE, circumference
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_spin_down_control(self, control: int) -> None:
if control < 0:
raise ValueError("Control must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_SPIN_DOWN_CONTROL, control
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)

async def set_targeted_cadence(self, cadence: int) -> None:
if cadence < 0:
raise ValueError("Cadence must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_CADENCE, cadence
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
81 changes: 74 additions & 7 deletions pycycling/ftms_parsers/control_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,38 @@ class FTMSControlPointResponseResultCode(Enum):
OPERATION_FAILED = 0x04
CONTROL_NOT_PERMITTED = 0x05


class FTMSControlPointOpCode(Enum):
REQUEST_CONTROL = 0x00
RESET = 0x01
SET_TARGET_SPEED = 0x02
SET_TARGET_INCLINE = 0x03
SET_TARGET_RESISTANCE_LEVEL = 0x04
SET_TARGET_POWER = 0x05
START_OR_RESUME = 0x06
STOP_OR_PAUSE = 0x07
SET_TARGET_HEART_RATE = 0x06
START_OR_RESUME = 0x07
STOP_OR_PAUSE = 0x08
SET_TARGETED_EXPENDED_ENERGY = 0x09
SET_TARGETED_NUMBER_OF_STEPS = 0x0A
SET_TARGETED_NUMBER_OF_STRIDES = 0x0B
SET_TARGETED_DISTANCE = 0x0C
SET_TARGETED_TRAINING_TIME = 0x0D
SET_TARGETED_TIME_IN_TWO_HEART_RATE_ZONES = 0x0E
SET_TARGETED_TIME_IN_THREE_HEART_RATE_ZONES = 0x0F
SET_TARGETED_TIME_IN_FIVE_HEART_RATE_ZONES = 0x10
SET_INDOOR_BIKE_SIMULATION_PARAMETERS = 0x11
SET_WHEEL_CIRCUMFERENCE = 0x12
SET_SPIN_DOWN_CONTROL = 0x13
SET_TARGETED_CADENCE = 0x14
RESPONSE_CODE = 0x80


def form_ftms_control_command(opcode: FTMSControlPointOpCode, parameter: int = 0):
"""
Form a FTMS control command message
:param opcode: FTMSControlPointOpCode
:param parameter: scalar or list of scalar
:return: bytearray
"""
parameter = parameter if isinstance(parameter, list) else (int)(parameter)
if opcode == FTMSControlPointOpCode.REQUEST_CONTROL:
return b"\x00"
elif opcode == FTMSControlPointOpCode.RESET:
Expand All @@ -39,13 +57,62 @@ def form_ftms_control_command(opcode: FTMSControlPointOpCode, parameter: int = 0
elif opcode == FTMSControlPointOpCode.SET_TARGET_POWER:
# parameter: sint16, 1W
return b"\x05" + parameter.to_bytes(2, "little", signed=True)
elif opcode == FTMSControlPointOpCode.SET_TARGET_HEART_RATE:
# parameter: uint8, 1bpm
return b"\x06" + parameter.to_bytes(1, "little", signed=False)
elif opcode == FTMSControlPointOpCode.START_OR_RESUME:
# parameter: 01=stop, 02=pause
return b"\x06"
return b"\x07"
elif opcode == FTMSControlPointOpCode.STOP_OR_PAUSE:
return b"\x07" + parameter.to_bytes(1, "little", signed=False)
# parameter: 01=stop, 02=pause
return b"\x08" + parameter.to_bytes(1, "little", signed=False)
elif opcode == FTMSControlPointOpCode.RESPONSE_CODE:
return b"\x80"
elif opcode == FTMSControlPointOpCode.SET_TARGETED_EXPENDED_ENERGY:
# parameter: uint16, 1calories
return b"\x09" + parameter.to_bytes(2, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_TARGETED_NUMBER_OF_STEPS:
# parameter: uint16, 1
return b"\x0A" + parameter.to_bytes(2, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_TARGETED_NUMBER_OF_STRIDES:
# parameter: uint16, 1
return b"\x0B" + parameter.to_bytes(2, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_TARGETED_DISTANCE:
# parameter: uint24, 1m
return b"\x0C" + parameter.to_bytes(3, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_TARGETED_TRAINING_TIME:
# parameter: uint16, 1s
return b"\x0D" + parameter.to_bytes(2, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_TARGETED_TIME_IN_TWO_HEART_RATE_ZONES:
# parameter: list of 2 uint16, 1s
return b"\x0E" + parameter[0].to_bytes(2, "little", signed=False) \
+ parameter[1].to_bytes(2, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_TARGETED_TIME_IN_THREE_HEART_RATE_ZONES:
# parameter: list of 3 uint16, 1s
return b"\x0F" + parameter[0].to_bytes(2, "little", signed=False) \
+ parameter[1].to_bytes(2, "little", signed=False) \
+ parameter[2].to_bytes(2, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_TARGETED_TIME_IN_FIVE_HEART_RATE_ZONES:
# parameter: list of 5 uint16, 1s
return b"\x10" + parameter[0].to_bytes(2, "little", signed=False) \
+ parameter[1].to_bytes(2, "little", signed=False) \
+ parameter[2].to_bytes(2, "little", signed=False) \
+ parameter[3].to_bytes(2, "little", signed=False) \
+ parameter[4].to_bytes(2, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_INDOOR_BIKE_SIMULATION_PARAMETERS:
# parameter: list of int16 0.001mps, int16 0.01%, uint8 0.0001, uint8 0.01kg/m
return b"\x11" + parameter[0].to_bytes(2, "little", signed=True) \
+ parameter[1].to_bytes(2, "little", signed=True) \
+ parameter[2].to_bytes(1, "little", signed=False) \
+ parameter[3].to_bytes(1, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_WHEEL_CIRCUMFERENCE:
# parameter: uint16, 0.1mm
return b"\x12" + parameter.to_bytes(2, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_SPIN_DOWN_CONTROL:
# parameter: 01=start, 02=ignore
return b"\x13" + parameter.to_bytes(1, "little", signed=False)
elif opcode == FTMSControlPointOpCode.SET_TARGETED_CADENCE:
# parameter: uint16, 1rpm
return b"\x14" + parameter.to_bytes(1, "little", signed=False)
else:
raise ValueError("Invalid opcode")

Expand Down
Loading
Loading