File: mainloop.py

package info (click to toggle)
python-libpulse 0.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 420 kB
  • sloc: python: 4,232; makefile: 22
file content (281 lines) | stat: -rw-r--r-- 9,794 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
"""An implementation of the libpulse Main Loop based on asyncio."""

import sys
import asyncio
import logging
import time
import gc
import ctypes as ct

from .libpulse_ctypes import PulseCTypes, python_object, PulseCTypesLibError
from .pulse_enums import pulse_enums

# Module-level logger used by the event classes, quit() and MainLoop below.
logger = logging.getLogger('libpuls')

# Instantiate the ctypes bindings once, at import time. When PulseCTypes()
# raises PulseCTypesLibError the interpreter exits with the repr of the
# exception as exit message.
try:
    pulse_ctypes = PulseCTypes()
except PulseCTypesLibError as e:
    sys.exit(f'{e!r}')

# The 'pa_io_event_flags' entry of pulse_enums. It is indexed by flag name
# (PA_IO_EVENT_NULL, PA_IO_EVENT_INPUT, PA_IO_EVENT_OUTPUT) by IoEvent.
pa_io_event_flags = pulse_enums['pa_io_event_flags']

def callback_func_ptr(name, python_function):
    """Return 'python_function' wrapped in the callback type named 'name'.

    The callback type is looked up with pulse_ctypes.get_callback() and
    called with the Python function as its single argument.
    """
    return pulse_ctypes.get_callback(name)(python_function)

def build_mainloop_api():
    """Build an instance of the libpulse Main Loop API.

    The returned object is a ctypes Structure whose first member is a void
    pointer named 'userdata' (set to None) followed by one function pointer
    per entry of the 'api' mapping below, in the insertion order of that
    mapping.
    """

    # The structure layout follows the insertion order of this dict, which is
    # expected to mirror the member order of libpulse's pa_mainloop_api.
    api = {'io_new': IoEvent.io_new,
           'io_enable': IoEvent.io_enable,
           'io_free': IoEvent.io_free,
           'io_set_destroy': PulseEvent.set_destroy,
           'time_new': TimeEvent.time_new,
           'time_restart': TimeEvent.time_restart,
           'time_free': TimeEvent.time_free,
           'time_set_destroy': PulseEvent.set_destroy,
           'defer_new': DeferEvent.defer_new,
           'defer_enable': DeferEvent.defer_enable,
           'defer_free': DeferEvent.defer_free,
           'defer_set_destroy': PulseEvent.set_destroy,
           'quit': quit
           }

    # Build the field list outside of the class body so that no loop variable
    # ends up as a stray attribute of the Structure class.
    fields = [('userdata', ct.c_void_p)]
    fields.extend((name, pulse_ctypes.get_callback(name)) for name in api)

    class Mainloop_api(ct.Structure):
        _fields_ = fields

    # Instantiate Mainloop_api: 'userdata' is NULL, the function pointers are
    # passed positionally in the same order as the fields.
    args = [callback_func_ptr(name, function)
            for name, function in api.items()]
    return Mainloop_api(None, *args)

