# SPDX-FileCopyrightText: 2013 Ole Martin Bjorndalen <ombdalen@gmail.com>
#
# SPDX-License-Identifier: MIT
"""A port interface around a queue.

This allows you to create internal ports to send messages between threads.
"""
import queue
import mido
from mido import ports
class QueuePort(ports.BaseIOPort):
    """I/O port whose messages travel through a ``queue.Queue``.

    Whatever is sent on the port is put on the queue, and receiving
    takes the next item off it again.
    """

    # The queue does its own synchronisation, so port-level locking
    # is switched off.
    _locking = False

    def _open(self):
        """Create the queue that backs this port.

        NOTE(review): presumably invoked by the base class when the
        port is opened -- confirm against ``ports.BaseIOPort``.
        """
        self.queue = queue.Queue()

    def _send(self, msg):
        """Put ``msg`` on the queue."""
        self.queue.put(msg)

    def _receive(self, block=True):
        """Take the next message off the queue.

        ``block`` is passed straight through to ``Queue.get``. If the
        queue reports that it is empty, ``None`` is returned instead
        of letting ``queue.Empty`` propagate.
        """
        try:
            message = self.queue.get(block=block)
        except queue.Empty:
            return None
        else:
            return message

    def _device_type(self):
        """Return the device type label for this port."""
        return 'Queue'
def main():
    """Demonstrate a round trip through a ``QueuePort``.

    Prints the result of polling the port before anything has been
    sent, then sends a ``clock`` message and prints what ``receive()``
    hands back.
    """
    clock_message = mido.Message('clock')
    queue_port = QueuePort()

    # Nothing has been sent yet at this point.
    print(queue_port.poll())

    queue_port.send(clock_message)
    print(queue_port.receive())
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|