File: flashproxy-reg-appspot

package info (click to toggle)
flashproxy 1.7-4
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid, stretch
  • size: 936 kB
  • ctags: 876
  • sloc: python: 3,708; sh: 823; makefile: 246; lisp: 15
file content (137 lines) | stat: -rwxr-xr-x 4,590 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python
"""Register with a facilitator through Google App Engine."""

import argparse
import flashproxy
import httplib
import socket
import sys
import urlparse
import urllib2

from flashproxy.keys import PIN_GOOGLE_CA_CERT, PIN_GOOGLE_PUBKEY_SHA1, check_certificate_pin, ensure_M2Crypto, temp_cert
from flashproxy.reg import build_reg_b64enc
from flashproxy.util import parse_addr_spec, safe_str, safe_format_addr

try:
    from M2Crypto import SSL
except ImportError:
    # Defer the error reporting so that --help works even without M2Crypto.
    pass

# The domain to which requests appear to go.
FRONT_DOMAIN = "www.google.com"
# The value of the Host header within requests.
TARGET_DOMAIN = "fp-reg-a.appspot.com"

# Like socket.create_connection in that it tries resolving different address
# families, but doesn't connect the socket.
def create_socket(address, timeout = None):
    host, port = address
    addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
    if not addrs:
        raise socket.error("getaddrinfo returns an empty list")
    err = None
    for addr in addrs:
        try:
            s = socket.socket(addr[0], addr[1], addr[2])
            if timeout is not None and type(timeout) == float:
                s.settimeout(timeout)
            return s
        except Exception, e:
            err = e
    raise err

# Certificate validation and pinning for urllib2. Inspired by
# http://web.archive.org/web/20110125104752/http://www.muchtooscrawled.com/2010/03/https-certificate-verification-in-python-with-urllib2/.

class PinHTTPSConnection(httplib.HTTPSConnection):
    def connect(self):
        sock = create_socket((self.host, self.port), self.timeout)
        if self._tunnel_host:
            self.sock = sock
            self._tunnel()

        ctx = SSL.Context("tlsv1")
        ctx.set_verify(SSL.verify_peer, 3)

        with temp_cert(PIN_GOOGLE_CA_CERT) as ca_filename:
            ret = ctx.load_verify_locations(ca_filename)
            assert ret == 1

        self.sock = SSL.Connection(ctx, sock)
        self.sock.connect((self.host, self.port))

        check_certificate_pin(self.sock, PIN_GOOGLE_PUBKEY_SHA1)

class PinHTTPSHandler(urllib2.HTTPSHandler):
    def https_open(self, req):
        return self.do_open(PinHTTPSConnection, req)

def urlopen(url):
    req = urllib2.Request(url)
    req.add_header("Host", TARGET_DOMAIN)
    opener = urllib2.build_opener(PinHTTPSHandler())
    return opener.open(req)

def get_external_ip():
    f = urlopen(urlparse.urlunparse(("https", FRONT_DOMAIN, "/ip", "", "", "")))
    try:
        return f.read().strip()
    finally:
        f.close()

parser = argparse.ArgumentParser(
    usage="%(prog)s [OPTIONS] [REMOTE][:PORT]",
    description="Register with a facilitator through a Google App Engine app. "
    "If only the external port is given, the remote server guesses our "
    "external address.")
flashproxy.util.add_module_opts(parser)
flashproxy.keys.add_module_opts(parser)
flashproxy.reg.add_registration_args(parser)

options = parser.parse_args(sys.argv[1:])
flashproxy.util.enforce_address_family(options.address_family)
remote_addr = options.remote_addr

ensure_M2Crypto()

if not remote_addr[0]:
    try:
        ip = get_external_ip()
    except urllib2.HTTPError, e:
        print >> sys.stderr, "Status code was %d, not 200" % e.code
        sys.exit(1)
    except urllib2.URLError, e:
        print >> sys.stderr, "Failed to get external IP address: %s" % str(e.reason)
        sys.exit(1)
    except Exception, e:
        print >> sys.stderr, "Failed to get external IP address: %s" % str(e)
        sys.exit(1)
    try:
        remote_addr = parse_addr_spec(ip, *remote_addr)
    except ValueError, e:
        print >> sys.stderr, "Error parsing external IP address %s: %s" % (safe_str(repr(ip)), str(e))
        sys.exit(1)

try:
    reg = build_reg_b64enc(remote_addr, options.transport, urlsafe=True)
    url = urlparse.urljoin(urlparse.urlunparse(("https", FRONT_DOMAIN, "/", "", "", "")), "reg/" + reg)
except Exception, e:
    print >> sys.stderr, "Error generating URL: %s" % str(e)
    sys.exit(1)

try:
    http = urlopen(url)
except urllib2.HTTPError, e:
    print >> sys.stderr, "Status code was %d, not 200" % e.code
    sys.exit(1)
except urllib2.URLError, e:
    print >> sys.stderr, "Failed to register: %s" % str(e.reason)
    sys.exit(1)
except Exception, e:
    print >> sys.stderr, "Failed to register: %s" % str(e)
    sys.exit(1)
http.close()

print "Registered \"%s\" with %s." % (safe_format_addr(remote_addr), TARGET_DOMAIN)