File: rtmidi_python.py

package info (click to toggle)
python-mido 1.3.3-0.2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 920 kB
  • sloc: python: 4,006; makefile: 127; sh: 4
file content (137 lines) | stat: -rw-r--r-- 3,891 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# SPDX-FileCopyrightText: 2013 Ole Martin Bjorndalen <ombdalen@gmail.com>
#
# SPDX-License-Identifier: MIT

"""Backend for rtmidi-python:

https://pypi.python.org/pypi/rtmidi-python

To use this backend copy (or link) it to somewhere in your Python path
and call:

    mido.set_backend('mido.backends.rtmidi_python')

or set shell variable $MIDO_BACKEND to mido.backends.rtmidi_python

TODO:

* add support for APIs.

* active_sensing is still filtered. (The same is true for
  mido.backends.rtmidi.) There may be a way to remove this filtering.

"""
import queue

import rtmidi_python as rtmidi

# Relative import: this backend lives inside the mido package
# (mido.backends.rtmidi_python).
from ..ports import BaseInput, BaseOutput


def get_devices(api=None, **kwargs):
    """Describe every MIDI port name reported by rtmidi_python.

    Returns a list with one dict per distinct port name, input names
    first, each holding the keys 'name', 'is_input' and 'is_output'.  A
    name listed as both an input and an output yields a single entry with
    both flags set.  ``api`` and any extra keyword arguments are accepted
    but not used.
    """
    inputs = rtmidi.MidiIn().ports
    outputs = rtmidi.MidiOut().ports

    # dict.fromkeys() drops repeated names while keeping first-seen order.
    unique_names = dict.fromkeys(inputs + outputs)

    return [
        {
            'name': port_name,
            'is_input': port_name in inputs,
            'is_output': port_name in outputs,
        }
        for port_name in unique_names
    ]


class PortCommon:
    """Open/close and callback plumbing shared by the input and output ports.

    Intended to be combined with a port base class: the code here reads
    ``self.name`` and ``self._parser``, which it does not define itself.
    """

    def _open(self, virtual=False, **kwargs):
        """Create the rtmidi_python object and attach it to a port.

        An input object is created when the instance has a ``receive``
        attribute, an output object otherwise.  With ``virtual=True`` a
        virtual port named ``self.name`` is opened; otherwise ``self.name``
        is looked up in the backend's port list, defaulting to the first
        listed port when no name was given.

        Only the ``callback`` keyword is read from ``kwargs``, and only for
        inputs.

        Raises:
            OSError: a virtual port was requested without a name, no ports
                exist, the name is not in the port list, or the backend
                raised ``RuntimeError`` while opening the port.
        """
        self._queue = queue.Queue()
        self._callback = None

        # Selecting a backend API is not supported yet (see module TODO).
        is_input = hasattr(self, 'receive')

        if not is_input:
            self._rt = rtmidi.MidiOut()
        else:
            self._rt = rtmidi.MidiIn()
            # Stop ignoring the first two message classes; the third flag
            # stays True, which matches the module TODO saying active
            # sensing is still filtered.
            # NOTE(review): presumably the flags are (sysex, time,
            # active sensing) -- confirm against rtmidi_python.
            self._rt.ignore_types(False, False, True)
            self.callback = kwargs.get('callback')

        available = self._rt.ports

        if not virtual:
            if self.name is None:
                # In RtMidi, the default port is the first port.
                try:
                    self.name = available[0]
                except IndexError as empty:
                    raise OSError('no ports available') from empty

            try:
                index = available.index(self.name)
            except ValueError as missing:
                raise OSError(f'unknown port {self.name!r}') from missing

            try:
                self._rt.open_port(index)
            except RuntimeError as failure:
                # Re-raise backend failures as OSError with the same args.
                raise OSError(*failure.args) from failure
        else:
            if self.name is None:
                raise OSError('virtual port must have a name')
            self._rt.open_virtual_port(self.name)

        # The API name is left blank until API lookup is implemented.
        api = ''
        self._device_type = f'RtMidi/{api}'
        if virtual:
            self._device_type = f'virtual {self._device_type}'

    @property
    def callback(self):
        """Function called with each incoming message, or None if unset."""
        return self._callback

    @callback.setter
    def callback(self, func):
        self._callback = func
        # The backend is always given the wrapper, never ``func`` itself,
        # so raw bytes pass through the parser first.
        self._rt.callback = None if func is None else self._callback_wrapper

    def _callback_wrapper(self, msg_bytes, timestamp):
        """Parse bytes delivered by the backend and forward each message.

        ``timestamp`` is accepted but not used.
        """
        self._parser.feed(msg_bytes)
        for message in self._parser:
            # Checked per message: nothing is delivered while no callback
            # is set.
            if not self.callback:
                continue
            self.callback(message)

    def _close(self):
        """Close the port and drop the reference to the backend object."""
        self._rt.close_port()
        # Virtual ports are closed when this is deleted.
        delattr(self, '_rt')


class Input(PortCommon, BaseInput):
    """Input port backed by rtmidi_python."""

    def _receive(self, block=True):
        """Move every message the backend currently holds into the parser.

        ``block`` is ignored: RtMidi has no blocking read, so the enclosing
        receive() takes care of blocking.
        """
        # The backend signals "nothing left" with None as the message data.
        payload, _delta = self._rt.get_message()
        while payload is not None:
            self._parser.feed(payload)
            payload, _delta = self._rt.get_message()


class Output(PortCommon, BaseOutput):
    """Output port backed by rtmidi_python."""

    def _send(self, message):
        """Pass the bytes of ``message`` to the backend for sending."""
        transmit = self._rt.send_message
        transmit(message.bytes())