File: callback_get_beacon_data.py

package info (click to toggle)
python-btsocket 0.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 232 kB
  • sloc: python: 1,687; sh: 20; makefile: 6
file content (79 lines) | stat: -rw-r--r-- 2,569 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import logging
from btsocket import btmgmt_callback
from btsocket import btmgmt_protocol

scan_loop = 0
max_scans = 4
beacon_address = 'DC:76:F7:E1:62:E0'

logger = logging.getLogger('btsocket.btmgmt_protocol')
logger.setLevel(logging.INFO)


def print_sensor_data(data):
    """
    Function to print sensor beacon data in pretty format
    """
    if len(data) == 14:
        battery_lvl = data[1]
        inst_temp = int.from_bytes(data[6:8],
                                   byteorder='big', signed=True) / 10
        humidity = int.from_bytes(data[8:10],
                                  byteorder='big', signed=True) / 10
        print(f'\tTemperature:   {inst_temp}\u00B0C')
        print(f'\tHumidity:      {humidity}%')
        print(f'\tBattery Level:  {battery_lvl}%')


def close_loop(data, mgmt_class):
    """
    Callback for when discovering event has happened. In this example
    check to see how many times discovery has timed out. If it is `max_scans
    then exit

    :param data: discovering event packet
    :param mgmt_class: Mgmt class object to access enums etc
    """
    global scan_loop
    if data.event_frame.discovering == 0:
        print(f'Discovering loop {scan_loop} ended')
        scan_loop += 1
        if scan_loop < max_scans:
            mgmt_class.send('StartDiscovery', 0,
                            [btmgmt_protocol.AddressType.LEPublic,
                             btmgmt_protocol.AddressType.LERandom])
        else:
            mgmt_class.stop()


def device_found(data, mgmt_class):
    """
    Callback for when device is found

    :param data: Device found event packet
    :param mgmt_class: Mgmt class object to access enums etc
    """
    if data.event_frame.address == beacon_address:
        eir_data = data.event_frame.eir_data
        manuf_data = eir_data.get(btmgmt_protocol.ADType.ManufacturerData)
        if all((manuf_data,
                manuf_data.startswith(b'\x33\01'),
                len(manuf_data) == 16)):
            print_sensor_data(manuf_data[2:])
            mgmt_class.stop()


def app():
    mgmt = btmgmt_callback.Mgmt()
    mgmt.add_event_callback(btmgmt_protocol.Events.DiscoveringEvent,
                            close_loop)
    mgmt.add_event_callback(btmgmt_protocol.Events.DeviceFoundEvent,
                            device_found)
    mgmt.send('StartDiscovery', 0, [btmgmt_protocol.AddressType.LEPublic,
                                    btmgmt_protocol.AddressType.LERandom])
    print('Starting search...')
    mgmt.start()


if __name__ == '__main__':
    app()