File: session.py

package info (click to toggle)
python-molotov 2.7-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 8,268 kB
  • sloc: python: 4,121; makefile: 60
file content (120 lines) | stat: -rw-r--r-- 3,932 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import socket
from collections import namedtuple
from time import perf_counter
from types import SimpleNamespace

from aiohttp import TCPConnector, TraceConfig
from aiohttp.client import ClientRequest, ClientResponse, ClientSession

from molotov.api import create_session
from molotov.listeners import EventSender, StdoutListener

_HOST = socket.gethostname()


class LoggedClientRequest(ClientRequest):
    """Printable Request."""

    tracer = None

    async def send(self, *args, **kw):
        if self.tracer:
            await self.tracer.send_event("sending_request", request=self)
        response = await super().send(*args, **kw)
        response.request = self
        return response


class LoggedClientResponse(ClientResponse):
    request = None


class SessionTracer(TraceConfig):
    def __init__(self, loop, console, verbose, statsd):
        super().__init__(trace_config_ctx_factory=self._trace_config_ctx_factory)
        self.loop = loop
        self.console = console
        self.verbose = verbose
        self.eventer = EventSender(
            console,
            [StdoutListener(verbose=self.verbose, console=self.console, loop=self.loop)],
        )
        self.on_request_start.append(self._request_start)
        self.on_request_end.append(self._request_end)
        self.context = namedtuple("context", ["statsd"])
        self.context.statsd = statsd

    def _trace_config_ctx_factory(self, trace_request_ctx):
        return SimpleNamespace(trace_request_ctx=trace_request_ctx, context=self.context)

    def add_listener(self, listener):
        return self.eventer.add_listener(listener)

    async def send_event(self, event, **options):
        await self.eventer.send_event(event, session=self, **options)

    async def _request_start(self, session, trace_config_ctx, params):
        if self.context.statsd:
            prefix = "molotov.%(hostname)s.%(method)s.%(host)s.%(path)s"
            data = {
                "method": params.method,
                "hostname": _HOST,
                "host": params.url.host,
                "path": params.url.path,
            }
            label = prefix % data
            trace_config_ctx.start = perf_counter()
            trace_config_ctx.label = label
            trace_config_ctx.data = data

    async def _request_end(self, session, trace_config_ctx, params):
        if self.context.statsd:
            duration = int((perf_counter() - trace_config_ctx.start) * 1000)
            self.context.statsd.timing(trace_config_ctx.label, value=duration)
            self.context.statsd.increment(
                trace_config_ctx.label + "." + str(params.response.status)
            )
        await self.send_event(
            "response_received",
            response=params.response,
            request=params.response.request,
        )


def get_session(loop, console, verbose=0, statsd=None, kind="http", **kw):
    trace_config = SessionTracer(loop, console, verbose, statsd)

    if kind != "http":
        return create_session(kind, loop, console, verbose, statsd, trace_config, **kw)

    connector = kw.pop("connector", None)
    if connector is None:
        connector = TCPConnector(limit=None, ttl_dns_cache=None)

    request_class = LoggedClientRequest
    request_class.verbose = verbose
    request_class.response_class = LoggedClientResponse
    request_class.tracer = trace_config
    session = ClientSession(
        request_class=request_class,
        response_class=LoggedClientResponse,
        connector=connector,
        trace_configs=[trace_config],
        **kw,
    )

    return session


def get_eventer(session):
    for trace in session._trace_configs:
        if isinstance(trace, SessionTracer):
            return trace
    return None


def get_context(session):
    for trace in session._trace_configs:
        if isinstance(trace, SessionTracer):
            return trace.context
    return None