1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
# Copyright (C) 2015-2016, 2021 taylor.fish <contact@taylor.fish>
#
# This file is part of pyrcb2.
#
# pyrcb2 is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# As an additional permission under GNU GPL version 3 section 7, you may
# distribute non-source forms of comments (lines beginning with "#") and
# strings (text enclosed in quotation marks) in pyrcb2 source code without
# the copy of the GNU GPL normally required by section 4, provided you
# include a URL through which recipients can obtain a copy of the
# Corresponding Source and the GPL at no charge.
#
# pyrcb2 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with pyrcb2. If not, see <http://www.gnu.org/licenses/>.
from unittest import mock
import asyncio
import time
class BaseMock(mock.Mock):
# Override in subclasses.
spec = None
def __init__(self, spec=None, **kwargs):
spec = spec or self.spec
kwargs.setdefault("side_effect", TypeError("Object is not callable."))
super().__init__(spec=spec, **kwargs)
def wrap_mock(self, *names):
for name in names:
method = getattr(self, name)
mock_method = mock.Mock(spec=method, side_effect=method)
setattr(self, name, mock_method)
@classmethod
def get_mock_class(cls, instance=None, spec=None):
spec = spec or cls.spec
instance = instance or cls(spec=spec)
mock_cls = mock.Mock(spec=spec, return_value=instance)
return mock_cls
def _get_child_mock(self, **kwargs):
return mock.Mock(**kwargs)
class MockReader(BaseMock):
spec = asyncio.StreamReader
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.wrap_mock("readline")
self.reset()
def reset(self):
self.lines = asyncio.Queue()
self.lines_empty = asyncio.Event()
self.alive = True
def add_line(self, line):
self.lines.put_nowait(line)
self.lines_empty = asyncio.Event()
async def readline(self):
if not self.alive:
return b""
if self.lines.empty():
self.lines_empty.set()
line = await self.lines.get()
if line is None:
self.alive = False
return b""
return line.encode() + b"\r\n"
class MockWriter(BaseMock):
spec = asyncio.StreamWriter
def __init__(self, clock, reader=None, **kwargs):
super().__init__(**kwargs)
self.wrap_mock("write", "close")
self.clock = clock
self.reader = reader
self.lines = []
self.lines_with_time = []
self.data_received = asyncio.Event()
def write(self, data):
line = data.decode().rstrip("\r\n")
self.lines.append(line)
self.lines_with_time.append((line, self.clock.time))
self.data_received.set()
self.data_received = asyncio.Event()
def close(self):
if self.reader is not None:
self.reader.alive = False
if self.reader.lines.empty():
self.reader.lines.put_nowait(None)
class MockOpenConnection(BaseMock):
def __init__(self, reader, writer, **kwargs):
super().__init__(side_effect=self._call, **kwargs)
self.reader = reader
self.writer = writer
def _call(self, *args, **kwargs):
future = asyncio.get_event_loop().create_future()
future.set_result((self.reader, self.writer))
return future
class MockClock(BaseMock):
spec = time.monotonic
def __init__(self, **kwargs):
super().__init__(side_effect=self._call, **kwargs)
self.time = 0
def _call(self):
return self.time
class MockAsyncSleep(BaseMock):
spec = asyncio.sleep
def __init__(self, clock, **kwargs):
super().__init__(side_effect=self._call, **kwargs)
self.clock = clock
def _call(self, delay, result=None):
# Keep asyncio.sleep()'s original behavior if delay is 0.
if delay == 0:
@asyncio.coroutine
def coroutine():
yield
return coroutine()
self.clock.time += delay
future = asyncio.get_event_loop().create_future()
future.set_result(result)
return future
|