Skip to content

Commit

Permalink
Merge pull request #92 from yuyangh/yuyangh/add_ASHA_GATT
Browse files Browse the repository at this point in the history
add ASHA profile
  • Loading branch information
barbibulle authored Dec 7, 2022
2 parents c86125d + 52db1cf commit 297246f
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 0 deletions.
16 changes: 16 additions & 0 deletions bumble/gatt.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@
# Battery Service
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')

# ASHA Service
GATT_ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC = UUID('f0d4de7e-4a88-476c-9d9f-1937b0996cc0', 'AudioControlPoint')
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID('38663f1a-e711-4cac-b641-326b56404837', 'AudioStatus')
GATT_ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT')

# Misc
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
Expand Down Expand Up @@ -203,6 +211,14 @@ def __init__(self, uuid, characteristics: list[Characteristic], primary=True):
self.characteristics = characteristics[:]
self.primary = primary

def get_advertising_data(self):
"""
Get Service specific advertising data
Defined by each Service, default value is empty
:return Service data for advertising
"""
return None

def __str__(self):
return f'Service(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}){"" if self.primary else "*"}'

Expand Down
6 changes: 6 additions & 0 deletions bumble/gatt_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ def send_gatt_pdu(self, connection_handle, pdu):
def next_handle(self):
return 1 + len(self.attributes)

def get_advertising_service_data(self):
return {
attribute: data for attribute in self.attributes
if isinstance(attribute, Service) and (data := attribute.get_advertising_data())
}

def get_attribute(self, handle):
attribute = self.attributes_by_handle.get(handle)
if attribute:
Expand Down
141 changes: 141 additions & 0 deletions bumble/profiles/asha_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
import logging
from ..core import AdvertisingData
from ..gatt import (
GATT_ASHA_SERVICE,
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
GATT_ASHA_VOLUME_CHARACTERISTIC,
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
TemplateService,
Characteristic,
CharacteristicValue,
PackedCharacteristicAdapter
)

# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
class AshaService(TemplateService):
UUID = GATT_ASHA_SERVICE
OPCODE_START = 1
OPCODE_STOP = 2
OPCODE_STATUS = 3
PROTOCOL_VERSION = 0x01
RESERVED_FOR_FUTURE_USE = [00, 00]
FEATURE_MAP = [0x01] # [LE CoC audio output streaming supported]
SUPPORTED_CODEC_ID = [0x02, 0x01] # Codec IDs [G.722 at 16 kHz]
RENDER_DELAY = [00, 00]

def __init__(self, capability: int, hisyncid: [int]):
self.hisyncid = hisyncid
self.capability = capability # Device Capabilities [Left, Monaural]

# Handler for volume control
def on_volume_write(connection, value):
logger.info(f'--- VOLUME Write:{value[0]}')

# Handler for audio control commands
def on_audio_control_point_write(connection, value):
logger.info(f'--- AUDIO CONTROL POINT Write:{value.hex()}')
opcode = value[0]
if opcode == AshaService.OPCODE_START:
# Start
audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]]
logger.info(
f'### START: codec={value[1]}, audio_type={audio_type}, volume={value[3]}, otherstate={value[4]}')
elif opcode == AshaService.OPCODE_STOP:
logger.info('### STOP')
elif opcode == AshaService.OPCODE_STATUS:
logger.info(f'### STATUS: connected={value[1]}')

# TODO Respond with a status
# asyncio.create_task(device.notify_subscribers(audio_status_characteristic, force=True))

self.read_only_properties_characteristic = Characteristic(
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([
AshaService.PROTOCOL_VERSION, # Version
self.capability,
]) +
bytes(self.hisyncid) +
bytes(AshaService.FEATURE_MAP) +
bytes(AshaService.RENDER_DELAY) +
bytes(AshaService.RESERVED_FOR_FUTURE_USE) +
bytes(AshaService.SUPPORTED_CODEC_ID)
)

self.audio_control_point_characteristic = Characteristic(
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write)
)
self.audio_status_characteristic = Characteristic(
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
bytes([0])
)
self.volume_characteristic = Characteristic(
GATT_ASHA_VOLUME_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write)
)

# TODO add real psm value
self.psm = 0x0080
# self.psm = device.register_l2cap_channel_server(0, on_coc, 8)
self.le_psm_out_characteristic = Characteristic(
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
struct.pack('<H', self.psm)
)

characteristics = [self.read_only_properties_characteristic,
self.audio_control_point_characteristic,
self.audio_status_characteristic,
self.volume_characteristic,
self.le_psm_out_characteristic]

super().__init__(characteristics)

def get_advertising_data(self):
# Advertisement only uses 4 least significant bytes of the HiSyncId.
return bytes(
AdvertisingData([
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(GATT_ASHA_SERVICE)),
(AdvertisingData.SERVICE_DATA_16_BIT_UUID, bytes(GATT_ASHA_SERVICE) + bytes([
AshaService.PROTOCOL_VERSION,
self.capability,
]) + bytes(self.hisyncid[:4]))
])
)

0 comments on commit 297246f

Please sign in to comment.