1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
# SPDX-FileCopyrightText: 2013 Ole Martin Bjorndalen <ombdalen@gmail.com>
#
# SPDX-License-Identifier: MIT
"""Backend for rtmidi-python:
https://pypi.python.org/pypi/rtmidi-python
To use this backend copy (or link) it to somewhere in your Python path
and call:
mido.set_backend('mido.backends.rtmidi_python')
or set shell variable $MIDO_BACKEND to mido.backends.rtmidi_python
TODO:
* add support for APIs.
* active_sensing is still filtered. (The same is true for
mido.backends.rtmidi.)There may be a way to remove this filtering.
"""
import queue
import rtmidi_python as rtmidi
# TODO: change this to a relative import if the backend is included in
# the package.
from ..ports import BaseInput, BaseOutput
def get_devices(api=None, **kwargs):
devices = {}
input_names = rtmidi.MidiIn().ports
output_names = rtmidi.MidiOut().ports
for name in input_names + output_names:
if name not in devices:
devices[name] = {
'name': name,
'is_input': name in input_names,
'is_output': name in output_names,
}
return list(devices.values())
class PortCommon:
def _open(self, virtual=False, **kwargs):
self._queue = queue.Queue()
self._callback = None
# rtapi = _get_api_id(api)
opening_input = hasattr(self, 'receive')
if opening_input:
self._rt = rtmidi.MidiIn()
self._rt.ignore_types(False, False, True)
self.callback = kwargs.get('callback')
else:
self._rt = rtmidi.MidiOut() # rtapi=rtapi)
# Turn of ignore of sysex, time and active_sensing.
ports = self._rt.ports
if virtual:
if self.name is None:
raise OSError('virtual port must have a name')
self._rt.open_virtual_port(self.name)
else:
if self.name is None:
# TODO: this could fail if list is empty.
# In RtMidi, the default port is the first port.
try:
self.name = ports[0]
except IndexError as ie:
raise OSError('no ports available') from ie
try:
port_id = ports.index(self.name)
except ValueError as ve:
raise OSError(f'unknown port {self.name!r}') from ve
try:
self._rt.open_port(port_id)
except RuntimeError as err:
raise OSError(*err.args) from err
# api = _api_to_name[self._rt.get_current_api()]
api = ''
self._device_type = f'RtMidi/{api}'
if virtual:
self._device_type = f'virtual {self._device_type}'
@property
def callback(self):
return self._callback
@callback.setter
def callback(self, func):
self._callback = func
if func is None:
self._rt.callback = None
else:
self._rt.callback = self._callback_wrapper
def _callback_wrapper(self, msg_bytes, timestamp):
self._parser.feed(msg_bytes)
for message in self._parser:
if self.callback:
self.callback(message)
def _close(self):
self._rt.close_port()
del self._rt # Virtual ports are closed when this is deleted.
class Input(PortCommon, BaseInput):
def _receive(self, block=True):
# Since there is no blocking read in RtMidi, the block
# flag is ignored and the enclosing receive() takes care
# of blocking.
while True:
message_data, timestamp = self._rt.get_message()
if message_data is None:
break
else:
self._parser.feed(message_data)
class Output(PortCommon, BaseOutput):
def _send(self, message):
self._rt.send_message(message.bytes())
|