1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
|
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from bumble import data_types
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.hci import HCI_Reset_Command
from bumble.profiles.device_information_service import DeviceInformationService
from bumble.profiles.heart_rate_service import HeartRateService
from bumble.utils import AsyncRunner
# -----------------------------------------------------------------------------
class HeartRateMonitor:
def __init__(self, hci_source, hci_sink):
self.heart_rate = 60
self.device = Device.with_hci(
'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
)
device_information_service = DeviceInformationService(
manufacturer_name='ACME',
model_number='HR-102',
serial_number='7654321',
hardware_revision='1.1.3',
software_revision='2.5.6',
system_id=(0x123456, 0x8877665544),
)
self.heart_rate_service = HeartRateService(
read_heart_rate_measurement=lambda _: HeartRateService.HeartRateMeasurement(
heart_rate=self.heart_rate,
sensor_contact_detected=True,
),
body_sensor_location=HeartRateService.BodySensorLocation.WRIST,
reset_energy_expended=self.reset_energy_expended,
)
# Notify subscribers of the current value as soon as they subscribe
@self.heart_rate_service.heart_rate_measurement_characteristic.on(
'subscription'
)
def on_subscription(_, notify_enabled, indicate_enabled):
if notify_enabled or indicate_enabled:
self.notify_heart_rate()
self.device.add_services([device_information_service, self.heart_rate_service])
self.device.advertising_data = bytes(
AdvertisingData(
[
data_types.Flags(
AdvertisingData.Flags.LE_GENERAL_DISCOVERABLE_MODE
| AdvertisingData.Flags.BR_EDR_NOT_SUPPORTED
),
data_types.CompleteLocalName('Bumble Heart'),
data_types.IncompleteListOf16BitServiceUUIDs(
[self.heart_rate_service.uuid]
),
data_types.Appearance(
data_types.Appearance.Category.HEART_RATE_SENSOR,
data_types.Appearance.HeartRateSensorSubcategory.GENERIC_HEART_RATE_SENSOR,
),
]
)
)
async def start(self):
print('### Starting Monitor')
await self.device.power_on()
await self.device.start_advertising(auto_restart=True)
print('### Monitor started')
async def stop(self):
# TODO: replace this once a proper reset is implemented in the lib.
await self.device.host.send_command(HCI_Reset_Command())
await self.device.power_off()
print('### Monitor stopped')
def notify_heart_rate(self):
AsyncRunner.spawn(
self.device.notify_subscribers(
self.heart_rate_service.heart_rate_measurement_characteristic
)
)
def set_heart_rate(self, heart_rate):
self.heart_rate = heart_rate
self.notify_heart_rate()
def reset_energy_expended(self, _):
print('<<< Reset Energy Expended')
# -----------------------------------------------------------------------------
def main(hci_source, hci_sink):
return HeartRateMonitor(hci_source, hci_sink)
|