File: smtpd.py

package info (click to toggle)
aiosmtplib 4.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 572 kB
  • sloc: python: 5,516; makefile: 20; sh: 6
file content (259 lines) | stat: -rw-r--r-- 7,234 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
"""
Implements handlers required on top of aiosmtpd for testing.
"""

import asyncio
import logging
from email.errors import HeaderParseError
from email.message import EmailMessage, Message
from typing import Any, AnyStr, Optional, Union

from aiosmtpd.handlers import Message as MessageHandler
from aiosmtpd.smtp import MISSING
from aiosmtpd.smtp import SMTP as SMTPD
from aiosmtpd.smtp import Envelope, Session, _Missing

from aiosmtplib import SMTPStatus


# Module-level logger registered under the name "mail.log".
log = logging.getLogger("mail.log")


class RecordingHandler(MessageHandler):
    """
    Message handler that appends what it sees to caller-supplied lists.

    The three lists passed to the constructor are stored by reference, so
    the caller can inspect them afterwards: received messages, recorded
    ``(command, args)`` tuples, and recorded server response strings.
    """

    def __init__(
        self,
        messages_list: list[Union[EmailMessage, Message]],
        commands_list: list[tuple[str, tuple[Any, ...]]],
        responses_list: list[str],
    ):
        # Keep references to the caller's lists (no copies are made).
        self.messages = messages_list
        self.commands = commands_list
        self.responses = responses_list
        super().__init__(message_class=EmailMessage)

    def record_command(self, command: str, *args: Any) -> None:
        """Append ``(command, args)`` to the recorded commands."""
        entry = (command, tuple(args))
        self.commands.append(entry)

    def record_server_response(self, status: str) -> None:
        """Append a server response to the recorded responses."""
        self.responses.append(status)

    def handle_message(self, message: Union[EmailMessage, Message]) -> None:
        """Append a received message to the recorded messages."""
        self.messages.append(message)

    async def handle_EHLO(
        self,
        server: SMTPD,
        session: Session,
        envelope: Envelope,
        hostname: str,
        responses: list[str],
    ) -> list[str]:
        """
        Store the client hostname on the session and, when the server has
        a TLS protocol set, advertise auth login support ahead of the
        other responses.
        """
        session.host_name = hostname  # type: ignore
        if not server._tls_protocol:
            return responses

        auth_line = ["250-AUTH LOGIN"]
        return auth_line + responses


class TestSMTPD(SMTPD):
    """
    SMTP server subclass used for testing.

    Commands and server responses are reported to ``self.event_handler``
    through its ``record_command`` / ``record_server_response`` methods.
    """

    transport: Optional[asyncio.BaseTransport]  # type: ignore

    def _getaddr(self, arg: str) -> tuple[Optional[str], Optional[str]]:
        """
        Parse an address like the parent class, but return ``(None, "")``
        instead of raising when the address cannot be parsed.
        """
        try:
            parsed_address, remainder = super()._getaddr(arg)
        except HeaderParseError:
            return None, ""

        return parsed_address, remainder

    async def _call_handler_hook(self, command: str, *args: Any) -> Any:
        """
        Record the command on the event handler, then run the parent hook.
        """
        recorder = self.event_handler
        recorder.record_command(command, *args)
        hook_result = await super()._call_handler_hook(command, *args)
        return hook_result

    async def push(self, status: AnyStr) -> None:
        """
        Push the status via the parent class, then record it on the
        event handler.
        """
        await super().push(status)
        recorder = self.event_handler
        recorder.record_server_response(status)

    async def smtp_EXPN(self, arg: str) -> None:
        """
        Pass EXPN to handler hook; reply 502 when no hook result exists.
        """
        status = await self._call_handler_hook("EXPN")
        if isinstance(status, _Missing):
            reply = "502 EXPN not implemented"
        else:
            reply = status
        await self.push(reply)

    async def smtp_HELP(self, arg: str) -> None:
        """
        Override help to pass to handler hook, falling back to the parent
        implementation when the hook result is MISSING.
        """
        status = await self._call_handler_hook("HELP")
        if status is not MISSING:
            await self.push(status)
            return

        await super().smtp_HELP(arg)

    async def smtp_STARTTLS(self, arg: str) -> None:
        """
        Run the parent STARTTLS implementation, then record the command
        (the record is made only after the parent call returns).
        """
        await super().smtp_STARTTLS(arg)
        recorder = self.event_handler
        recorder.record_command("STARTTLS", arg)


