File: dummysmtpd.py

package info (click to toggle)
mercurial 7.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 45,080 kB
  • sloc: python: 208,589; ansic: 56,460; tcl: 3,715; sh: 1,839; lisp: 1,483; cpp: 864; makefile: 769; javascript: 649; xml: 36
file content (169 lines) | stat: -rwxr-xr-x 4,430 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3

"""dummy SMTP server for use in tests"""


import io
import optparse
import os
import socket
import ssl
import sys

from mercurial import (
    pycompat,
    server,
    sslutil,
    ui as uimod,
)

if pycompat.iswindows:
    # Rewrap stdout with newline="\n" so that written "\n" characters are
    # emitted unchanged instead of being translated to the platform default.
    sys.stdout = io.TextIOWrapper(
        sys.stdout.buffer,
        encoding=sys.stdout.encoding,
        errors=sys.stdout.errors,
        newline="\n",
    )

# Address family of the listening socket: IPv6 only when the HGIPV6
# environment variable is exactly '1', IPv4 otherwise (including when unset).
family = (
    socket.AF_INET6
    if os.environ.get('HGIPV6', '0') == '1'
    else socket.AF_INET
)


def log(msg):
    """Write ``msg`` to the current ``sys.stdout`` and flush immediately."""
    out = sys.stdout
    out.write(msg)
    out.flush()


def _stripanglebrackets(address):
    """Return ``address`` without one enclosing ``<...>`` pair, if present."""
    if address.startswith('<') and address.endswith('>'):
        return address[1:-1]
    return address


def mocksmtpserversession(conn, addr):
    """Serve one minimal SMTP exchange on ``conn`` and log the message.

    The expected dialogue is EHLO, MAIL FROM, zero or more RCPT TO, DATA and
    then the message body terminated by ``\\r\\n.\\r\\n``.  Each ``recv()``
    result is treated as one complete command.  Any deviation is reported
    through ``log()`` and ends the session without further replies.

    ``conn`` is a connected socket-like object providing ``recv()`` and
    ``sendall()``; this function does not close it.  ``addr`` is the peer
    address tuple, of which only ``addr[0]`` is used.
    """
    # sendall() is used for every reply: plain send() is allowed to transmit
    # only part of the buffer.
    conn.sendall(b'220 smtp.example.com ESMTP\r\n')

    try:
        # Newer versions of OpenSSL raise on EOF
        line = conn.recv(1024)
    except ssl.SSLError:
        log('no hello: EOF\n')
        return

    if not line.lower().startswith(b'ehlo '):
        # Older versions of OpenSSL don't raise
        log('no hello: %s\n' % line)
        return

    conn.sendall(b'250 Hello\r\n')

    line = conn.recv(1024)
    if not line.lower().startswith(b'mail from:'):
        log('no mail from: %s\n' % line)
        return
    # 10 == len(b'mail from:')
    mailfrom = _stripanglebrackets(line[10:].decode().rstrip())

    conn.sendall(b'250 Ok\r\n')

    rcpttos = []
    while True:
        line = conn.recv(1024)
        if not line.lower().startswith(b'rcpt to:'):
            break
        # 8 == len(b'rcpt to:')
        rcpttos.append(_stripanglebrackets(line[8:].decode().rstrip()))
        conn.sendall(b'250 Ok\r\n')

    # The first command that is not RCPT TO must be DATA; anything else is a
    # protocol error, so stop instead of inviting the client to send a body.
    if line.lower().strip() != b'data':
        log('no rcpt to or data: %s\n' % line)
        return

    conn.sendall(b'354 Go ahead\r\n')

    data = b''
    while True:
        chunk = conn.recv(1024)
        if not chunk:
            # The peer went away mid-message: there is nobody left to
            # acknowledge and the message is incomplete, so give up.
            log('connection closed before end of data\n')
            return
        data += chunk
        if data.endswith(b'\r\n.\r\n'):
            # Drop the end-of-data marker from the logged message.
            data = data[:-5]
            break

    conn.sendall(b'250 Ok\r\n')

    log(
        '%s from=%s to=%s\n%s\n'
        % (addr[0], mailfrom, ', '.join(rcpttos), data.decode())
    )


