File: http_protocol.py

package info (click to toggle)
python-daphne 4.2.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 392 kB
  • sloc: python: 2,593; makefile: 28
file content (416 lines) | stat: -rwxr-xr-x 16,424 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
import logging
import time
import traceback
from urllib.parse import unquote

from twisted.internet.defer import inlineCallbacks, maybeDeferred
from twisted.internet.interfaces import IProtocolNegotiationFactory
from twisted.protocols.policies import ProtocolWrapper
from twisted.web import http
from zope.interface import implementer

from .utils import HEADER_NAME_RE, parse_x_forwarded_for

logger = logging.getLogger(__name__)


class WebRequest(http.Request):
    """
    Request that either hands off information to channels, or offloads
    to a WebSocket class.

    Does some extra processing over the normal Twisted Web request to separate
    GET and POST out.
    """

    error_template = (
        """
        <html>
            <head>
                <title>%(title)s</title>
                <style>
                    body { font-family: sans-serif; margin: 0; padding: 0; }
                    h1 { padding: 0.6em 0 0.2em 20px; color: #896868; margin: 0; }
                    p { padding: 0 0 0.3em 20px; margin: 0; }
                    footer { padding: 1em 0 0.3em 20px; color: #999; font-size: 80%%; font-style: italic; }
                </style>
            </head>
            <body>
                <h1>%(title)s</h1>
                <p>%(body)s</p>
                <footer>Daphne</footer>
            </body>
        </html>
    """.replace(
            "\n", ""
        )
        .replace("    ", " ")
        .replace("   ", " ")
        .replace("  ", " ")
    )  # Shorten it a bit, bytes wise

    def __init__(self, *args, **kwargs):
        self.client_addr = None
        self.server_addr = None
        try:
            http.Request.__init__(self, *args, **kwargs)
            # Easy server link
            self.server = self.channel.factory.server
            self.application_queue = None
            self._response_started = False
            self.server.protocol_connected(self)
        except Exception:
            logger.error(traceback.format_exc())
            raise

    ### Twisted progress callbacks

    @inlineCallbacks
    def process(self):
        try:
            self.request_start = time.time()

            # Validate header names.
            for name, _ in self.requestHeaders.getAllRawHeaders():
                if not HEADER_NAME_RE.fullmatch(name):
                    self.basic_error(400, b"Bad Request", "Invalid header name")
                    return

            # Get upgrade header
            upgrade_header = None
            if self.requestHeaders.hasHeader(b"Upgrade"):
                upgrade_header = self.requestHeaders.getRawHeaders(b"Upgrade")[0]
            # Get client address if possible
            if hasattr(self.client, "host") and hasattr(self.client, "port"):
                # client.host and host.host are byte strings in Python 2, but spec
                # requires unicode string.
                self.client_addr = [str(self.client.host), self.client.port]
                self.server_addr = [str(self.host.host), self.host.port]

            self.client_scheme = "https" if self.isSecure() else "http"

            # See if we need to get the address from a proxy header instead
            if self.server.proxy_forwarded_address_header:
                self.client_addr, self.client_scheme = parse_x_forwarded_for(
                    self.requestHeaders,
                    self.server.proxy_forwarded_address_header,
                    self.server.proxy_forwarded_port_header,
                    self.server.proxy_forwarded_proto_header,
                    self.client_addr,
                    self.client_scheme,
                )
            # Check for unicodeish path (or it'll crash when trying to parse)
            try:
                self.path.decode("ascii")
            except UnicodeDecodeError:
                self.path = b"/"
                self.basic_error(400, b"Bad Request", "Invalid characters in path")
                return
            # Calculate query string
            self.query_string = b""
            if b"?" in self.uri:
                self.query_string = self.uri.split(b"?", 1)[1]
                try:
                    self.query_string.decode("ascii")
                except UnicodeDecodeError:
                    self.basic_error(400, b"Bad Request", "Invalid query string")
                    return
            # Is it WebSocket? IS IT?!
            if upgrade_header and upgrade_header.lower() == b"websocket":
                # Make WebSocket protocol to hand off to
                protocol = self.server.ws_factory.buildProtocol(
                    self.transport.getPeer()
                )
                if not protocol:
                    # If protocol creation fails, we signal "internal server error"
                    self.setResponseCode(500)
                    logger.warn("Could not make WebSocket protocol")
                    self.finish()
                # Give it the raw query string
                protocol._raw_query_string = self.query_string
                # Port across transport
                transport, self.transport = self.transport, None
                if isinstance(transport, ProtocolWrapper):
                    # i.e. TLS is a wrapping protocol
                    transport.wrappedProtocol = protocol
                else:
                    transport.protocol = protocol
                protocol.makeConnection(transport)
                # Re-inject request
                data = self.method + b" " + self.uri + b" HTTP/1.1\x0d\x0a"
                for h in self.requestHeaders.getAllRawHeaders():
                    data += h[0] + b": " + b",".join(h[1]) + b"\x0d\x0a"
                data += b"\x0d\x0a"
                data += self.content.read()
                protocol.dataReceived(data)
                # Remove our HTTP reply channel association
                logger.debug("Upgraded connection %s to WebSocket", self.client_addr)
                self.server.protocol_disconnected(self)
                # Resume the producer so we keep getting data, if it's available as a method
                self.channel._networkProducer.resumeProducing()

            # Boring old HTTP.
            else:
                # Sanitize and decode headers, potentially extracting root path
                self.clean_headers = []
                self.root_path = self.server.root_path
                for name, values in self.requestHeaders.getAllRawHeaders():
                    # Prevent CVE-2015-0219
                    if b"_" in name:
                        continue
                    for value in values:
                        if name.lower() == b"daphne-root-path":
                            self.root_path = unquote(value.decode("ascii"))
                        else:
                            self.clean_headers.append((name.lower(), value))
                logger.debug("HTTP %s request for %s", self.method, self.client_addr)
                self.content.seek(0, 0)
                # Work out the application scope and create application
                self.application_queue = yield maybeDeferred(
                    self.server.create_application,
                    self,
                    {
                        "type": "http",
                        # TODO: Correctly say if it's 1.1 or 1.0
                        "http_version": self.clientproto.split(b"/")[-1].decode(
                            "ascii"
                        ),
                        "method": self.method.decode("ascii"),
                        "path": unquote(self.path.decode("ascii")),
                        "raw_path": self.path,
                        "root_path": self.root_path,
                        "scheme": self.client_scheme,
                        "query_string": self.query_string,
                        "headers": self.clean_headers,
                        "client": self.client_addr,
                        "server": self.server_addr,
                    },
                )
                # Check they didn't close an unfinished request
                if self.application_queue is None or self.content.closed:
                    # Not much we can do, the request is prematurely abandoned.
                    return
                # Run application against request
                buffer_size = self.server.request_buffer_size
                while True:
                    chunk = self.content.read(buffer_size)
                    more_body = not (len(chunk) < buffer_size)
                    payload = {
                        "type": "http.request",
                        "body": chunk,
                        "more_body": more_body,
                    }
                    self.application_queue.put_nowait(payload)
                    if not more_body:
                        break

        except Exception:
            logger.error(traceback.format_exc())
            self.basic_error(
                500, b"Internal Server Error", "Daphne HTTP processing error"
            )

    def connectionLost(self, reason):
        """
        Cleans up reply channel on close.
        """
        if self.application_queue:
            self.send_disconnect()
        logger.debug("HTTP disconnect for %s", self.client_addr)
        http.Request.connectionLost(self, reason)
        self.server.protocol_disconnected(self)

    def finish(self):
        """
        Cleans up reply channel on close.
        """
        if self.application_queue:
            self.send_disconnect()
        logger.debug("HTTP close for %s", self.client_addr)
        http.Request.finish(self)
        self.server.protocol_disconnected(self)

    ### Server reply callbacks

    def handle_reply(self, message):
        """
        Handles a reply from the client
        """
        # Handle connections that are already closed
        if self.finished or self.channel is None:
            return
        # Check message validity
        if "type" not in message:
            raise ValueError("Message has no type defined")
        # Handle message
        if message["type"] == "http.response.start":
            if self._response_started:
                raise ValueError("HTTP response has already been started")
            self._response_started = True
            if "status" not in message:
                raise ValueError(
                    "Specifying a status code is required for a Response message."
                )
            # Set HTTP status code
            self.setResponseCode(message["status"])
            # Write headers
            for header, value in message.get("headers", {}):
                self.responseHeaders.addRawHeader(header, value)
            if self.server.server_name and not self.responseHeaders.hasHeader("server"):
                self.setHeader(b"server", self.server.server_name.encode())
            logger.debug(
                "HTTP %s response started for %s", message["status"], self.client_addr
            )
        elif message["type"] == "http.response.body":
            if not self._response_started:
                raise ValueError(
                    "HTTP response has not yet been started but got %s"
                    % message["type"]
                )
            # Write out body
            http.Request.write(self, message.get("body", b""))
            # End if there's no more content
            if not message.get("more_body", False):
                self.finish()
                logger.debug("HTTP response complete for %s", self.client_addr)
                try:
                    uri = self.uri.decode("ascii")
                except UnicodeDecodeError:
                    # The path is malformed somehow - do our best to log something
                    uri = repr(self.uri)
                try:
                    self.server.log_action(
                        "http",
                        "complete",
                        {
                            "path": uri,
                            "status": self.code,
                            "method": self.method.decode("ascii", "replace"),
                            "client": (
                                "%s:%s" % tuple(self.client_addr)
                                if self.client_addr
                                else None
                            ),
                            "time_taken": self.duration(),
                            "size": self.sentLength,
                        },
                    )
                except Exception:
                    logger.error(traceback.format_exc())
            else:
                logger.debug("HTTP response chunk for %s", self.client_addr)
        else:
            raise ValueError("Cannot handle message type %s!" % message["type"])

    def handle_exception(self, exception):
        """
        Called by the server when our application tracebacks
        """
        self.basic_error(500, b"Internal Server Error", "Exception inside application.")

    def check_timeouts(self):
        """
        Called periodically to see if we should timeout something
        """
        # Web timeout checking
        if self.server.http_timeout and self.duration() > self.server.http_timeout:
            if self._response_started:
                logger.warning("Application timed out while sending response")
                self.finish()
            else:
                self.basic_error(
                    503,
                    b"Service Unavailable",
                    "Application failed to respond within time limit.",
                )

    ### Utility functions

    def send_disconnect(self):
        """
        Sends a http.disconnect message.
        Useful only really for long-polling.
        """
        # If we don't yet have a path, then don't send as we never opened.
        if self.path:
            self.application_queue.put_nowait({"type": "http.disconnect"})

    def duration(self):
        """
        Returns the time since the start of the request.
        """
        if not hasattr(self, "request_start"):
            return 0
        return time.time() - self.request_start

    def basic_error(self, status, status_text, body):
        """
        Responds with a server-level error page (very basic)
        """
        self.handle_reply(
            {
                "type": "http.response.start",
                "status": status,
                "headers": [(b"Content-Type", b"text/html; charset=utf-8")],
            }
        )
        self.handle_reply(
            {
                "type": "http.response.body",
                "body": (
                    self.error_template
                    % {
                        "title": str(status) + " " + status_text.decode("ascii"),
                        "body": body,
                    }
                ).encode("utf8"),
            }
        )

    def __hash__(self):
        return hash(id(self))

    def __eq__(self, other):
        return id(self) == id(other)


@implementer(IProtocolNegotiationFactory)
class HTTPFactory(http.HTTPFactory):
    """
    Factory which takes care of tracking which protocol
    instances or request instances are responsible for which
    named response channels, so incoming messages can be
    routed appropriately.
    """

    def __init__(self, server):
        http.HTTPFactory.__init__(self)
        self.server = server

    def buildProtocol(self, addr):
        """
        Builds protocol instances. This override is used to ensure we use our
        own Request object instead of the default.
        """
        try:
            protocol = http.HTTPFactory.buildProtocol(self, addr)
            protocol.requestFactory = WebRequest
            return protocol
        except Exception:
            logger.error("Cannot build protocol: %s" % traceback.format_exc())
            raise

    # IProtocolNegotiationFactory
    def acceptableProtocols(self):
        """
        Protocols this server can speak after ALPN negotiation. Currently that
        is HTTP/1.1 and optionally HTTP/2. Websockets cannot be negotiated
        using ALPN, so that doesn't go here: anyone wanting websockets will
        negotiate HTTP/1.1 and then do the upgrade dance.
        """
        baseProtocols = [b"http/1.1"]

        if http.H2_ENABLED:
            baseProtocols.insert(0, b"h2")

        return baseProtocols