# Main Loop API functions.
# There is only one asyncio loop and therefore only one MainLoop instance per
# thread. The MainLoop instance referenced by any callback of the API is
# obtained by calling MainLoop.get_instance().
class PulseEvent:
    """Base class of the Python objects standing for libpulse event sources.

    Each subclass provides an EVENTS set that holds its live instances and
    that cleanup() iterates over.

    Class attributes:
      DEBUG     when true, debug() logs its message prefixed by the rank of
                the event in HASHES.
      HASHES    hashes of the instances, appended by __init__() and removed
                by __del__().
    """

    DEBUG = False
    HASHES = []

    def __init__(self, c_callback, c_userdata):
        """Bind the event to the MainLoop of the running asyncio loop.

        'c_callback' and 'c_userdata' are stored as is; the subclasses call
        'c_callback' with 'c_userdata' as last argument.
        """
        self.mainloop = MainLoop.get_instance()
        self.c_callback = c_callback
        self.c_userdata = c_userdata
        self.c_destroy_cb = None

        # A void pointer to a py_object wrapping this instance: it is the
        # value handed over to the C callbacks as the event handle.
        self.c_self = ct.cast(ct.pointer(ct.py_object(self)), ct.c_void_p)
        PulseEvent.HASHES.append(hash(self))
        self.debug(f'__init__ {self.__class__.__name__}')

    def debug(self, msg):
        """Log 'msg' at debug level when PulseEvent.DEBUG is true."""
        if PulseEvent.DEBUG:
            index = PulseEvent.HASHES.index(hash(self)) + 1
            logger.debug(f'{index}: {msg}')

    def destroy(self):
        """Call the destroy callback set by set_destroy(), if any."""
        self.debug(f'destroy-0 {self.__class__.__name__}')
        if self.c_destroy_cb:
            self.debug(f'destroy-1 {self.__class__.__name__}')
            self.c_destroy_cb(self.mainloop.C_MAINLOOP_API, self.c_self,
                              self.c_userdata)

    def __del__(self):
        # Forget the hash of this instance; it may already be missing.
        try:
            PulseEvent.HASHES.remove(hash(self))
        except ValueError:
            pass

    @staticmethod
    def set_destroy(c_event, c_callback):
        """Register 'c_callback' as the destroy callback of 'c_event'."""
        event = python_object(c_event)
        event.debug(f'set_destroy {event.__class__.__name__}')
        event.c_destroy_cb = c_callback

    @classmethod
    def cleanup(cls, mainloop):
        """Call destroy() on the events of 'cls' that belong to 'mainloop'."""
        # Iterate over a snapshot: a destroy callback may free events and
        # therefore change cls.EVENTS while the loop is running, which would
        # raise RuntimeError when iterating over the set itself.
        for event in tuple(cls.EVENTS):
            if event not in cls.EVENTS:
                # Freed by a previous destroy callback.
                continue
            event.debug(f'cleanup {cls.__name__}')
            if event.mainloop is mainloop:
                event.destroy()

class IoEvent(PulseEvent):
    """An I/O event source watching a file descriptor with asyncio.

    The readers and writers of the asyncio loop are added and removed
    according to the PA_IO_EVENT_INPUT and PA_IO_EVENT_OUTPUT bits of the
    flags given to enable().
    """

    EVENTS = set()

    def __init__(self, fd, c_callback, c_userdata):
        super().__init__(c_callback, c_userdata)
        self.fd = fd
        # No direction is watched until enable() is called.
        self.flags = pa_io_event_flags['PA_IO_EVENT_NULL']

    def _dispatch(self, flag_name):
        """Call the C callback with the io event flag named 'flag_name'."""
        self.c_callback(self.mainloop.C_MAINLOOP_API, self.c_self, self.fd,
                        pa_io_event_flags[flag_name], self.c_userdata)

    def read_callback(self):
        """Called by asyncio when the file descriptor is readable."""
        self.debug('read_callback IoEvent')
        self._dispatch('PA_IO_EVENT_INPUT')

    def write_callback(self):
        """Called by asyncio when the file descriptor is writable."""
        self.debug('write_callback IoEvent')
        self._dispatch('PA_IO_EVENT_OUTPUT')

    def enable(self, flags):
        """Watch the file descriptor according to 'flags'.

        Only the directions whose bit differs from the current flags are
        added to or removed from the asyncio loop.
        """
        self.debug(f'enable IoEvent: {flags}')
        loop = self.mainloop.aio_loop
        input_bit = pa_io_event_flags['PA_IO_EVENT_INPUT']
        output_bit = pa_io_event_flags['PA_IO_EVENT_OUTPUT']

        # Input direction.
        wants_input = flags & input_bit
        has_input = self.flags & input_bit
        if wants_input and not has_input:
            loop.add_reader(self.fd, self.read_callback)
        elif has_input and not wants_input:
            loop.remove_reader(self.fd)

        # Output direction.
        wants_output = flags & output_bit
        has_output = self.flags & output_bit
        if wants_output and not has_output:
            loop.add_writer(self.fd, self.write_callback)
        elif has_output and not wants_output:
            loop.remove_writer(self.fd)

        self.flags = flags

    @staticmethod
    def io_new(c_mainloop_api, fd, flags, c_callback, c_userdata):
        """Create an IoEvent on 'fd', enable it and return its handle."""
        io_event = IoEvent(fd, c_callback, c_userdata)
        io_event.enable(flags)
        IoEvent.EVENTS.add(io_event)
        return io_event.c_self.value

    @staticmethod
    def io_enable(c_io_event, flags):
        """Change the watched directions of the event to 'flags'."""
        io_event = python_object(c_io_event, cls=IoEvent)
        io_event.debug(f'io_enable {flags}')
        io_event.enable(flags)

    @staticmethod
    def io_free(c_io_event):
        """Stop watching the file descriptor and forget the event."""
        io_event = python_object(c_io_event, cls=IoEvent)
        io_event.debug('io_free')
        io_event.enable(pa_io_event_flags['PA_IO_EVENT_NULL'])
        IoEvent.EVENTS.remove(io_event)

