File: __main__.py

package info (click to toggle)
python-rtmidi 1.5.8-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,248 kB
  • sloc: cpp: 4,228; python: 2,853; makefile: 287; sh: 109; ansic: 19
file content (133 lines) | stat: -rw-r--r-- 3,667 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# midifilter/__main__.py
#
"""Simple MIDI filter / processor."""

from __future__ import absolute_import

import argparse
import logging
import sys
import threading
import time

try:
    import Queue as queue
except ImportError:  # Python 3
    import queue

from rtmidi.midiutil import open_midiport

from .filters import MapControllerValue, MonoPressureToCC, Transpose


# Module-level logger; MidiDispatcher writes its per-event debug output here.
log = logging.getLogger("midifilter")


class MidiDispatcher(threading.Thread):
    """Worker thread routing MIDI input through filters to a MIDI output.

    When the thread starts running it registers itself as the callback of
    the input object.  The callback only timestamps and queues each
    incoming event; the thread's loop takes events off the queue, passes
    them through every filter in order and sends the resulting messages to
    the output object.

    """

    def __init__(self, midiin, midiout, *filters):
        """Store the ports and the filter chain and create the event queue.

        :param midiin: MIDI input object, must provide ``set_callback()``
        :param midiout: MIDI output object, must provide ``send_message()``
        :param filters: filter objects, applied in the order given; each
            must provide a ``process(events)`` method returning an iterable
            of ``(message, timestamp)`` events

        """
        super(MidiDispatcher, self).__init__()
        self.queue = queue.Queue()
        # Running timestamp: starts at creation time and is advanced by
        # the delta time of every event received by the callback.
        self._wallclock = time.time()
        self.filters = filters
        self.midiout = midiout
        self.midiin = midiin

    def __call__(self, event, data=None):
        """Input callback: timestamp an incoming event and queue it.

        :param event: a ``(message, deltatime)`` pair
        :param data: accepted for the callback signature, not used here

        """
        midi_msg, delta = event
        timestamp = self._wallclock = self._wallclock + delta
        log.debug("IN: @%0.6f %r", timestamp, midi_msg)
        self.queue.put((midi_msg, timestamp))

    def run(self):
        """Process queued events until the ``None`` stop marker arrives."""
        log.debug("Attaching MIDI input callback handler.")
        self.midiin.set_callback(self)

        pending = self.queue.get()

        while pending is not None:
            # Each filter receives the list produced by the previous one,
            # so a filter may drop, alter or add events.
            batch = [pending]

            for stage in self.filters:
                batch = list(stage.process(batch))

            for outgoing in batch:
                log.debug("Out: @%0.6f %r", outgoing[1], outgoing[0])
                self.midiout.send_message(outgoing[0])

            pending = self.queue.get()

    def stop(self):
        """Ask the thread to finish by queueing the ``None`` stop marker."""
        self.queue.put(None)


def main(args=None):
    """Run the MIDI filter command line program.

    Parses the command line, opens one MIDI input and one MIDI output port,
    builds the filter chain selected by the options and runs a
    ``MidiDispatcher`` thread until the user presses Control-C.

    :param args: list of command line arguments; ``None`` lets argparse
        read them from ``sys.argv``
    :return: 0 on normal exit or when port selection is aborted with
        EOF / Control-C, 1 when opening a port fails with ``IOError``
    :raises SystemExit: from argparse on invalid usage (exit status 2),
        including ``-t`` or ``-m`` given without a filter argument

    """
    parser = argparse.ArgumentParser(prog='midifilter', description=__doc__)
    padd = parser.add_argument
    padd('-m', '--mpresstocc', action="store_true",
         help='Map mono pressure (channel aftertouch) to CC')
    padd('-r', '--mapccrange', action="store_true",
         help='Map controller value range to min/max value range')
    padd('-t', '--transpose', action="store_true",
         help='Transpose note on/off event note values')
    padd('-i', '--inport',
         help='MIDI input port number (default: ask)')
    padd('-o', '--outport',
         help='MIDI output port number (default: ask)')
    padd('-v', '--verbose', action="store_true",
         help='verbose output')
    padd('filterargs', nargs="*", type=int,
         help='MIDI filter argument(s)')

    args = parser.parse_args(args)

    # Transpose and MonoPressureToCC both take their value from the first
    # positional filter argument.  Report a missing one as a usage error
    # right away, before any MIDI port is opened, instead of failing later
    # with an IndexError.
    if (args.transpose or args.mpresstocc) and not args.filterargs:
        parser.error("options -t/--transpose and -m/--mpresstocc require "
                     "a filter argument")

    logging.basicConfig(format="%(name)s: %(levelname)s - %(message)s",
                        level=logging.DEBUG if args.verbose else logging.INFO)

    try:
        midiin, inport_name = open_midiport(args.inport, "input")
        midiout, outport_name = open_midiport(args.outport, "output")
    except IOError as exc:
        print(exc)
        return 1
    except (EOFError, KeyboardInterrupt):
        # Port selection was aborted; treated as a normal exit.
        return 0

    filters = []
    # Example of a hard-wired filter (not enabled):
    #filters = [CCToBankChange(cc=99, channel=15, msb=0, lsb=1, program=99)]

    if args.transpose:
        filters.append(Transpose(transpose=args.filterargs[0]))
    if args.mpresstocc:
        filters.append(MonoPressureToCC(cc=args.filterargs[0]))
    if args.mapccrange:
        # All positional filter arguments are passed through unchanged.
        # NOTE(review): the number of arguments MapControllerValue needs
        # is not visible from this file - confirm against the filters
        # module.
        filters.append(MapControllerValue(*args.filterargs))

    dispatcher = MidiDispatcher(midiin, midiout, *filters)

    print("Entering main loop. Press Control-C to exit.")
    try:
        dispatcher.start()
        # The dispatcher thread does all the work; the main thread only
        # idles here so that it can receive the KeyboardInterrupt.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        dispatcher.stop()
        dispatcher.join()
        print('')
    finally:
        print("Exit.")

        midiin.close_port()
        midiout.close_port()

        del midiin
        del midiout

    return 0


if __name__ == '__main__':
    # Use main()'s return value as the process exit status; a falsy
    # result (such as None) is reported as 0.
    status = main()
    sys.exit(status or 0)