1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992
|
"""ctypes interface to the pulse library based on asyncio."""
import sys
import asyncio
import logging
import re
import pprint
import ctypes as ct
from abc import ABC
from functools import partialmethod
from . import __version__
from .libpulse_ctypes import PulseCTypes
from .mainloop import MainLoop, pulse_ctypes, callback_func_ptr
from .pulse_functions import pulse_functions
from .pulse_enums import pulse_enums
struct_ctypes = pulse_ctypes.struct_ctypes
logger = logging.getLogger('libpuls')
def _add_pulse_to_namespace():
def add_obj(name, obj):
assert getattr(module, name, None) is None, f'{name} is duplicated'
setattr(module, name, obj)
# Add the pulse constants and functions to the module namespace.
module = sys.modules[__name__]
for name in pulse_functions['signatures']:
func = pulse_ctypes.get_prototype(name)
add_obj(name, func)
for enum, constants in pulse_enums.items():
for name, value in constants.items():
add_obj(name, value)
_add_pulse_to_namespace()
del _add_pulse_to_namespace
# /usr/include/pulse/def.h:
# #define PA_INVALID_INDEX ((uint32_t) -1)
PA_INVALID_INDEX = ct.c_uint32(-1).value
# 'pa_volume_t' values defined in volume.h.
PA_VOLUME_NORM = 0x10000 # Normal volume (100%, 0 dB).
PA_VOLUME_MUTED = 0 # Muted (minimal valid) volume (0%, -inf dB).
PA_VOLUME_MAX = 0xffffffff # UINT32_MAX/2 Maximum volume we can store.
PA_VOLUME_INVALID = 0xffffffff # Special 'invalid' volume.
def PA_VOLUME_IS_VALID(v): return v <= PA_VOLUME_MAX
PA_CHANNELS_MAX = 32 # Defined in sample.h
C_UINT_ARRAY_32 = ct.c_uint * PA_CHANNELS_MAX
C_INT_ARRAY_32 = ct.c_int * PA_CHANNELS_MAX
# Map values to their name.
CTX_STATES = dict((eval(state), state) for state in
('PA_CONTEXT_UNCONNECTED', 'PA_CONTEXT_CONNECTING',
'PA_CONTEXT_AUTHORIZING', 'PA_CONTEXT_SETTING_NAME',
'PA_CONTEXT_READY', 'PA_CONTEXT_FAILED',
'PA_CONTEXT_TERMINATED'))
OPERATION_STATES = dict((eval(state), state) for state in
('PA_OPERATION_CANCELLED', 'PA_OPERATION_DONE',
'PA_OPERATION_RUNNING'))
def event_codes_to_names():
def build_events_dict(mask):
for fac in globals():
if fac.startswith(prefix):
val = eval(fac)
if (val & mask) and val != mask:
yield val, fac[prefix_len:].lower()
prefix = 'PA_SUBSCRIPTION_EVENT_'
prefix_len = len(prefix)
facilities = {0 : 'sink'}
facilities.update(build_events_dict(PA_SUBSCRIPTION_EVENT_FACILITY_MASK))
event_types = {0: 'new'}
event_types.update(build_events_dict(PA_SUBSCRIPTION_EVENT_TYPE_MASK))
return facilities, event_types
# Dictionaries mapping libpulse events values to their names.
EVENT_FACILITIES, EVENT_TYPES = event_codes_to_names()
def run_in_task(coro):
"""Decorator to wrap a coroutine in a task of AsyncioTasks instance."""
async def wrapper(*args, **kwargs):
def get_coro_arg():
length = len(args)
coro_arg = ''
if length >=2:
coro_arg += f'{args[1].__qualname__}('
if length >= 3:
coro_arg += f'{args[2]})'
else:
coro_arg += ')'
return coro_arg
if 0:
# When enabled while running the test suite, will print all the
# pulseaudio coroutines that are being tested.
print(f'{coro.__qualname__}({get_coro_arg()})', file=sys.stderr)
lib_pulse = LibPulse._get_instance()
if lib_pulse is None:
raise LibPulseClosedError
try:
return await lib_pulse.libpulse_tasks.create_task(
coro(*args, **kwargs))
except asyncio.CancelledError:
logger.warning(f'{coro.__qualname__}({get_coro_arg()})'
' has been cancelled')
raise
return wrapper
class LibPulseError(Exception): pass
class LibPulseClosedError(LibPulseError): pass
class LibPulseStateError(LibPulseError): pass
class LibPulseOperationError(LibPulseError): pass
class LibPulseClosedIteratorError(LibPulseError): pass
class LibPulseInstanceExistsError(LibPulseError): pass
class LibPulseArgumentError(LibPulseError): pass
class EventIterator:
"""Pulse events asynchronous iterator."""
QUEUE_CLOSED = object()
def __init__(self):
self.event_queue = asyncio.Queue()
self.closed = False
# Public methods.
def close(self):
self.closed = True
# Private methods.
def abort(self):
while True:
try:
self.event_queue.get_nowait()
except asyncio.QueueEmpty:
break
self.put_nowait(self.QUEUE_CLOSED)
def put_nowait(self, obj):
if not self.closed:
self.event_queue.put_nowait(obj)
def __aiter__(self):
return self
async def __anext__(self):
if self.closed:
logger.info('Events Asynchronous Iterator is closed')
raise StopAsyncIteration
try:
event = await self.event_queue.get()
except asyncio.CancelledError:
self.close()
raise StopAsyncIteration
if event is not self.QUEUE_CLOSED:
return event
self.close()
raise LibPulseClosedIteratorError('Got QUEUE_CLOSED')
class AsyncioTasks:
def __init__(self):
self._tasks = set()
def create_task(self, coro):
task = asyncio.create_task(coro)
self._tasks.add(task)
task.add_done_callback(lambda t: self._tasks.remove(t))
return task
def __iter__(self):
for t in self._tasks:
yield t
class PulseEvent:
"""A libpulse event.
Use the event_facilities() and event_types() static methods to get all the
values currently defined by the libpulse library for 'facility' and
'type'. They correspond to some of the variables defined in the
pulse_enums module under the pa_subscription_event_type Enum.
attributes:
facility: str - name of the facility, for example 'sink'.
index: int - index of the facility.
type: str - type of event, normaly 'new', 'change' or 'remove'.
"""
def __init__(self, event_type, index):
fac = event_type & PA_SUBSCRIPTION_EVENT_FACILITY_MASK
assert fac in EVENT_FACILITIES
self.facility = EVENT_FACILITIES[fac]
type = event_type & PA_SUBSCRIPTION_EVENT_TYPE_MASK
assert type in EVENT_TYPES
self.type = EVENT_TYPES[type]
self.index = index
@staticmethod
def event_facilities():
return list(EVENT_FACILITIES.values())
@staticmethod
def event_types():
return list(EVENT_TYPES.values())
class PropList(dict):
"""Dictionary of the elements of a proplist whose value is a string."""
def __init__(self, c_pa_proplist):
super().__init__()
null_ptr = ct.POINTER(ct.c_void_p)()
null_ptr_ptr = ct.pointer(null_ptr)
while True:
key = pa_proplist_iterate(c_pa_proplist, null_ptr_ptr)
if not key:
break
elif isinstance(key, bytes):
val = pa_proplist_gets(c_pa_proplist, key)
if val:
self[key.decode()] = val.decode()
class PulseStructure:
"""The representation of a ctypes Structure.
When returned by a callback as a pointer to a structure, one must make a
deep copy of the elements of the structure as they are only temporarily
available.
"""
ignored_pointer_names = set()
array_sizes = {
'pa_card_port_info': 'n_ports',
'pa_source_port_info': 'n_ports',
'pa_sink_port_info': 'n_ports',
'pa_card_profile_info': 'n_profiles',
'pa_card_profile_info2': 'n_profiles',
'pa_format_info': 'n_formats',
}
def __init__(self, c_struct, c_structure_type):
for name, c_type in c_structure_type._fields_:
fq_name = f'{c_structure_type.__name__}.{name}: {c_type.__name__}'
if fq_name in self.ignored_pointer_names:
continue
try:
c_struct_val = getattr(c_struct, name)
except AttributeError:
assert False, (f'{fq_name} not found while instantiating'
f' a PulseStructure')
if c_type in PulseCTypes.numeric_types.values():
setattr(self, name, c_struct_val)
# A NULL pointer.
elif not c_struct_val:
setattr(self, name, None)
elif c_type is ct.c_char_p:
setattr(self, name, c_struct_val.decode())
# A proplist pointer.
elif (isinstance(c_struct_val, ct._Pointer) and
c_struct_val._type_.__name__ == 'pa_proplist'):
setattr(self, name, PropList(c_struct_val))
# An array.
elif isinstance(c_struct_val, ct.Array):
# All libpulse arrays have a numeric type.
setattr(self, name, c_struct_val[:])
# An array of pointers.
elif (isinstance(c_struct_val, ct._Pointer) and
isinstance(c_struct_val.contents, ct._Pointer)):
ctype = c_struct_val.contents._type_
if ctype.__name__ in struct_ctypes:
val = []
ptr = c_struct_val
size_attr = self.array_sizes[ctype.__name__]
array_size = getattr(self, size_attr)
for i in range(array_size):
if not ptr[i]:
break
val.append(PulseStructure(ptr[i].contents, ctype))
setattr(self, name, val)
else:
self.ignore_member(fq_name)
# A pointer.
elif isinstance(c_struct_val, ct._Pointer):
if c_struct_val._type_.__name__ in struct_ctypes:
setattr(self, name,
PulseStructure(c_struct_val.contents,
c_struct_val._type_))
else:
self.ignore_member(fq_name)
# A structure.
else:
if c_type.__name__ in struct_ctypes:
setattr(self, name,
PulseStructure(c_struct_val, c_type))
else:
self.ignore_member(fq_name)
def ignore_member(self, name):
self.ignored_pointer_names.add(name)
logger.debug(f"Ignoring '{name}' structure member")
def __repr__(self):
return pprint.pformat(self.__dict__, sort_dicts=False)
class CtypesPulseStructure(ABC):
"""Container for an instance of a subclass of ctypes.Structure.
Scalar attributes of the instance may be updated after instanciation as
well as the scalar elements of an array, but not the array itself or the
other aggregate attributes.
"""
@staticmethod
def get_fields_names(struct_name):
return [field[0] for field in struct_ctypes[struct_name]._fields_]
def __getattr__(self, name):
if name in self.fields_names:
return getattr(self.ct_struct, name)
else:
raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{name}'")
def __setattr__(self, name, val):
if name in self.fields_names:
setattr(self.c_obj, name, val)
else:
object.__setattr__(self, name, val)
def byref(self):
return ct.byref(self.ct_struct)
def to_pulse_structure(self):
return PulseStructure(self.ct_struct, struct_ctypes[self.struct_name])
class Pa_buffer_attr(CtypesPulseStructure):
struct_name = 'pa_buffer_attr'
fields_names = CtypesPulseStructure.get_fields_names(struct_name)
def __init__(self, maxlength, tlength, prebuf, minreq, fragsize):
self.ct_struct = struct_ctypes[self.struct_name](
maxlength, tlength, prebuf, minreq, fragsize)
class Pa_cvolume(CtypesPulseStructure):
struct_name = 'pa_cvolume'
fields_names = CtypesPulseStructure.get_fields_names(struct_name)
def __init__(self, channels, values):
length = len(values)
assert length <= PA_CHANNELS_MAX
values += [0 for i in range(PA_CHANNELS_MAX - length)]
self.ct_struct = struct_ctypes[self.struct_name](
channels, C_UINT_ARRAY_32(*values))
class Pa_channel_map(CtypesPulseStructure):
struct_name = 'pa_channel_map'
fields_names = CtypesPulseStructure.get_fields_names(struct_name)
def __init__(self, channels, map):
length = len(map)
assert length <= PA_CHANNELS_MAX
map += [0 for i in range(PA_CHANNELS_MAX - length)]
self.ct_struct = struct_ctypes[self.struct_name](
channels, C_INT_ARRAY_32(*map))
class Pa_format_info(CtypesPulseStructure):
struct_name = 'pa_format_info'
fields_names = CtypesPulseStructure.get_fields_names(struct_name)
def __init__(self, encoding, plist):
# 'plist' is an instance of ctypes._Pointer that has been built using
# some of the pa_proplist_ functions.
self.ct_struct = struct_ctypes[self.struct_name](encoding, plist)
class Pa_sample_spec(CtypesPulseStructure):
struct_name = 'pa_sample_spec'
fields_names = CtypesPulseStructure.get_fields_names(struct_name)
def __init__(self, format, rate, channels):
self.ct_struct = struct_ctypes[self.struct_name](
format, rate, channels)
class LibPulse:
"""Interface to libpulse library as an asynchronous context manager."""
ASYNCIO_LOOPS = dict() # {asyncio loop: LibPulse instance}
# Function signature: (pa_operation *,
# [pa_context *, [args...], cb_t, void *])
# Callback signature: (void, [pa_context *, [objs...], void *])
context_methods = (
'pa_context_add_autoload',
'pa_context_drain',
'pa_context_get_server_info',
'pa_context_load_module',
'pa_context_play_sample_with_proplist',
# pa_context_send_message_to_object() is the only method that is not
# in the 'LibPulse.context_list_methods' list and that returns a list
# of objects. See the 'pa_context_string_cb_t' callback signature in
# the pulse_functions module.
'pa_context_send_message_to_object',
# The context state is monitored by the LibPulse instance.
# 'pa_context_set_state_callback',
# Reception of events is monitored by the LibPulse instance.
# 'pa_context_set_subscribe_callback',
'pa_context_stat',
'pa_ext_device_restore_test',
)
# Function signature: (pa_operation *,
# [pa_context *, [args...], cb_t, void *])
# Callback signature: (void, [pa_context *, int, void *])
context_success_methods = (
'pa_context_exit_daemon',
'pa_context_kill_client',
'pa_context_kill_sink_input',
'pa_context_kill_source_output',
'pa_context_move_sink_input_by_index',
'pa_context_move_sink_input_by_name',
'pa_context_move_source_output_by_index',
'pa_context_move_source_output_by_name',
'pa_context_play_sample',
'pa_context_proplist_remove',
'pa_context_proplist_update',
'pa_context_remove_autoload_by_index',
'pa_context_remove_autoload_by_name',
'pa_context_remove_sample',
'pa_context_set_card_profile_by_index',
'pa_context_set_card_profile_by_name',
'pa_context_set_default_sink',
'pa_context_set_default_source',
'pa_context_set_name',
'pa_context_set_port_latency_offset',
'pa_context_set_sink_input_mute',
'pa_context_set_sink_input_volume',
'pa_context_set_sink_mute_by_index',
'pa_context_set_sink_mute_by_name',
'pa_context_set_sink_port_by_index',
'pa_context_set_sink_port_by_name',
'pa_context_set_sink_volume_by_index',
'pa_context_set_sink_volume_by_name',
'pa_context_set_source_mute_by_index',
'pa_context_set_source_mute_by_name',
'pa_context_set_source_output_mute',
'pa_context_set_source_output_volume',
'pa_context_set_source_port_by_index',
'pa_context_set_source_port_by_name',
'pa_context_set_source_volume_by_index',
'pa_context_set_source_volume_by_name',
'pa_context_subscribe',
'pa_context_suspend_sink_by_index',
'pa_context_suspend_sink_by_name',
'pa_context_suspend_source_by_index',
'pa_context_suspend_source_by_name',
'pa_context_unload_module',
'pa_ext_device_restore_save_formats',
'pa_ext_device_restore_subscribe',
)
# Function signature: (pa_operation *, [pa_context *, cb_t, void *])
# Callback signature: (void, [pa_context *, struct *, int, void *])
context_list_methods = (
'pa_context_get_autoload_info_by_index',
'pa_context_get_autoload_info_by_name',
'pa_context_get_autoload_info_list',
'pa_context_get_card_info_by_index',
'pa_context_get_card_info_by_name',
'pa_context_get_card_info_list',
'pa_context_get_client_info',
'pa_context_get_client_info_list',
'pa_context_get_module_info',
'pa_context_get_module_info_list',
'pa_context_get_sample_info_by_index',
'pa_context_get_sample_info_by_name',
'pa_context_get_sample_info_list',
'pa_context_get_sink_info_by_index',
'pa_context_get_sink_info_by_name',
'pa_context_get_sink_info_list',
'pa_context_get_sink_input_info',
'pa_context_get_sink_input_info_list',
'pa_context_get_source_info_by_index',
'pa_context_get_source_info_by_name',
'pa_context_get_source_info_list',
'pa_context_get_source_output_info',
'pa_context_get_source_output_info_list',
'pa_ext_device_restore_read_formats',
'pa_ext_device_restore_read_formats_all',
)
# Function signature: (pa_operation *,
# [pa_stream *, [args...], cb_t, void *])
# Callback signature: (void, [pa_stream *, int, void *])
stream_success_methods = (
'pa_stream_cork',
'pa_stream_drain',
'pa_stream_flush',
'pa_stream_prebuf',
'pa_stream_proplist_remove',
'pa_stream_proplist_update',
'pa_stream_set_buffer_attr',
'pa_stream_set_name',
'pa_stream_trigger',
'pa_stream_update_sample_rate',
'pa_stream_update_timing_info',
)
def __init__(self, name, server=None, flags=PA_CONTEXT_NOAUTOSPAWN):
""" Constructor arguments:
- 'name' Name of the application.
- 'server' Server name, if 'server' is None, connect to the
default server.
- 'flags' 'flags' and 'server' are arguments of
pa_context_connect() used to connect to the server.
"""
logger.info(f'Python libpulse version {__version__}')
assert isinstance(name, str)
if server is not None:
assert isinstance(server, str)
server = server.encode()
self.server = server
self.flags = flags
self.loop = asyncio.get_running_loop()
if self.loop in self.ASYNCIO_LOOPS:
raise LibPulseInstanceExistsError
self.c_context = pa_context_new(MainLoop.C_MAINLOOP_API,
name.encode())
# From the ctypes documentation: "NULL pointers have a False
# boolean value".
if not self.c_context:
raise RuntimeError('Cannot get context from libpulse library')
self.closed = False
self.state = ('PA_CONTEXT_UNCONNECTED', 'PA_OK')
self.main_task = asyncio.current_task(self.loop)
self.libpulse_tasks = AsyncioTasks()
self.state_notification = self.loop.create_future()
self.event_iterator = None
self.ASYNCIO_LOOPS[self.loop] = self
LibPulse.add_async_methods()
# Keep a reference to prevent garbage collection.
self.c_context_state_callback = callback_func_ptr(
'pa_context_notify_cb_t', LibPulse.context_state_callback)
self.c_context_subscribe_callback = callback_func_ptr(
'pa_context_subscribe_cb_t', LibPulse.context_subscribe_callback)
# Initialisation.
@staticmethod
def add_async_methods():
# Register the partial methods.
method_types = {
'context_methods': LibPulse._pa_context_get,
'context_success_methods': LibPulse._pa_context_op_success,
'context_list_methods': LibPulse._pa_context_get_list,
'stream_success_methods': LibPulse._pa_stream_op_success,
}
this_module = sys.modules[__name__]
for method_type, libpulse_method in method_types.items():
func_names = getattr(LibPulse, method_type)
for func_name in func_names:
setattr(LibPulse, func_name,
partialmethod(libpulse_method, func_name))
if hasattr(this_module, func_name):
delattr(this_module, func_name)
@staticmethod
def _get_instance():
"""Get the LibPulse instance running on the asyncio loop.
The instance may not be yet connected or be in the closing state.
Prefer using get_current_instance().
"""
loop = asyncio.get_running_loop()
try:
return LibPulse.ASYNCIO_LOOPS[loop]
except KeyError:
return None
@staticmethod
def context_state_callback(c_context, c_userdata):
"""Call back that monitors the connection state."""
lib_pulse = LibPulse._get_instance()
if lib_pulse is None:
return
st = pa_context_get_state(c_context)
st = CTX_STATES[st]
if st in ('PA_CONTEXT_READY', 'PA_CONTEXT_FAILED',
'PA_CONTEXT_TERMINATED'):
error = pa_strerror(pa_context_errno(c_context))
state = (st, error.decode())
logger.info(f'LibPulse connection: {state}')
state_notification = lib_pulse.state_notification
lib_pulse.state = state
if not state_notification.done():
state_notification.set_result(state)
elif not lib_pulse.closed and st != 'PA_CONTEXT_READY':
# A task is used here instead of calling directly abort() so
# that pa_context_connect() has the time to handle a
# previous PA_CONTEXT_READY state.
asyncio.create_task(lib_pulse.abort(state))
else:
logger.debug(f'LibPulse connection: {st}')
@run_in_task
async def _pa_context_connect(self):
"""Connect the context to the default server."""
pa_context_set_state_callback(self.c_context,
self.c_context_state_callback, None)
rc = pa_context_connect(self.c_context, self.server, self.flags, None)
logger.debug(f'pa_context_connect return code: {rc}')
await self.state_notification
if self.state[0] != 'PA_CONTEXT_READY':
raise LibPulseStateError(self.state)
@staticmethod
def context_subscribe_callback(c_context, event_type, index, c_userdata):
"""Call back to handle pulseaudio events."""
lib_pulse = LibPulse._get_instance()
if lib_pulse is None:
return
if lib_pulse.event_iterator is not None:
lib_pulse.event_iterator.put_nowait(PulseEvent(event_type,
index))
# Libpulse async methods workers.
@staticmethod
def get_callback_data(func_name):
# Get name and signature of the callback argument of 'func_name'.
func_sig = pulse_functions['signatures'][func_name]
args = func_sig[1]
for arg in args:
if arg in pulse_functions['callbacks']:
callback_name = arg
callback_sig = pulse_functions['callbacks'][arg]
assert len(args) >= 3 and arg == args[-2]
return callback_name, callback_sig
def call_ctypes_func(self, func_name, operation_type, cb_func_ptr,
*func_args):
# Call the 'func_name' ctypes function.
args = []
for arg in func_args:
arg = arg.encode() if isinstance(arg, str) else arg
args.append(arg)
func_proto = pulse_ctypes.get_prototype(func_name)
try:
c_operation = func_proto(operation_type, *args, cb_func_ptr, None)
except ct.ArgumentError as e:
first_arg = ('c_context' if operation_type == self.c_context else
'pa_stream')
raise LibPulseArgumentError(
f"\nException reported by ctypes:\n"
f" {e!r}"
f"\nFunction arguments:\n"
f" {func_name}{(first_arg, *args, cb_func_ptr, None)}\n"
)
return c_operation
@staticmethod
async def handle_operation(c_operation, future):
# From the ctypes documentation: "NULL pointers have a False
# boolean value".
if not c_operation:
future.cancel()
error = "NULL 'pa_operation' pointer"
lib_pulse = LibPulse._get_instance()
if lib_pulse is not None:
errmsg = pa_strerror(pa_context_errno(lib_pulse.c_context))
error += f': {errmsg.decode()}'
raise LibPulseOperationError(error)
try:
await future
except asyncio.CancelledError:
pa_operation_cancel(c_operation)
raise
finally:
pa_operation_unref(c_operation)
async def _pa_get(self, func_name, operation_type, *func_args):
"""Call an asynchronous pulse function that does not return a list.
'func_args' is the sequence of the arguments of the function preceding
the callback in the function signature. The last argument
(i.e. 'userdata') is set to None by call_ctypes_func().
"""
def callback_func(c_operation_type, *c_results):
results = []
try:
for arg, c_result in zip(callback_sig[1][1:-1],
c_results[:-1]):
arg_list = arg.split()
if arg_list[-1] == '*' and arg_list[0] in struct_ctypes:
struct_name = arg_list[0]
if not c_result:
results.append(None)
else:
results.append(PulseStructure(c_result.contents,
struct_ctypes[struct_name]))
else:
if arg == 'char *':
c_result = c_result.decode()
results.append(c_result)
except Exception as e:
results = e
finally:
if not notification.done():
notification.set_result(results)
callback_data = self.get_callback_data(func_name)
assert callback_data, f'{func_name} signature without a callback'
callback_name, callback_sig = callback_data
notification = self.loop.create_future()
# Await on the future.
cb_func_ptr = callback_func_ptr(callback_name, callback_func)
c_operation = self.call_ctypes_func(func_name, operation_type,
cb_func_ptr, *func_args)
await LibPulse.handle_operation(c_operation, notification)
results = notification.result()
if isinstance(results, Exception):
raise results
for result in results:
if result is None:
raise LibPulseOperationError(
'NULL pointer result returned by the callback')
if len(results) == 1:
return results[0]
return results
async def _pa_get_list(self, func_name, operation_type, *func_args):
"""Call an asynchronous pulse function that returns a list.
'func_args' is the sequence of the arguments of the function preceding
the callback in the function signature. The last argument
(i.e. 'userdata') is set to None by call_ctypes_func().
"""
def info_callback(c_operation_type, c_info, eol, c_userdata):
# From the ctypes documentation: "NULL pointers have a False
# boolean value".
if not c_info:
if not notification.done():
notification.set_result(eol)
else:
try:
arg = callback_sig[1][1]
arg_list = arg.split()
assert arg_list[-1] == '*'
assert arg_list[0] in struct_ctypes
struct_name = arg_list[0]
infos.append(PulseStructure(c_info.contents,
struct_ctypes[struct_name]))
except Exception as e:
if not notification.done():
notification.set_result(e)
callback_data = self.get_callback_data(func_name)
assert callback_data, f'{func_name} signature without a callback'
callback_name, callback_sig = callback_data
infos = []
notification = self.loop.create_future()
# Await on the future.
cb_func_ptr = callback_func_ptr(callback_name, info_callback)
c_operation = self.call_ctypes_func(func_name, operation_type,
cb_func_ptr, *func_args)
await LibPulse.handle_operation(c_operation, notification)
eol = notification.result()
if isinstance(eol, Exception):
raise eol
if eol < 0:
error = "'eol' set to a negative value by the callback"
errmsg = pa_strerror(pa_context_errno(self.c_context))
error += f': {errmsg.decode()}'
raise LibPulseOperationError(error)
if func_name.endswith(('_by_name', '_by_index', '_info', '_formats')):
assert len(infos) == 1
return infos[0]
return infos
@run_in_task
async def _run_in_task(self, method,
func_name, operation_type, *func_args):
try:
return await method(func_name, operation_type, *func_args)
except (Exception, asyncio.CancelledError) as e:
return e
async def _pa_context_get(self, func_name, *func_args):
result = await self._run_in_task(self._pa_get, func_name,
self.c_context, *func_args)
if isinstance(result, (Exception, asyncio.CancelledError)):
raise result
return result
async def _pa_success(self, func_name, operation_type, *func_args):
success = await self._run_in_task(self._pa_get, func_name,
operation_type, *func_args)
if isinstance(success, (Exception, asyncio.CancelledError)):
raise success
return success
async def _pa_context_op_success(self, func_name, *func_args):
result = await self._pa_success(func_name, self.c_context, *func_args)
if result != PA_OPERATION_DONE:
error = pa_strerror(pa_context_errno(self.c_context))
raise LibPulseOperationError(f'Failure: {error.decode()}: '
f'{OPERATION_STATES[result]!r}')
return result
async def _pa_stream_op_success(self, func_name, pa_stream, *func_args):
return await self._pa_success(func_name, pa_stream, *func_args)
async def _pa_context_get_list(self, func_name, *func_args):
result = await self._run_in_task(self._pa_get_list, func_name,
self.c_context, *func_args)
if isinstance(result, (Exception, asyncio.CancelledError)):
raise result
else:
return result
# Context manager.
async def abort(self, state):
# Cancelling the main task does close the LibPulse context manager.
logger.error(f'The LibPulse instance has been aborted: {state}')
self.main_task.cancel()
async def close(self):
if self.closed:
return
self.closed = True
try:
for task in self.libpulse_tasks:
task.cancel()
if self.event_iterator is not None:
self.event_iterator.abort()
pa_context_set_state_callback(self.c_context, None, None)
pa_context_set_subscribe_callback(self.c_context, None, None)
if self.state[0] == 'PA_CONTEXT_READY':
try:
await self.pa_context_drain()
except (Exception, asyncio.CancelledError):
# Quoting pulse documentation:
# If there is nothing to drain, the function returns NULL.
pass
pa_context_disconnect(self.c_context)
logger.info('Disconnected from libpulse context')
finally:
pa_context_unref(self.c_context)
for loop, lib_pulse in list(self.ASYNCIO_LOOPS.items()):
if lib_pulse is self:
del self.ASYNCIO_LOOPS[loop]
break
else:
logger.error('Cannot remove LibPulse instance upon closing')
MainLoop.close()
logger.debug('LibPulse instance closed')
async def __aenter__(self):
try:
# Set up the two callbacks that live until this instance is
# closed.
self.main_task = asyncio.current_task(self.loop)
await self._pa_context_connect()
pa_context_set_subscribe_callback(self.c_context,
self.c_context_subscribe_callback, None)
return self
except asyncio.CancelledError:
await self.close()
if self.state[0] != 'PA_CONTEXT_READY':
raise LibPulseStateError(self.state)
except Exception:
await self.close()
raise
async def __aexit__(self, exc_type, exc_value, traceback):
await self.close()
if exc_type is asyncio.CancelledError:
if self.state[0] != 'PA_CONTEXT_READY':
raise LibPulseStateError(self.state)
# Public methods.
@staticmethod
async def get_current_instance():
"""Get the LibPulse running instance.
Raises LibPulseStateError if the instance is not in the
PA_CONTEXT_READY state.
"""
lib_pulse = LibPulse._get_instance()
if lib_pulse is not None:
await lib_pulse.state_notification
if lib_pulse.state[0] != 'PA_CONTEXT_READY':
raise LibPulseStateError(lib_pulse.state)
return lib_pulse
def get_events_iterator(self):
"""Return an Asynchronous Iterator of libpulse events.
The iterator is used to run an async for loop over the PulseEvent
instances. The async for loop can be terminated by invoking the
close() method of the iterator from within the loop or from another
task.
"""
if self.closed:
raise LibPulseOperationError('The LibPulse instance is closed')
if self.event_iterator is not None and not self.event_iterator.closed:
raise LibPulseError('Not allowed: the current Asynchronous'
' Iterator must be closed first')
self.event_iterator = EventIterator()
return self.event_iterator
async def log_server_info(self):
if self.state[0] != 'PA_CONTEXT_READY':
raise LibPulseStateError(self.state)
server_info = await self.pa_context_get_server_info()
server_name = server_info.server_name
if re.match(r'.*\d+\.\d', server_name):
# Pipewire includes the server version in the server name.
logger.info(f'Server: {server_name}')
else:
logger.info(f'Server: {server_name} {server_info.server_version}')
version = pa_context_get_protocol_version(self.c_context)
server_ver = pa_context_get_server_protocol_version(self.c_context)
logger.debug(f'libpulse library/server versions: '
f'{version}/{server_ver}')
# 'server' is the name of the socket libpulse is connected to.
server = pa_context_get_server(self.c_context)
logger.debug(f'{server_name} connected to {server.decode()}')
|