class TimeEvent(PulseEvent):
    """A timer event source implemented with asyncio call_later()."""

    EVENTS = set()

    def __init__(self, c_callback, c_userdata):
        super().__init__(c_callback, c_userdata)
        # The asyncio handle of the armed timer, None when not armed.
        self.timer_handle = None

    def restart(self, c_time):
        """Cancel the pending timer, if any, and arm a new one.

        'c_time' is None, or a ctypes pointer to a structure with 'tv_sec'
        and 'tv_usec' members holding an absolute time that is compared with
        time.time(). When 'c_time' is None or a NULL pointer, the timer is
        left disarmed.
        """
        if self.timer_handle is not None:
            self.debug('restart TimeEvent - cancel')
            self.timer_handle.cancel()
            self.timer_handle = None

        # Use a truth test instead of 'is not None': a NULL pointer received
        # as a ctypes pointer instance is not None but is false, and
        # accessing its 'contents' raises ValueError.
        if c_time:
            timeval = c_time.contents
            delay = timeval.tv_sec + timeval.tv_usec / 10**6 - time.time()
            self.debug(f'restart TimeEvent - delay: {delay}')
            # NOTE(review): 'c_time' is handed back to the C callback when
            # the timer fires; this assumes that the memory it points to is
            # still valid at that time - confirm against the libpulse callers.
            self.timer_handle = self.mainloop.aio_loop.call_later(
                delay, self.c_callback, self.mainloop.C_MAINLOOP_API,
                self.c_self, c_time, self.c_userdata)

    @staticmethod
    def time_new(c_mainloop_api, c_time, c_callback, c_userdata):
        """Create a TimeEvent, arm it with 'c_time' and return its handle."""
        event = TimeEvent(c_callback, c_userdata)
        event.restart(c_time)
        TimeEvent.EVENTS.add(event)
        return event.c_self.value

    @staticmethod
    def time_restart(c_time_event, c_time):
        """Re-arm the timer of the event with 'c_time'."""
        event = python_object(c_time_event, cls=TimeEvent)
        event.debug('time_restart')
        event.restart(c_time)

    @staticmethod
    def time_free(c_time_event):
        """Cancel the timer of the event and forget the event."""
        event = python_object(c_time_event, cls=TimeEvent)
        event.debug('time_free')
        event.restart(None)
        TimeEvent.EVENTS.remove(event)

