File: listeners.py

package info (click to toggle)
python-molotov 2.7-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 8,264 kB
  • sloc: python: 4,121; makefile: 60
file content (141 lines) | stat: -rw-r--r-- 4,287 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import io

import aiohttp
from aiohttp.streams import DataQueue

from molotov.api import get_fixture

_UNREADABLE = "***WARNING: Molotov can't display this body***"
_BINARY = "**** Binary content ****"
_FILE = "**** File content ****"
_COMPRESSED = ("gzip", "compress", "deflate", "identity", "br")


class BaseListener:
    async def __call__(self, event, **options):
        attr = getattr(self, "on_" + event, None)
        if attr is not None:
            await attr(**options)


class Writer:
    def __init__(self):
        self.buffer = bytearray()

    async def write(self, data):
        self.buffer.extend(data)


class StdoutListener(BaseListener):
    def __init__(self, **options):
        self.verbose = options.get("verbose", 0)
        self.loop = options.pop("loop", None)
        self.console = options["console"]

    async def _body2str(self, body):
        if body is None:
            return ""

        if isinstance(body, aiohttp.multipart.MultipartWriter):
            writer = Writer()
            await body.write(writer)
            body = writer.buffer.decode("utf8")

        try:
            from aiohttp.payload import Payload
        except ImportError:
            Payload = None

        if Payload is not None and isinstance(body, Payload):
            body = body._value

        if isinstance(body, io.IOBase):
            return _FILE

        if not isinstance(body, str):
            try:
                body = str(body, "utf8")
            except UnicodeDecodeError:
                return _UNREADABLE

        return body

    async def on_sending_request(self, session, request):
        if self.verbose < 2:
            return
        raw = "\n" + request.method + " " + str(request.url)
        if len(request.headers) > 0:
            headers = "\n".join("{}: {}".format(k, v) for k, v in request.headers.items())
            raw += "\n" + headers
        if request.headers.get("Content-Encoding") in _COMPRESSED:
            raw += "\n\n" + _BINARY + "\n"
        elif request.body:
            str_body = await self._body2str(request.body)
            raw += "\n\n" + str_body + "\n"

        self.console.print_error(raw)

    async def on_response_received(self, session, response, request):
        if self.verbose < 2:
            return
        raw = "HTTP/1.1 %d %s\n" % (response.status, response.reason)
        items = response.headers.items()
        headers = "\n".join(f"{k}: {v}" for k, v in items)
        raw += headers
        if response.headers.get("Content-Encoding") in _COMPRESSED:
            raw += "\n\n" + _BINARY
        elif response.content:
            content = await response.content.read()
            if len(content) > 0:
                # put back the data in the content
                response.content = DataQueue(loop=self.loop)
                response.content.feed_data(content)
                response.content.feed_eof()
                try:
                    raw += "\n\n" + content.decode()
                except UnicodeDecodeError:
                    raw += "\n\n" + _UNREADABLE
            else:
                raw += "\n\n"

        self.console.print_error(raw)
        self.console.print_error("")
        self.console.print_error("")


class CustomListener:
    def __init__(self, fixture):
        self.fixture = fixture

    async def __call__(self, event, **options):
        await self.fixture(event, **options)


class EventSender:
    def __init__(self, console, listeners=None):
        self.console = console
        if listeners is None:
            listeners = []
        self._listeners = listeners
        self._stopped = False

        fixture_listeners = get_fixture("events")
        if fixture_listeners is not None:
            for listener in fixture_listeners:
                self.add_listener(CustomListener(listener))

    def add_listener(self, listener):
        self._listeners.append(listener)

    async def stop(self):
        self._stopped = True

    def stopped(self):
        return self._stopped

    async def send_event(self, event, *args, **options):
        for listener in self._listeners:
            try:
                await listener(event, *args, **options)
            except Exception as e:
                self.console.print_error(e)