File: btmgmt_socket.py

package info (click to toggle)
python-btsocket 0.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 232 kB
  • sloc: python: 1,687; sh: 20; makefile: 6
file content (120 lines) | stat: -rw-r--r-- 3,008 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import asyncio
import ctypes
import socket
import sys


AF_BLUETOOTH = 31
PF_BLUETOOTH = AF_BLUETOOTH
SOCK_RAW = 3
BTPROTO_HCI = 1
SOCK_CLOEXEC = 524288
SOCK_NONBLOCK = 2048
HCI_CHANNEL_CONTROL = 3
HCI_DEV_NONE = 0xffff


class BluetoothSocketError(BaseException):
    pass


class BluetoothCommandError(BaseException):
    pass


class SocketAddr(ctypes.Structure):
    _fields_ = [
        ("hci_family", ctypes.c_ushort),
        ("hci_dev", ctypes.c_ushort),
        ("hci_channel", ctypes.c_ushort),
    ]


def open():
    """
    Because of the following issue with Python the Bluetooth User socket
    on linux needs to be done with lower level calls.
    https://bugs.python.org/issue36132
    Based on mgmt socket at:
    https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/mgmt-api.txt
    """

    sockaddr_hcip = ctypes.POINTER(SocketAddr)
    ctypes.cdll.LoadLibrary("libc.so.6")
    libc = ctypes.CDLL("libc.so.6")

    libc_socket = libc.socket
    libc_socket.argtypes = (ctypes.c_int, ctypes.c_int, ctypes.c_int)
    libc_socket.restype = ctypes.c_int

    bind = libc.bind
    bind.argtypes = (ctypes.c_int, ctypes.POINTER(SocketAddr), ctypes.c_int)
    bind.restype = ctypes.c_int

    # fd = libc_socket(PF_BLUETOOTH, SOCK_RAW | SOCK_CLOEXEC | SOCK_NONBLOCK,
    #               BTPROTO_HCI)
    fd = libc_socket(PF_BLUETOOTH, SOCK_RAW, BTPROTO_HCI)

    if fd < 0:
        raise BluetoothSocketError("Unable to open PF_BLUETOOTH socket")

    addr = SocketAddr()
    addr.hci_family = AF_BLUETOOTH  # AF_BLUETOOTH
    addr.hci_dev = HCI_DEV_NONE  # adapter index
    addr.hci_channel = HCI_CHANNEL_CONTROL  # HCI_USER_CHANNEL
    r = bind(fd, sockaddr_hcip(addr), ctypes.sizeof(addr))
    if r < 0:
        raise BluetoothSocketError("Unable to bind %s", r)

    sock_fd = socket.socket(AF_BLUETOOTH, SOCK_RAW, BTPROTO_HCI, fileno=fd)
    return sock_fd


def close(bt_socket):
    """Close the open socket"""
    fd = bt_socket.detach()
    socket.close(fd)


def test_asyncio_usage():
    sock = open()

    if sys.version_info < (3, 10):
        loop = asyncio.get_event_loop()
    else:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()

        asyncio.set_event_loop(loop)

    def reader():
        data = sock.recv(100)
        print("Received:", data)

        # We are done: unregister the file descriptor
        loop.remove_reader(sock)

        # Stop the event loop
        loop.stop()

    # Register the file descriptor for read event
    loop.add_reader(sock, reader)

    # Write a command to the socket
    # Read Management Version Information Command
    # b'\x01\x00\xff\xff\x00\x00'
    loop.call_soon(sock.send, b'\x01\x00\xff\xff\x00\x00')

    try:
        # Run the event loop
        loop.run_forever()
    finally:
        # We are done. Close sockets and the event loop.
        close(sock)
        loop.close()


if __name__ == '__main__':
    test_asyncio_usage()