class DeferEvent(PulseEvent):
    """A deferred event source implemented with asyncio call_soon().

    While the event is enabled, its C callback is scheduled again with
    call_soon() after each invocation.
    """

    EVENTS = set()

    def __init__(self, c_callback, c_userdata):
        super().__init__(c_callback, c_userdata)
        # The asyncio handle of the scheduled callback, None when there is
        # no pending call.
        self.handle = None
        # Defined here so that callback() never depends on enable() having
        # been called first.
        self.enabled = False

    def enable(self, enable):
        """Schedule the callback when 'enable' is true, cancel it otherwise."""
        self.debug(f'enable DeferEvent: {enable}')
        if enable:
            if self.handle is None:
                self.handle = self.mainloop.aio_loop.call_soon(self.callback)
        elif self.handle is not None:
            self.handle.cancel()
            self.handle = None
        self.enabled = bool(enable)

    def callback(self):
        """Run the C callback and schedule the next run if still enabled."""
        self.debug('callback DeferEvent')
        self.handle = None
        self.c_callback(self.mainloop.C_MAINLOOP_API, self.c_self,
                        self.c_userdata)
        # The C callback may have called defer_enable() and scheduled the
        # next run already: scheduling a second one would overwrite the
        # handle of the first one, which could then not be cancelled anymore.
        if self.enabled and self.handle is None:
            self.handle = self.mainloop.aio_loop.call_soon(self.callback)

    @staticmethod
    def defer_new(c_mainloop_api, c_callback, c_userdata):
        """Create an enabled DeferEvent and return its handle."""
        event = DeferEvent(c_callback, c_userdata)
        event.enable(True)
        DeferEvent.EVENTS.add(event)
        return event.c_self.value

    @staticmethod
    def defer_enable(c_defer_event, enable):
        """Enable or disable the event."""
        event = python_object(c_defer_event, cls=DeferEvent)
        event.debug(f'defer_enable {enable}')
        event.enable(enable)

    @staticmethod
    def defer_free(c_defer_event):
        """Disable the event and forget it."""
        event = python_object(c_defer_event, cls=DeferEvent)
        event.debug('defer_free')
        event.enable(False)
        DeferEvent.EVENTS.remove(event)

def quit(c_mainloop_api, retval):
    """The 'quit' member of the Main Loop API: only log the call."""
    message = f'quit() of the mainloop API called with retval={retval}'
    logger.debug(message)

# Keep a module-level reference to the Python Main Loop API structure so that
# it is not garbage collected. MainLoop.C_MAINLOOP_API is a void pointer to
# this structure.
_mainloop_api = build_mainloop_api()

class MainLoop:
    """An implementation of the libpulse Main Loop based on asyncio.

    There is at most one instance per asyncio loop; the instances are
    registered in ASYNCIO_LOOPS and retrieved with get_instance().
    """

    # Registry of the instances: {asyncio loop: MainLoop instance}.
    ASYNCIO_LOOPS = {}
    # Void pointer to the Main Loop API structure, passed to the C callbacks.
    C_MAINLOOP_API = ct.cast(ct.pointer(_mainloop_api), ct.c_void_p)

    def __init__(self, aio_loop):
        assert aio_loop not in MainLoop.ASYNCIO_LOOPS, (
            'There is already a MainLoop instance on this asyncio loop')
        MainLoop.ASYNCIO_LOOPS[aio_loop] = self
        self.aio_loop = aio_loop

    @staticmethod
    def get_instance():
        """Return the MainLoop of the running asyncio loop.

        The instance is created when the running loop has none yet.
        """
        running_loop = asyncio.get_running_loop()
        instance = MainLoop.ASYNCIO_LOOPS.get(running_loop)
        if instance is None:
            return MainLoop(running_loop)
        return instance

    @staticmethod
    def close():
        """Destroy the events of the current MainLoop and unregister it."""
        current = MainLoop.get_instance()
        for event_class in (IoEvent, TimeEvent, DeferEvent):
            event_class.cleanup(current)
        gc.collect()

        # Look for the asyncio loop(s) that the instance is registered with
        # and remove the first one from the registry.
        registered = [key for key, value in MainLoop.ASYNCIO_LOOPS.items()
                      if value is current]
        if registered:
            del MainLoop.ASYNCIO_LOOPS[registered[0]]
        else:
            assert False, 'Cannot remove MainLoop instance upon closing'
        logger.info('LibPulse main loop closed')