1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
#!/usr/bin/python3
# Tiny HTTP Proxy. Based on the work of SUZUKI Hisao.
#
# Ported to py3 and modified to remove the bits we don't need
# and modernized.
import os
import http.server
import select
import socket
import socketserver
import sys
import urllib.parse
class ProxyHandler(http.server.BaseHTTPRequestHandler):
    """Minimal forwarding HTTP proxy request handler.

    GET/HEAD/POST/PUT/DELETE requests carrying an absolute ``http://`` URL
    are replayed against the origin server named in the URL.  CONNECT
    requests are answered with ``200 Connection established`` and the
    connection is then relayed as an opaque byte stream.  Upstream
    sockets are created as ``AF_INET`` (IPv4) TCP sockets.
    """

    server_version = "testsproxy/1.0"

    def log_request(self, m=""):
        """Log the request, then flush stdout and stderr.

        *m* is forwarded as the first argument (the status code) of the
        base class's ``log_request``.
        """
        super().log_request(m)
        sys.stdout.flush()
        sys.stderr.flush()

    def handle(self):
        """Handle the connection by delegating to the base class."""
        super().handle()

    def _connect_to(self, netloc, soc):
        """Connect *soc* to the ``host[:port]`` given by *netloc*.

        The port defaults to 80 when *netloc* contains no colon.

        Returns True on success.  On failure an error response is sent to
        the client (400 for an unusable port, 404 when the connection
        attempt fails) and False is returned.
        """
        host, colon, port_text = netloc.partition(":")
        if colon:
            try:
                port = int(port_text)
            except ValueError:
                port = -1
            # Reject bad ports up front: int() raises ValueError on junk
            # and connect() raises OverflowError on out-of-range values,
            # neither of which is an OSError.
            if not 0 <= port <= 65535:
                self.send_error(400, "bad port in {}".format(netloc))
                return False
        else:
            port = 80
        try:
            soc.connect((host, port))
        except OSError as err:
            # send_error() needs a str message; strerror may be None for
            # errors raised without an errno, so fall back to str(err).
            self.send_error(404, err.strerror or str(err))
            return False
        return True

    def do_CONNECT(self):
        """Open a tunnel to ``self.path`` and relay bytes both ways."""
        soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if self._connect_to(self.path, soc):
                self.log_request(200)
                reply = (
                    "{} 200 Connection established\r\n"
                    "Proxy-agent: {}\r\n"
                    "\r\n"
                ).format(self.protocol_version, self.version_string())
                self.wfile.write(reply.encode())
                # Tunnels get a longer idle allowance than plain requests.
                self._read_write(soc, 300)
        finally:
            soc.close()
            self.connection.close()

    def do_GET(self):
        """Forward a plain HTTP request to the origin and relay the reply.

        Sends a 400 response when the URL is not an absolute ``http`` URL
        or carries a fragment.
        """
        parts = urllib.parse.urlparse(self.path, "http")
        if parts.scheme != "http" or parts.fragment or not parts.netloc:
            # send_error() requires a str message, not bytes.
            self.send_error(400, "bad url {}".format(self.path))
            return
        soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if self._connect_to(parts.netloc, soc):
                self.log_request()
                # An absolute URL with an empty path ("http://host") must
                # be requested as "/" or the request line is invalid.
                target = urllib.parse.urlunparse(
                    ("", "", parts.path or "/", parts.params, parts.query, "")
                )
                # Assigning a header appends rather than replaces, so drop
                # any client-supplied Connection header before adding ours.
                del self.headers["Connection"]
                del self.headers["Proxy-Connection"]
                self.headers["Connection"] = "close"
                head = [
                    "{} {} {}\r\n".format(
                        self.command, target, self.request_version
                    )
                ]
                head.extend(
                    "{}: {}\r\n".format(key, val)
                    for key, val in self.headers.items()
                )
                head.append("\r\n")
                # http.server decodes the request line and headers as
                # ISO-8859-1; encode the same way so bytes round-trip.
                soc.sendall("".join(head).encode("iso-8859-1"))
                # NOTE(review): request-body bytes already buffered in
                # self.rfile are presumably not relayed below; confirm
                # whether POST/PUT bodies matter for the callers.
                self._read_write(soc)
        finally:
            soc.close()
            self.connection.close()

    def _read_write(self, soc, max_idling=20):
        """Relay bytes between the client connection and *soc*.

        The loop ends when select() reports an exceptional condition on
        either socket, or after *max_idling* consecutive passes (each
        waiting up to 3 seconds in select) that forwarded no data.
        """
        socks = [self.connection, soc]
        idle = 0
        while True:
            idle += 1
            readable, _, errored = select.select(socks, [], socks, 3)
            if errored:
                break
            for src in readable:
                dst = self.connection if src is soc else soc
                data = src.recv(8192)
                if data:
                    # send() may write only part of the buffer; sendall()
                    # keeps going until everything has been written.
                    dst.sendall(data)
                    idle = 0
            if idle == max_idling:
                break

    # Every other supported method is forwarded exactly like GET.
    do_HEAD = do_GET
    do_POST = do_GET
    do_PUT = do_GET
    do_DELETE = do_GET
def maybe_sd_notify(s: str) -> None:
    """Send *s* as one datagram to the socket named by ``$NOTIFY_SOCKET``.

    Does nothing when the environment variable is unset or empty.
    Socket errors from connecting or sending propagate to the caller.
    """
    addr = os.getenv("NOTIFY_SOCKET")
    if not addr:
        return
    if addr.startswith("@"):
        # A leading "@" denotes an abstract-namespace socket, which is
        # addressed with a leading NUL byte instead.
        addr = "\0" + addr[1:]
    # The context manager closes the socket even if connect/send fails.
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as soc:
        soc.connect(addr)
        soc.sendall(s.encode())
class ThreadingHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer with ThreadingMixIn that announces readiness once built.

    After the base constructor returns, ``READY=1`` is passed to
    ``maybe_sd_notify``.
    """

    def __init__(self, *args, **kwargs):
        """Build the server, then send the readiness notification.

        Positional and keyword arguments are passed unchanged to the
        base class constructor.
        """
        super().__init__(*args, **kwargs)
        maybe_sd_notify("READY=1")
if __name__ == "__main__":
    # Script entry point: announce the listening port, then hand control
    # to the stdlib test driver, which serves until interrupted.
    listen_port = 3128
    print(f"starting tinyproxy on port {listen_port}")
    http.server.test(
        HandlerClass=ProxyHandler,
        ServerClass=ThreadingHTTPServer,
        port=listen_port,
    )
|