File: webserver.py

package info (click to toggle)
python-stubserver 1.1-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 188 kB
  • sloc: python: 574; sh: 8; makefile: 5
file content (274 lines) | stat: -rwxr-xr-x 9,143 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import sys
import threading
import re
import time
if sys.version_info[0] < 3:
    import BaseHTTPServer
else:
    import http.server as BaseHTTPServer
if sys.version_info[0] < 3:
    from urllib import urlopen
else:
    from urllib.request import urlopen


class StoppableHTTPServer(BaseHTTPServer.HTTPServer):
    """
    Python 2.5 HTTPServer does not close down properly when calling server_close.
    The implementation below was based on the comments in the below article:
    http://stackoverflow.com/questions/268629/how-to-stop-basehttpserver-serveforever-in-a-basehttprequesthandler-subclass
    """
    stopped = False
    allow_reuse_address = True

    def __init__(self, *args, **kw):
        BaseHTTPServer.HTTPServer.__init__(self, *args, **kw)

    def serve_forever(self):
        while not self.stopped:
            self.handle_request()

    def server_close(self):
        BaseHTTPServer.HTTPServer.server_close(self)
        self.stopped = True
        self._create_dummy_request()
        time.sleep(0.5)

    def shutdown(self):
        pass

    def _create_dummy_request(self):
        f = urllib.urlopen("http://localhost:" + str(self.server_port) + "/__shutdown")
        f.read()
        f.close()


if sys.version_info < (2, 6):
    HTTPServer = StoppableHTTPServer
else:
    HTTPServer = BaseHTTPServer.HTTPServer


class StubServer(object):
    def __init__(self, port=8080, address='localhost'):
        self._expectations = []
        self.port = port
        self.address = address

    def run(self):
        server_address = (self.address, self.port)
        self.httpd = HTTPServer(server_address, StubResponse(self._expectations))
        thread = threading.Thread(target=self._run)
        thread.start()

    def stop(self):
        self.httpd.shutdown()
        self.httpd.server_close()
        self.verify()

    def _run(self):
        try:
            self.httpd.serve_forever()
        except:
            pass

    def verify(self):
        """
        Check all exceptation has been made.

        :raises: Exception: If one them isn't made.
        """
        failures = []
        for expectation in self._expectations:
            if not expectation.satisfied:
                failures.append(str(expectation))
        del self._expectations[:]
        if failures:
            raise Exception("Unsatisfied expectations: " + "\n".join(failures))

    def expect(self, method="GET", url="^UrlRegExpMather$", data=None, data_capture=None,
               file_content=None):
        """
        Prepare :class:`StubServer` to handle an HTTP request.

        :param method: HTTP method
        :type method: ``str``

        :param url: Regex matching with path part of an URL
        :type url: Raw ``str``

        :param data: Excepted data
        :type data: ``None`` or other

        :param data_capture: Dictionary given by user for gather data returned
                             by server.
        :type data_capture: ``dict``

        :param file_content: Unsed

        :return: Expectation object initilized
        :rtype: :class:`Expectation`
        """
        expected = Expectation(method, url, data, data_capture)
        self._expectations.append(expected)
        return expected


class Expectation(object):
    def __init__(self, method, url, data, data_capture):
        """
        :param method: HTTP method
        :type method: ``str``

        :param url: Regex matching with path part of an URL
        :type url: ``str``

        :param data: Excepted data
        :type data: ``None`` or other

        :param data_capture: Dictionary given by user for gather data returned
                             by server.
        :type data_capture: ``dict``
        """
        if data_capture is None:
            data_capture = {}
        self.method = method
        self.url = url
        self.data = data
        self.data_capture = data_capture
        self.satisfied = False

    def and_return(self, mime_type="text/html", reply_code=200, content="", file_content=None, headers=None):
        """
        Define the response created by the expectation.

        :param mime_type: Define content type of HTTP response
        :type mime_type: ``str``

        :param reply_code: Define response code of HTTP response
        :type reply_code: ``int``

        :param content: Define response's content
        :type content: ``str``

        :param file_content: Define response's content from a file
        :type file_content: ``str``

        :param headers: Additional HTTP header fields to be sent
        :type headers: ``iterable of tuples (header field name, value)``
        """
        if file_content:
            f = open(file_content, "r")
            content = f.read()
            f.close()
        self.response = (reply_code, mime_type, content, headers)

    def __str__(self):
        return "%s %s \n data_capture: %s\n" % (self.method, self.url, self.data_capture)


