File: asyncworker.py

package info (click to toggle)
python-gammu 3.2.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,024 kB
  • sloc: ansic: 11,446; python: 3,323; sql: 527; makefile: 6
file content (154 lines) | stat: -rw-r--r-- 5,468 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""Async extensions for gammu."""
import asyncio

import gammu
import gammu.worker


class GammuAsyncThread(gammu.worker.GammuThread):
    """Thread for phone communication.

    Reports the outcome of every command through the callback handed to the
    constructor: the command's return value on success, or an error value /
    exception instance on failure.
    """

    def __init__(self, queue, config, callback, pull_func):
        """Initialize thread.

        All arguments are forwarded unchanged to the base class constructor;
        see L{GammuThread.__init__} for their description.
        """
        super().__init__(queue, config, callback, pull_func)

    def _do_command(self, future, cmd, params, percentage=100):
        """Execute single command on phone.

        The callback is invoked exactly once per call as
        ``callback(future, result, error, percentage)``.

        @param future: Opaque token identifying the request; passed back to
            the callback untouched.
        @param cmd: Name of the attribute on the state machine to call.
        @param params: ``None`` for a call without arguments, a dict for
            keyword arguments, anything else is unpacked as positional
            arguments.
        @param percentage: Progress value forwarded to the callback.
        """
        try:
            # Resolve the command inside the try block: an unknown command
            # name must be reported through the callback as well, otherwise
            # the exception escapes into the thread and whoever waits on
            # ``future`` is never notified.
            func = getattr(self._sm, cmd)
            if params is None:
                result = func()
            elif isinstance(params, dict):
                result = func(**params)
            else:
                result = func(*params)
        except gammu.GSMError as info:
            # Translate the numeric code carried by the exception via
            # gammu.ErrorNumbers before reporting it.
            errcode = info.args[0]["Code"]
            error = gammu.ErrorNumbers[errcode]
            self._callback(future, None, error, percentage)
        except Exception as exception:  # pylint: disable=broad-except
            # Any other failure is reported as the exception instance itself.
            self._callback(future, None, exception, percentage)
        else:
            self._callback(future, result, None, percentage)


def gammu_pull_device(sm):
    """Call ``ReadDevice()`` on the given object and discard its result.

    Used as the default ``pull_func`` of L{GammuAsyncWorker}.

    @param sm: Object exposing a ``ReadDevice()`` method (presumably the
        ``gammu.StateMachine`` owned by the worker thread - confirm against
        the caller).
    """
    sm.ReadDevice()


class GammuAsyncWorker(gammu.worker.GammuWorker):
    """Extend gammu worker class for async operations.

    Every ``*_async`` method creates a future on the event loop captured in
    ``__init__``, queues work for the L{GammuAsyncThread} and awaits the
    future, which L{worker_callback} resolves once the thread reports back.
    """

    def worker_callback(self, name, result, error, percents):
        """Execute command from the thread worker.

        Runs on the worker thread, so the future is never touched directly;
        its resolution is scheduled on the event loop with
        ``call_soon_threadsafe``.

        @param name: ``"Init"``, ``"Terminate"`` or the future that was
            passed to ``enqueue`` by one of the ``*_async`` methods.
        @param result: Value the future is resolved with on success.
        @param error: ``None`` on success; otherwise an exception instance,
            or any other error value, which is wrapped in ``gammu.GSMError``.
        @param percents: Progress value supplied by the thread; not used.
        """
        future = None
        if name == "Init" and self._init_future is not None:
            future = self._init_future
        elif name == "Terminate" and self._terminate_future is not None:
            # Set _kill to true on the base class to avoid waiting for termination
            self._thread._kill = True  # pylint: disable=protected-access
            future = self._terminate_future
        elif hasattr(name, "set_result"):
            future = name

        if future is None:
            return

        exception = error
        if error is not None and not isinstance(error, Exception):
            exception = gammu.GSMError(error)
        self._loop.call_soon_threadsafe(
            self._settle_future, future, result, exception
        )

    @staticmethod
    def _settle_future(future, result, exception):
        """Resolve ``future`` on the loop thread unless it is already done.

        The awaiting coroutine may have been cancelled (for example by a
        timeout) before the phone answered; calling ``set_result`` or
        ``set_exception`` on such a future raises ``InvalidStateError``.
        """
        if future.done():
            return
        if exception is None:
            future.set_result(result)
        else:
            future.set_exception(exception)

    def __init__(self, pull_func=gammu_pull_device):
        """Initialize the worker class.

        @param pull_func: Function passed on to the worker thread; defaults
            to L{gammu_pull_device}. See L{GammuThread.__init__} for
            description.
        """
        super().__init__(self.worker_callback, pull_func)
        # NOTE(review): the loop is captured at construction time and all
        # futures are created on it, so the worker has to be awaited from
        # this same loop. asyncio.get_event_loop() is kept (rather than
        # get_running_loop()) so that constructing the worker outside of a
        # coroutine keeps working.
        self._loop = asyncio.get_event_loop()
        self._init_future = None
        self._terminate_future = None
        self._thread = None
        self._pull_func = pull_func

    async def _run_command_async(self, command, params=()):
        """Queue a single phone command and wait for its outcome.

        @param command: Name of the command, as understood by the thread.
        @param params: Arguments for the command, passed through to the
            thread unchanged.
        @return: Whatever the thread reported as the command result.
        @raise Exception: The error reported by the thread, see
            L{worker_callback}.
        """
        future = self._loop.create_future()
        self.enqueue(future, commands=[(command, params)])
        return await future

    async def init_async(self):
        """Connect to phone.

        Starts a new L{GammuAsyncThread} and waits until it reports the
        outcome of ``"Init"``.
        """
        self._init_future = self._loop.create_future()

        self._thread = GammuAsyncThread(
            self._queue, self._config, self._callback, self._pull_func
        )
        self._thread.start()

        try:
            await self._init_future
        finally:
            # Drop the future even when initialization failed or the wait
            # was cancelled, so no stale future is kept around.
            self._init_future = None

    async def get_imei_async(self):
        """Get the IMEI of the device."""
        return await self._run_command_async("GetIMEI")

    async def get_network_info_async(self):
        """Get the network info in the device."""
        return await self._run_command_async("GetNetworkInfo")

    async def get_manufacturer_async(self):
        """Get the manufacturer of the device."""
        return await self._run_command_async("GetManufacturer")

    async def get_model_async(self):
        """Get the model of the device."""
        return await self._run_command_async("GetModel")

    async def get_firmware_async(self):
        """Get the firmware version of the device."""
        return await self._run_command_async("GetFirmware")

    async def get_signal_quality_async(self):
        """Get signal quality from phone."""
        return await self._run_command_async("GetSignalQuality")

    async def send_sms_async(self, message):
        """Send sms message via the phone.

        @param message: Message passed as the single argument of ``SendSMS``.
        """
        return await self._run_command_async("SendSMS", [message])

    async def set_incoming_callback_async(self, callback):
        """Set the callback to call from phone.

        @param callback: Passed as the single argument of
            ``SetIncomingCallback``.
        """
        return await self._run_command_async("SetIncomingCallback", [callback])

    async def set_incoming_sms_async(self):
        """Activate SMS notifications from phone."""
        return await self._run_command_async("SetIncomingSMS")

    async def terminate_async(self):
        """Terminate phone communication.

        Returns immediately when no worker thread exists (never initialized
        or already terminated).
        """
        if self._thread is None:
            # Without a thread nothing would ever consume the "Terminate"
            # request and the await below would never finish.
            return

        self._terminate_future = self._loop.create_future()
        self.enqueue("Terminate")
        await self._terminate_future

        # The callback has flagged the thread to stop; wait until it is gone.
        while self._thread.is_alive():
            await asyncio.sleep(5)
        self._thread = None
        self._terminate_future = None