1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
# SPDX-FileCopyrightText: 2017 Ole Martin Bjorndalen <ombdalen@gmail.com>
#
# SPDX-License-Identifier: MIT
import pytest
from mido.messages import Message
from mido.ports import BaseIOPort
class TestIOPort:
class Port(BaseIOPort):
def _open(self):
self.close_called = False
def _send(self, message):
self._messages.append(message)
def _close(self):
self.close_called = True
@pytest.fixture
def port(self):
with self.Port('Name') as p:
yield p
def test_basic(self, port):
assert port.name == 'Name'
assert not port.closed
assert port._messages is port._parser.messages
with pytest.raises(TypeError):
port.send('not a message')
def test_recv_non_blocking(self, port):
message = Message('note_on')
port.send(message)
port.poll()
assert port.poll() is None
def test_send_message(self, port):
message = Message('note_on')
port.send(message)
port.send(message)
def test_port_close(self, port):
port.close()
assert port.close_called
port.close_called = False
port.close()
assert port.closed
assert not port.close_called
def test_mido_port_non_blocking_recv(self, port):
assert port.receive(block=False) is None
def test_close_inside_iteration():
# This type of port can close when it runs out of messages.
# (And example of this is socket ports.)
#
# Iteration should then stop after all messages in the
# internal queue have been received.
message = Message('note_on')
class Port(BaseIOPort):
def __init__(self, messages):
BaseIOPort.__init__(self)
# Simulate some messages that arrived
# earlier.
self._messages.extend(messages)
self.closed = False
def _receive(self, block=True):
# Oops, the other end hung up.
if self._messages:
return self._messages.popleft()
else:
self.close()
return None
message = Message('note_on')
with Port([message, message]) as port:
assert len(list(port)) == 2
|