File: flashproxy-reg-email

package info (click to toggle)
flashproxy 1.7-4
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid, stretch
  • size: 936 kB
  • ctags: 876
  • sloc: python: 3,708; sh: 823; makefile: 246; lisp: 15
file content (119 lines) | stat: -rwxr-xr-x 4,246 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python
"""Register with a facilitator using the email method."""

import argparse
import flashproxy
import os
import re
import smtplib
import sys

from flashproxy.keys import PIN_GOOGLE_CA_CERT, PIN_GOOGLE_PUBKEY_SHA1, check_certificate_pin, ensure_M2Crypto, temp_cert
from flashproxy.reg import build_reg_b64enc
from flashproxy.util import parse_addr_spec, format_addr, safe_format_addr

try:
    from M2Crypto import SSL
except ImportError:
    # Defer the error reporting so that --help works even without M2Crypto.
    pass

# Recipient of the registration mail unless overridden with -e/--email.
DEFAULT_EMAIL_ADDRESS = "flashproxyreg.a@gmail.com"
# Default (host, port) of the SMTP server, used unless -s/--smtp is given.
# The host name comes from the output of: dig MX gmail.com
DEFAULT_SMTP = ("gmail-smtp-in.l.google.com", 25)

# Use this to prevent Python smtplib from guessing and leaking our hostname.
# It is passed both as smtplib's local hostname and as the EHLO argument.
EHLO_FQDN = "[127.0.0.1]"
# Value placed in the From: header of the registration message.
FROM_EMAIL_ADDRESS = "nobody@localhost"

# Command-line interface. The option groups shared across flashproxy programs
# are registered first, followed by the options specific to this program.
parser = argparse.ArgumentParser(
    usage="%(prog)s [OPTIONS] [REMOTE][:PORT]",
    description=(
        "Register with a flash proxy facilitator through email. "
        "Makes a STARTTLS connection to an SMTP server and sends mail with a "
        "client IP address to a designated address. "
        "If only the external port is given, the external address is guessed "
        "from the SMTP EHLO response."
    ),
    epilog=(
        "Using an SMTP server or email address other than the defaults "
        "will not work unless you have made special arrangements to connect "
        "them to a facilitator."
    ),
)
flashproxy.util.add_module_opts(parser)
flashproxy.keys.add_module_opts(parser)
flashproxy.reg.add_registration_args(parser)

# Options specific to the email registration method.
parser.add_argument(
    "-e", "--email",
    metavar="ADDRESS",
    default=DEFAULT_EMAIL_ADDRESS,
    help="send mail to ADDRESS, default %(default)s.",
)
# The empty-string default is run through the same converter as a
# user-supplied value, so it ends up as the DEFAULT_SMTP host and port
# (assuming parse_addr_spec falls back to the defaults it is given).
parser.add_argument(
    "-s", "--smtp",
    metavar="HOST[:PORT]",
    default="",
    type=lambda spec: parse_addr_spec(spec, *DEFAULT_SMTP),
    help="use the given SMTP server, default %s." % format_addr(DEFAULT_SMTP),
)
parser.add_argument(
    "-d", "--debug",
    action="store_true",
    help="enable debugging output (Python smtplib messages).",
)

# With no explicit argument list, argparse reads sys.argv[1:].
options = parser.parse_args()
flashproxy.util.enforce_address_family(options.address_family)

# Checked only after argument parsing so that --help keeps working when
# M2Crypto is not installed (its import error is deferred at import time).
ensure_M2Crypto()

# Registration flow: connect to the SMTP server, upgrade the connection with
# a hand-rolled STARTTLS that uses M2Crypto and a pinned CA/public key, work
# out the client address if it was not given, and mail the registration.
# Any failure is reported on stderr and the script exits with status 1.
try:
    # Connect inside the try block so that a refused or timed-out connection
    # is reported through the same "Failed to register" path as every other
    # error rather than escaping as an unhandled traceback.
    smtp = smtplib.SMTP(options.smtp[0], options.smtp[1], EHLO_FQDN)

    if options.debug:
        smtp.set_debuglevel(1)

    ctx = SSL.Context("tlsv1")
    # Require the peer to present a certificate; 3 is the verification depth
    # handed to M2Crypto.
    ctx.set_verify(SSL.verify_peer, 3)

    with temp_cert(PIN_GOOGLE_CA_CERT) as ca_filename:
        # We roll our own initial EHLO/STARTTLS because smtplib.SMTP.starttls
        # doesn't allow enough certificate validation.
        code, msg = smtp.docmd("EHLO", EHLO_FQDN)
        if code != 250:
            raise ValueError("Got code %d after EHLO" % code)
        code, msg = smtp.docmd("STARTTLS")
        if code != 220:
            raise ValueError("Got code %d after STARTTLS" % code)
        # Raise explicitly instead of using assert: asserts are stripped
        # under "python -O" and would otherwise give an empty error message.
        if ctx.load_verify_locations(ca_filename) != 1:
            raise ValueError("Could not load the pinned CA certificate")

    # Replace smtplib's plaintext socket with an M2Crypto SSL connection and
    # run the client side of the handshake over it.
    smtp.sock = SSL.Connection(ctx, smtp.sock)
    smtp.sock.setup_ssl()
    smtp.sock.set_connect_state()
    smtp.sock.connect_ssl()
    smtp.file = smtp.sock.makefile()

    check_certificate_pin(smtp.sock, PIN_GOOGLE_PUBKEY_SHA1)
    # Say EHLO again on the encrypted channel; smtplib stores the reply in
    # smtp.ehlo_resp, which is inspected below.
    smtp.ehlo(EHLO_FQDN)

    if not options.remote_addr[0]:
        # Grep the EHLO response for our public IP address.
        m = re.search(r'at your service, \[([0-9a-fA-F.:]+)\]', smtp.ehlo_resp)
        if not m:
            raise ValueError("Could not guess external IP address from EHLO response")
        spec = m.group(1)
        if ":" in spec:
            # Guess IPv6.
            spec = "[" + spec + "]"
        options.remote_addr = parse_addr_spec(spec, *options.remote_addr)

    body = build_reg_b64enc(options.remote_addr, options.transport)

    # Add a random subject to keep Gmail from threading everything.
    rand_string = os.urandom(5).encode("hex")
    # The envelope sender and recipient are both the registration address;
    # only the From: header carries FROM_EMAIL_ADDRESS.
    smtp.sendmail(options.email, options.email, """\
To: %(to_addr)s\r
From: %(from_addr)s\r
Subject: client reg %(rand_string)s\r
\r
%(body)s
""" % {
        "to_addr": options.email,
        "from_addr": FROM_EMAIL_ADDRESS,
        "rand_string": rand_string,
        "body": body,
    })
    smtp.quit()
except Exception as e:
    # Top-level boundary of the script: report the failure and exit non-zero.
    sys.stderr.write("Failed to register: %s\n" % str(e))
    sys.exit(1)

print("Registered \"%s\" with %s." % (safe_format_addr(options.remote_addr), options.email))