File: btmgmt_callback.py

package info (click to toggle)
python-btsocket 0.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 232 kB
  • sloc: python: 1,687; sh: 20; makefile: 6
file content (123 lines) | stat: -rw-r--r-- 4,044 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""
Use callback-based programming style to read and write to BlueZ Management API
"""
import asyncio
from collections import deque

from btsocket import btmgmt_socket
from btsocket import btmgmt_protocol
from btsocket import tools

logger = tools.create_module_logger(__name__)


class Mgmt:

    def __init__(self):
        # Setup read and write sockets
        self.sock = btmgmt_socket.open()
        self.loop = asyncio.get_event_loop()
        # Store for event callbacks
        self._event_callbacks = dict()
        # Queue for commands to be written to BlueZ socket
        self.cmd_queue = deque()
        self.running = False

    def add_event_callback(self, event, callback):
        """
        Assign a callback to be called when a specific event happens.
        The callback should take two arguments.
          1) The response packet
          2) the AsyncMgmt() class instance object

        :param event: An entry from the enum btmgmt.Events
        :param callback: A callback function
        """
        self._event_callbacks[event] = callback

    def reader(self):
        """
        Read callback is called when data available on Bluetooth socket.
        Processes packet and hands-off to event callbacks that have subscribed
        to events.
        """
        logger.debug('Reader callback')
        data = self.sock.recv(100)
        pkt = btmgmt_protocol.reader(data)
        logger.info('pkt: [%s]', pkt)
        if pkt.header.event_code in self._event_callbacks:
            self._event_callbacks[pkt.header.event_code](pkt, self)
        if not self.running:
            self.stop()

    def writer(self):
        """
        Write callback when Bluetooth socket is available for writing.
        Takes commands that are on the cmd_queue and sends.
        """
        logger.debug('Writer callback')
        if len(self.cmd_queue) > 0:
            this_cmd = self.cmd_queue.popleft()
            logger.info('sending pkt [%s]', tools.format_pkt(this_cmd))
            self.sock.send(this_cmd)
        if not self.running and len(self.cmd_queue) == 0:
            self.loop.stop()
            # Do one more read to get the response from the last command
            self.reader()

    @staticmethod
    def _as_packet(pkt_objs):
        """Pack bytes together for sending"""
        full_pkt = b''
        for frame in pkt_objs:
            if frame:
                full_pkt += frame.octets
        return full_pkt

    def send(self, cmd, ctrl_idx, *params):
        """
        Add commands onto the queue ready to be sent.
        Basic structure of the command
        send(<command_name>, <adapter index>, <positional paramters>)

        :param cmd: A value from btmgmt.Commands
        :param ctrl_idx: The index of the controller [0xFFFF is non-controller]
        :param params: 0 or more input parameters for command
        """
        pkt_objs = btmgmt_protocol.command(cmd, ctrl_idx, *params)
        cmd_pkt = self._as_packet(pkt_objs)
        logger.debug('Queue command: %s', tools.format_pkt(cmd_pkt))
        self.cmd_queue.append(cmd_pkt)

    def stop(self):
        """
        Once all commands have been sent, exit the event loop
        """
        self.running = False

        self.loop.remove_writer(self.sock)
        self.loop.remove_reader(self.sock)
        self.loop.stop()

    def close(self):
        """
        Stop the event loop and close sockets etc.
        """
        btmgmt_socket.close(self.sock)
        # Stop the event loop
        self.loop.close()

    def start(self):
        self.running = True
        # Setup reader and writer for socket streams
        self.loop.add_reader(self.sock, self.reader)
        self.loop.add_writer(self.sock, self.writer)
        logger.debug('Starting event loop...')
        try:
            # Run the event loop
            self.loop.run_forever()
        except KeyboardInterrupt:
            self.loop.stop()
        finally:
            # We are done. Close sockets and the event loop.
            self.close()