async def mock_response_delayed_ok(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """Wait one second, then push a "250 all done" reply."""
    delay_seconds = 1.0
    await asyncio.sleep(delay_seconds)

    reply = "250 all done"
    await smtpd.push(reply)


async def mock_response_delayed_read(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """Push "220-hi", then wait one second before returning."""
    greeting = "220-hi"
    await smtpd.push(greeting)

    delay_seconds = 1.0
    await asyncio.sleep(delay_seconds)


async def mock_response_done(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Push a "250 done" reply.

    When a truthy first positional argument is given, it is stored as the
    session's host name before replying.
    """
    hostname = args[0] if args else None
    if hostname:
        smtpd.session.host_name = hostname

    reply = "250 done"
    await smtpd.push(reply)


async def mock_response_done_then_close(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Push "250 done" followed by "221 bye now", then close the transport.

    When a truthy first positional argument is given, it is stored as the
    session's host name before replying.
    """
    hostname = args[0] if args else None
    if hostname:
        smtpd.session.host_name = hostname

    for reply in ("250 done", "221 bye now"):
        await smtpd.push(reply)

    # The transport is looked up only after both replies have been pushed.
    transport = smtpd.transport
    transport.close()


async def mock_response_delayed_close(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Push a "250 done" reply and schedule the transport to close 0.1
    seconds later on the server's loop.

    When a truthy first positional argument is given, it is stored as the
    session's host name before replying.
    """
    hostname = args[0] if args else None
    if hostname:
        smtpd.session.host_name = hostname

    reply = "250 done"
    await smtpd.push(reply)

    schedule = smtpd.loop.call_later
    schedule(0.1, smtpd.transport.close)


async def mock_response_error_disconnect(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """Push a "501 error" reply, then close the transport."""
    reply = "501 error"
    await smtpd.push(reply)

    transport = smtpd.transport
    transport.close()


async def mock_response_bad_data(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Write a "250" line whose text is four 0xFF bytes straight to the
    server's writer (bypassing ``push``) and drain it.

    0xFF bytes are never valid in UTF-8.
    """
    writer = smtpd._writer
    payload = b"250 \xff\xff\xff\xff\r\n"
    writer.write(payload)
    await writer.drain()


async def mock_response_gibberish(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Write a nonsense string straight to the server's writer (bypassing
    ``push``) and drain it.
    """
    # NOTE(review): the payload is a ``str``, not ``bytes``. If ``_writer``
    # is an asyncio StreamWriter, ``write`` presumably rejects it with a
    # TypeError -- confirm whether tests rely on that before changing it.
    writer = smtpd._writer
    payload = "wefpPSwrsfa2sdfsdf"
    writer.write(payload)
    await writer.drain()


async def mock_response_expn(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """Push a two-line 250 reply listing two example mailboxes."""
    mailbox_listing = """250-Joseph Blow <jblow@example.com>
250 Alice Smith <asmith@example.com>"""
    await smtpd.push(mailbox_listing)


async def mock_response_ehlo_minimal(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Push a single-line "250 HELP" reply.

    When a truthy first positional argument is given, it is stored as the
    session's host name before replying.
    """
    hostname = args[0] if args else None
    if hostname:
        smtpd.session.host_name = hostname

    reply = "250 HELP"
    await smtpd.push(reply)


async def mock_response_ehlo_full(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Push a long multi-line 250 reply listing many keywords.

    When a truthy first positional argument is given, it is stored as the
    session's host name before replying.
    """
    capability_lines = """250-localhost
250-PIPELINING
250-8BITMIME
250-SIZE 512000
250-DSN
250-ENHANCEDSTATUSCODES
250-EXPN
250-HELP
250-SAML
250-SEND
250-SOML
250-TURN
250-XADR
250-XSTA
250-ETRN
250 XGEN"""

    hostname = args[0] if args else None
    if hostname:
        smtpd.session.host_name = hostname

    await smtpd.push(capability_lines)


async def mock_response_unavailable(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """Push a "421 retry in 5 minutes" reply, then close the transport."""
    reply = "421 retry in 5 minutes"
    await smtpd.push(reply)

    transport = smtpd.transport
    transport.close()


async def mock_response_tls_not_available(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """Push a "454 please login" reply."""
    reply = "454 please login"
    await smtpd.push(reply)


async def mock_response_tls_ready_disconnect(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """Push a "220 go for it" reply, then close the transport."""
    reply = "220 go for it"
    await smtpd.push(reply)

    transport = smtpd.transport
    transport.close()


async def mock_response_start_data_disconnect(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """Push a "354 ok" reply, then close the transport."""
    reply = "354 ok"
    await smtpd.push(reply)

    transport = smtpd.transport
    transport.close()


async def mock_response_disconnect(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """Close the transport without pushing any reply."""
    transport = smtpd.transport
    transport.close()


async def mock_response_eof(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """Call ``write_eof`` on the transport without pushing any reply."""
    transport = smtpd.transport
    transport.write_eof()


async def mock_response_mailbox_unavailable(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """Push an error reply built from ``SMTPStatus.mailbox_unavailable``."""
    reply = f"{SMTPStatus.mailbox_unavailable} error"
    await smtpd.push(reply)


async def mock_response_unrecognized_command(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """Push an error reply built from ``SMTPStatus.unrecognized_command``."""
    reply = f"{SMTPStatus.unrecognized_command} error"
    await smtpd.push(reply)


async def mock_response_bad_command_sequence(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """Push an error reply built from ``SMTPStatus.bad_command_sequence``."""
    reply = f"{SMTPStatus.bad_command_sequence} error"
    await smtpd.push(reply)


async def mock_response_syntax_error(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """Push an error reply built from ``SMTPStatus.syntax_error``."""
    reply = f"{SMTPStatus.syntax_error} error"
    await smtpd.push(reply)


async def mock_response_syntax_error_and_cleanup(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Push an error reply built from ``SMTPStatus.syntax_error``, then tear
    the connection down: cancel the handler coroutine and close the
    transport, each only if it is set.
    """
    reply = f"{SMTPStatus.syntax_error} error"
    await smtpd.push(reply)

    handler_task = smtpd._handler_coroutine
    if handler_task:
        handler_task.cancel()

    # Looked up after the cancel call, as in the original ordering.
    transport = smtpd.transport
    if transport:
        transport.close()