def _handleconnection(conn, addr, ui, certificate):
    """Optionally TLS-wrap one accepted connection, serve it, and close it."""
    try:
        if certificate:
            try:
                conn = sslutil.wrapserversocket(
                    conn, ui, certfile=certificate
                )
            except ssl.SSLError as e:
                log('%s ssl error: %s\n' % (addr[0], e))
                return
        # AF_INET6 peer addresses are 4-tuples (host, port, flowinfo,
        # scope_id); only host and port are wanted here, and formatting the
        # whole tuple would raise TypeError.
        log("connection from %s:%s\n" % addr[:2])
        mocksmtpserversession(conn, addr)
    finally:
        # Close whichever socket object is current (wrapped or raw), even if
        # the session raised.
        conn.close()


def run(host, port, certificate):
    """Listen on ``(host, port)`` and serve connections one at a time.

    When ``certificate`` is truthy, every accepted connection is wrapped with
    ``sslutil.wrapserversocket()`` using it as ``certfile``; a connection
    whose wrapping raises ``ssl.SSLError`` is logged, closed and skipped.
    The accept loop ends quietly on ``KeyboardInterrupt``.
    """
    ui = uimod.ui.load()
    with socket.socket(family, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        # log('listening at %s:%d\n' % (host, port))
        s.listen(1)
        try:
            while True:
                conn, addr = s.accept()
                _handleconnection(conn, addr, ui, certificate)
        except KeyboardInterrupt:
            pass


def _encodestrsonly(v):
    if isinstance(v, str):
        return v.encode('ascii')
    return v


def bytesvars(obj):
    """Return the attributes of ``obj`` as a dict with bytes keys.

    Attribute names are ASCII-encoded, and so are ``str`` values.  The
    ``daemon_postexec`` entry must exist; when it is not ``None`` it is
    replaced by a list in which each ``str`` element is encoded as well.
    """
    encoded = {}
    for name, value in vars(obj).items():
        key = name.encode('ascii')
        encoded[key] = _encodestrsonly(value)
    postexec = encoded[b'daemon_postexec']
    if postexec is not None:
        encoded[b'daemon_postexec'] = list(map(_encodestrsonly, postexec))
    return encoded


def main():
    """Parse the command line and hand the server to ``server.runservice``.

    ``--certificate`` must be given exactly when ``--tls=smtps`` is selected;
    otherwise the option parser reports an error.
    """
    parser = optparse.OptionParser()
    parser.add_option('-d', '--daemon', action='store_true')
    parser.add_option('--daemon-postexec', action='append')
    parser.add_option('-p', '--port', type=int, default=8025)
    parser.add_option('-a', '--address', default='localhost')
    parser.add_option('--pid-file', metavar='FILE')
    parser.add_option('--tls', choices=['none', 'smtps'], default='none')
    parser.add_option('--certificate', metavar='FILE')
    parser.add_option('--logfile', metavar='FILE')

    options, _positional = parser.parse_args()
    wantssmtps = options.tls == 'smtps'
    hascertificate = bool(options.certificate)
    if wantssmtps != hascertificate:
        parser.error('--certificate must be specified with --tls=smtps')

    serviceopts = bytesvars(options)

    def serve():
        return run(options.address, options.port, options.certificate)

    # Command line passed as runargs: this script plus the original
    # arguments.
    relaunchargs = [
        pycompat.sysexecutable,
        pycompat.fsencode(__file__),
    ] + pycompat.sysargv[1:]

    server.runservice(
        serviceopts,
        runfn=serve,
        runargs=relaunchargs,
        logfile=options.logfile,
    )


# Start the server only when this file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
    main()