1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
import io
import aiohttp
from aiohttp.streams import DataQueue
from molotov.api import get_fixture
# Placeholder printed when a body cannot be decoded as text.
_UNREADABLE = "***WARNING: Molotov can't display this body***"
# Placeholder printed instead of a body whose Content-Encoding is in _COMPRESSED.
_BINARY = "**** Binary content ****"
# Placeholder printed for request bodies that are file objects (io.IOBase).
_FILE = "**** File content ****"
# Content-Encoding header values for which the body is replaced by _BINARY.
# NOTE(review): "identity" normally means "no transformation"; listing it here
# makes identity-encoded bodies print as binary -- confirm this is intended.
_COMPRESSED = ("gzip", "compress", "deflate", "identity", "br")
class BaseListener:
    """Base listener that routes an event to a matching ``on_<event>`` coroutine."""

    async def __call__(self, event, **options):
        """Await ``self.on_<event>(**options)`` when such an attribute exists.

        Events for which the instance has no ``on_<event>`` attribute are
        ignored.
        """
        handler = getattr(self, "on_" + event, None)
        if handler is None:
            return
        await handler(**options)
class Writer:
    """Minimal in-memory sink exposing an awaitable ``write`` method."""

    def __init__(self):
        # Everything passed to write() accumulates here, in call order.
        self.buffer = bytearray()

    async def write(self, data):
        """Append ``data`` to the end of ``buffer``."""
        sink = self.buffer
        sink.extend(data)
class StdoutListener(BaseListener):
    """Listener that dumps HTTP requests and responses to the console.

    Nothing is printed unless ``verbose`` is at least 2.

    Keyword options:
        verbose: verbosity level; defaults to 0 when omitted.
        loop: optional event loop, handed to the ``DataQueue`` that replaces
            a response's content after it has been read for display.
        console: required; object whose ``print_error`` method receives the
            rendered text.

    Raises:
        KeyError: if ``console`` is not supplied.
    """

    def __init__(self, **options):
        self.verbose = options.get("verbose", 0)
        self.loop = options.pop("loop", None)
        self.console = options["console"]

    @staticmethod
    def _format_headers(headers):
        """Render a header mapping as one ``name: value`` line per entry."""
        return "\n".join(f"{name}: {value}" for name, value in headers.items())

    async def _body2str(self, body):
        """Return a printable representation of a request body.

        ``None`` yields an empty string and file objects yield the file
        placeholder. Anything that cannot be turned into UTF-8 text yields
        the "can't display" placeholder instead of raising, so one odd body
        never suppresses the whole request dump.
        """
        if body is None:
            return ""

        if isinstance(body, aiohttp.multipart.MultipartWriter):
            # Serialize the multipart payload into memory so it can be shown.
            writer = Writer()
            await body.write(writer)
            try:
                body = writer.buffer.decode("utf8")
            except UnicodeDecodeError:
                # At least one part is not UTF-8 text (e.g. a binary upload).
                return _UNREADABLE

        # Imported lazily so a missing aiohttp.payload module is tolerated.
        try:
            from aiohttp.payload import Payload
        except ImportError:
            Payload = None

        if Payload is not None and isinstance(body, Payload):
            # Unwrap the payload to get at the value it carries.
            body = body._value

        if isinstance(body, io.IOBase):
            return _FILE

        if isinstance(body, str):
            return body

        try:
            return str(body, "utf8")
        except (UnicodeDecodeError, TypeError):
            # UnicodeDecodeError: bytes that are not valid UTF-8.
            # TypeError: the value is not bytes-like at all.
            return _UNREADABLE

    async def on_sending_request(self, session, request):
        """Print the method, URL, headers and body of an outgoing request."""
        if self.verbose < 2:
            return

        raw = "\n" + request.method + " " + str(request.url)
        if len(request.headers) > 0:
            raw += "\n" + self._format_headers(request.headers)

        if request.headers.get("Content-Encoding") in _COMPRESSED:
            raw += "\n\n" + _BINARY + "\n"
        elif request.body:
            raw += "\n\n" + await self._body2str(request.body) + "\n"

        self.console.print_error(raw)

    async def on_response_received(self, session, response, request):
        """Print the status line, headers and body of a received response.

        Reading the body for display drains ``response.content``; when data
        was read, the content is replaced by a ``DataQueue`` fed with the
        same bytes so it can still be read afterwards.
        """
        if self.verbose < 2:
            return

        raw = "HTTP/1.1 %d %s\n" % (response.status, response.reason)
        raw += self._format_headers(response.headers)

        body_text = ""
        if response.headers.get("Content-Encoding") in _COMPRESSED:
            body_text = _BINARY
        elif response.content:
            content = await response.content.read()
            if len(content) > 0:
                # put back the data in the content
                response.content = DataQueue(loop=self.loop)
                response.content.feed_data(content)
                response.content.feed_eof()
                try:
                    body_text = content.decode()
                except UnicodeDecodeError:
                    body_text = _UNREADABLE

        # The blank-line separator is always emitted, even for an empty body.
        raw += "\n\n" + body_text

        self.console.print_error(raw)
        self.console.print_error("")
        self.console.print_error("")
class CustomListener:
    """Adapter that exposes a fixture coroutine as a listener."""

    def __init__(self, fixture):
        # Coroutine function invoked as fixture(event, **options).
        self.fixture = fixture

    async def __call__(self, event, **options):
        """Forward the event name and its options to the wrapped fixture."""
        callback = self.fixture
        await callback(event, **options)
class EventSender:
    """Fan an event out to every registered listener.

    Args:
        console: object whose ``print_error`` method receives any exception
            raised by a listener.
        listeners: optional list of listener callables. It is used as-is
            (not copied), so listeners added later are appended to it. A new
            empty list is created when omitted.

    Each entry returned by ``get_fixture("events")`` is additionally wrapped
    in a ``CustomListener`` and registered.
    """

    def __init__(self, console, listeners=None):
        self.console = console
        self._listeners = [] if listeners is None else listeners
        self._stopped = False

        registered = get_fixture("events")
        if registered is not None:
            for fixture in registered:
                self.add_listener(CustomListener(fixture))

    def add_listener(self, listener):
        """Register one more listener at the end of the list."""
        self._listeners.append(listener)

    async def stop(self):
        """Mark this sender as stopped."""
        self._stopped = True

    def stopped(self):
        """Return the stopped flag set by ``stop``."""
        return self._stopped

    async def send_event(self, event, *args, **options):
        """Await every listener in registration order with the given event.

        An exception raised by one listener is handed to
        ``console.print_error`` and does not prevent the remaining listeners
        from being called.
        """
        for receiver in self._listeners:
            try:
                await receiver(event, *args, **options)
            except Exception as exc:
                self.console.print_error(exc)
|