File: httpd.py

package info (click to toggle)
tarantool 2.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 85,364 kB
  • sloc: ansic: 513,760; cpp: 69,489; sh: 25,650; python: 19,190; perl: 14,973; makefile: 4,173; yacc: 1,329; sql: 1,074; pascal: 620; ruby: 190; awk: 18; lisp: 7
file content (146 lines) | stat: -rwxr-xr-x 3,962 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python2

import sys
from gevent.pywsgi import WSGIServer
from gevent import spawn, sleep, socket

def absent():
    code = "500 Server Error"
    headers = [('Content-Type', 'application/json')]
    body = ["No such method"]
    return code, body, headers

def hello():
    code = "200 OK"
    body = ["hello world"]
    headers = [('Content-Type', 'application/json')]
    return code, body, headers

def hello1():
    code = "200 OK"
    body = [b"abc"]
    headers = [('Content-Type', 'application/json')]
    return code, body, headers

def headers():
    code = "200 OK"
    body = [b"cookies"]
    headers = [('Content-Type', 'application/json'),
               ('Content-Type', 'application/yaml'),
               ('Set-Cookie', 'likes=cheese; Expires=Wed, 21 Oct 2015 07:28:00 GMT; Secure; HttpOnly'),
               ('Set-Cookie', 'bad@name=no;'),
               ('Set-Cookie', 'badcookie'),
               ('Set-Cookie', 'good_name=yes;'),
               ('Set-Cookie', 'age = 17; NOSuchOption; EmptyOption=Value;Secure'),
               ('my_header', 'value1'),
               ('my_header', 'value2'),
               ('very_very_very_long_headers_name1', 'true'),
               ]
    return code, body, headers

def long_query():
    sleep(0.005)
    code = "200 OK"
    body = [b"abc"]
    headers = [('Content-Type', 'application/json')]
    return code, body, headers

def redirect():
    code = "302 Found"
    body = ["redirecting"]
    headers = [('Location', '/')]
    return code, body, headers

paths = {
        "/": hello,
        "/abc": hello1,
        "/absent": absent,
        "/headers": headers,
        "/long_query": long_query,
        "/redirect": redirect,
        }

def read_handle(env, response):
    code = "404 Not Found"
    headers = []
    body = ['Not Found']
    if env["PATH_INFO"] in paths:
        code, body, headers = paths[env["PATH_INFO"]]()
    for key,value in env.iteritems():
        if "HTTP_" in key:
            headers.append((key[5:].lower(), value))
    response(code, headers)
    return body

def post_handle(env, response):
    code = "200 OK"
    body = [env['wsgi.input'].read()]
    headers = []
    for key,value in env.iteritems():
        if "HTTP_" in key:
            headers.append((key[5:].lower(), value))
    response(code, headers)
    return body

def other_handle(env, response, method, code):
    headers = [('Content-Type', 'text/plain'), ("method", method)]
    body = [method]
    for key,value in env.iteritems():
        if "HTTP_" in key:
            headers.append((key[5:].lower(), value))
    response(code, headers)
    return body

OTHER_METHODS = {
    "TRACE": True,
    "CONNECT": True,
    "OPTIONS": True,
    "DELETE": True ,
    "HEAD": True
}

def handle(env, response) :
    method = env["REQUEST_METHOD"].upper()
    if method == "GET":
        return read_handle(env, response)
    elif method == "PUT" or method == "POST" or method == "PATCH":
        return post_handle(env, response)
    elif method in OTHER_METHODS:
        return other_handle(env, response, method, "200 Ok")
    return other_handle(env, response, method, "400 Bad Request")

def heartbeat():
    try:
        while True:
            sys.stdout.write("heartbeat\n")
            sys.stdout.flush()
            sleep(1e-1)
    except IOError:
        sys.exit(1)

def usage():
    sys.stderr.write("Usage: %s { --inet HOST:PORT | --unix PATH }\n" %
                     sys.argv[0])
    sys.exit(1)

if len(sys.argv) != 3:
    usage()

if sys.argv[1] == "--inet":
    host, port = sys.argv[2].split(':')
    sock_family = socket.AF_INET
    sock_addr = (host, int(port))
elif sys.argv[1] == "--unix":
    path = sys.argv[2]
    sock_family = socket.AF_UNIX
    sock_addr = path
else:
    usage()

sock = socket.socket(sock_family, socket.SOCK_STREAM)
sock.bind(sock_addr)
sock.listen(10)

server = WSGIServer(sock, handle, log=None)
spawn(heartbeat)
server.serve_forever()