class StubResponse(BaseHTTPServer.BaseHTTPRequestHandler):
    def __call__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        try:
            self.setup()
            self.handle()
        finally:
            self.finish()

    def __init__(self, expectations):
        self.expected = expectations

    def _get_data(self):
        max_chunk_size = 10 * 1024 * 1024
        if "content-length" not in self.headers:
            return b''
        size_remaining = int(self.headers["content-length"])
        data = []
        while size_remaining:
            chunk_size = min(size_remaining, max_chunk_size)
            data.append(self.rfile.read(chunk_size))
            size_remaining -= len(data[-1])
        return b''.join(data)

    def handle_one_request(self):
        """Handle a single HTTP request.

        You normally don't need to override this method; see the class
        __doc__ string for information on how to handle specific HTTP
        commands such as GET and POST.
        """

        def send_headers(exp):
            self.send_header("Content-Type", exp.response[1])
            headers = exp.response[3]
            if headers:
                for header in headers:
                    self.send_header(header[0], header[1])
            self.end_headers()

        self.raw_requestline = self.rfile.readline()
        if not self.raw_requestline:
            self.close_connection = 1
            return
        if not self.parse_request():  # An error code has been sent, just exit
            return
        method = self.command
        if self.path == "/__shutdown":
            self.send_response(200, "Python")

        data = self._get_data().decode('utf-8')

        expectations_matching_url = [x for x in self.expected if re.search(x.url, self.path)]
        expectations_matching_method = [x for x in expectations_matching_url if x.method == method]
        matching_expectations = [x for x in expectations_matching_method if not x.satisfied]
        matching_expectations_with_data = [x for x in matching_expectations if x.data and x.data == data]

        err_code = err_message = err_body = None
        if len(matching_expectations_with_data) > 0:
            exp = matching_expectations_with_data[0]
            self.send_response(exp.response[0], "Python")
            send_headers(exp)
            self.wfile.write(exp.response[2].encode('utf-8'))
            exp.satisfied = True
            exp.data_capture["body"] = data
        elif len(matching_expectations) > 0:
            exp = matching_expectations[0]
            if exp.data:
                err_code = 403
                err_message = "Payload missing or incorrect"
                err_body = "This URL expects data: {0}. Query provided: {1}".format(exp.data, data)
            else:
                self.send_response(exp.response[0], "Python")
                send_headers(exp)
                self.wfile.write(exp.response[2].encode('utf-8'))
                exp.satisfied = True
                exp.data_capture["body"] = data
        elif len(expectations_matching_method) > 0:
            # All expectations have been fulfilled
            err_code = 400
            err_message = "Expectations exhausted"
            err_body = "Expectations at this URL have already been satisfied.\n" + str(expectations_matching_method)
        elif len(expectations_matching_url) > 0:
            # Method not allowed
            err_code = 405
            err_message = "Method not allowed"
            err_body = "Method " + method + " not allowed.\n" + str(expectations_matching_url)
        else:
            # not found
            err_code = 404
            err_message = "Not found"
            err_body = "No URL pattern matched."

        if err_code is not None:
            self.send_response(err_code, err_message)
            self.send_header("Content-Type", "text/plain")
            self.end_headers()
            self.wfile.write(err_body.encode('utf-8'))

        self.wfile.flush()

    def log_request(code=None, size=None):
        pass