File: web_log.py

package info (click to toggle)
python-aiohttp 3.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 5,612 kB
  • sloc: python: 36,917; ansic: 15,734; makefile: 365; sh: 83
file content (236 lines) | stat: -rw-r--r-- 8,270 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import datetime
import functools
import logging
import os
import re
from collections import namedtuple
from typing import Callable, Dict, Iterable, List, Tuple  # noqa

from .abc import AbstractAccessLogger
from .web_request import BaseRequest
from .web_response import StreamResponse


KeyMethod = namedtuple('KeyMethod', 'key method')


class AccessLogger(AbstractAccessLogger):
    """Helper object to log access.

    Usage:
        log = logging.getLogger("spam")
        log_format = "%a %{User-Agent}i"
        access_logger = AccessLogger(log, log_format)
        access_logger.log(request, response, time)

    Format:
        %%  The percent sign
        %a  Remote IP-address (IP-address of proxy if using reverse proxy)
        %t  Time when the request was started to process
        %P  The process ID of the child that serviced the request
        %r  First line of request
        %s  Response status code
        %b  Size of response in bytes, including HTTP headers
        %T  Time taken to serve the request, in seconds
        %Tf Time taken to serve the request, in seconds with floating fraction
            in .06f format
        %D  Time taken to serve the request, in microseconds
        %{FOO}i  request.headers['FOO']
        %{FOO}o  response.headers['FOO']
        %{FOO}e  os.environ['FOO']

    """
    LOG_FORMAT_MAP = {
        'a': 'remote_address',
        't': 'request_start_time',
        'P': 'process_id',
        'r': 'first_request_line',
        's': 'response_status',
        'b': 'response_size',
        'T': 'request_time',
        'Tf': 'request_time_frac',
        'D': 'request_time_micro',
        'i': 'request_header',
        'o': 'response_header',
    }

    LOG_FORMAT = '%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
    FORMAT_RE = re.compile(r'%(\{([A-Za-z0-9\-_]+)\}([ioe])|[atPrsbOD]|Tf?)')
    CLEANUP_RE = re.compile(r'(%[^s])')
    _FORMAT_CACHE = {}  # type: Dict[str, Tuple[str, List[KeyMethod]]]

    def __init__(self, logger: logging.Logger,
                 log_format: str=LOG_FORMAT) -> None:
        """Initialise the logger.

        logger is a logger object to be used for logging.
        log_format is a string with apache compatible log format description.

        """
        super().__init__(logger, log_format=log_format)

        _compiled_format = AccessLogger._FORMAT_CACHE.get(log_format)
        if not _compiled_format:
            _compiled_format = self.compile_format(log_format)
            AccessLogger._FORMAT_CACHE[log_format] = _compiled_format

        self._log_format, self._methods = _compiled_format

    def compile_format(self, log_format: str) -> Tuple[str, List[KeyMethod]]:
        """Translate log_format into form usable by modulo formatting

        All known atoms will be replaced with %s
        Also methods for formatting of those atoms will be added to
        _methods in appropriate order

        For example we have log_format = "%a %t"
        This format will be translated to "%s %s"
        Also contents of _methods will be
        [self._format_a, self._format_t]
        These method will be called and results will be passed
        to translated string format.

        Each _format_* method receive 'args' which is list of arguments
        given to self.log

        Exceptions are _format_e, _format_i and _format_o methods which
        also receive key name (by functools.partial)

        """
        # list of (key, method) tuples, we don't use an OrderedDict as users
        # can repeat the same key more than once
        methods = list()

        for atom in self.FORMAT_RE.findall(log_format):
            if atom[1] == '':
                format_key1 = self.LOG_FORMAT_MAP[atom[0]]
                m = getattr(AccessLogger, '_format_%s' % atom[0])
                key_method = KeyMethod(format_key1, m)
            else:
                format_key2 = (self.LOG_FORMAT_MAP[atom[2]], atom[1])
                m = getattr(AccessLogger, '_format_%s' % atom[2])
                key_method = KeyMethod(format_key2,
                                       functools.partial(m, atom[1]))

            methods.append(key_method)

        log_format = self.FORMAT_RE.sub(r'%s', log_format)
        log_format = self.CLEANUP_RE.sub(r'%\1', log_format)
        return log_format, methods

    @staticmethod
    def _format_i(key: str,
                  request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        if request is None:
            return '(no headers)'

        # suboptimal, make istr(key) once
        return request.headers.get(key, '-')

    @staticmethod
    def _format_o(key: str,
                  request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        # suboptimal, make istr(key) once
        return response.headers.get(key, '-')

    @staticmethod
    def _format_a(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        if request is None:
            return '-'
        ip = request.remote
        return ip if ip is not None else '-'

    @staticmethod
    def _format_t(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        now = datetime.datetime.utcnow()
        start_time = now - datetime.timedelta(seconds=time)
        return start_time.strftime('[%d/%b/%Y:%H:%M:%S +0000]')

    @staticmethod
    def _format_P(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        return "<%s>" % os.getpid()

    @staticmethod
    def _format_r(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        if request is None:
            return '-'
        return '%s %s HTTP/%s.%s' % (request.method, request.path_qs,
                                     request.version.major,
                                     request.version.minor)

    @staticmethod
    def _format_s(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> int:
        return response.status

    @staticmethod
    def _format_b(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> int:
        return response.body_length

    @staticmethod
    def _format_T(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        return str(round(time))

    @staticmethod
    def _format_Tf(request: BaseRequest,
                   response: StreamResponse,
                   time: float) -> str:
        return '%06f' % time

    @staticmethod
    def _format_D(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        return str(round(time * 1000000))

    def _format_line(self,
                     request: BaseRequest,
                     response: StreamResponse,
                     time: float) -> Iterable[Tuple[str,
                                                    Callable[[BaseRequest,
                                                              StreamResponse,
                                                              float],
                                                             str]]]:
        return [(key, method(request, response, time))
                for key, method in self._methods]

    def log(self,
            request: BaseRequest,
            response: StreamResponse,
            time: float) -> None:
        try:
            fmt_info = self._format_line(request, response, time)

            values = list()
            extra = dict()
            for key, value in fmt_info:
                values.append(value)

                if key.__class__ is str:
                    extra[key] = value
                else:
                    k1, k2 = key
                    dct = extra.get(k1, {})
                    dct[k2] = value  # type: ignore
                    extra[k1] = dct  # type: ignore

            self.logger.info(self._log_format % tuple(values), extra=extra)
        except Exception:
            self.logger.exception("Error in logging")