1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
|
"""
Implements handlers required on top of aiosmtpd for testing.
"""
import asyncio
import logging
from email.errors import HeaderParseError
from email.message import EmailMessage, Message
from typing import Any, AnyStr, Optional, Union
from aiosmtpd.handlers import Message as MessageHandler
from aiosmtpd.smtp import MISSING
from aiosmtpd.smtp import SMTP as SMTPD
from aiosmtpd.smtp import Envelope, Session, _Missing
from aiosmtplib import SMTPStatus
# Module-level logger obtained under the name "mail.log".
# NOTE(review): presumably chosen to match the logger name aiosmtpd itself
# uses - confirm.
log = logging.getLogger("mail.log")
class RecordingHandler(MessageHandler):
    """
    Message handler that records SMTP traffic into caller-supplied lists.

    The three lists given to the constructor are stored by reference and
    appended to in place, so the caller can inspect them afterwards.
    """

    def __init__(
        self,
        messages_list: list[Union[EmailMessage, Message]],
        commands_list: list[tuple[str, tuple[Any, ...]]],
        responses_list: list[str],
    ):
        """
        Keep references to the recording lists and initialise the base handler.

        :param messages_list: receives every message passed to
            ``handle_message``.
        :param commands_list: receives one ``(command, args)`` tuple per
            ``record_command`` call.
        :param responses_list: receives every status passed to
            ``record_server_response``.
        """
        self.messages = messages_list
        self.commands = commands_list
        self.responses = responses_list
        # The base handler is configured with EmailMessage as its message class.
        super().__init__(message_class=EmailMessage)

    def record_command(self, command: str, *args: Any) -> None:
        """Append ``(command, args)`` to the commands list."""
        # ``args`` is already a tuple, so it can be stored as-is.
        self.commands.append((command, args))

    def record_server_response(self, status: str) -> None:
        """Append ``status`` to the responses list."""
        self.responses.append(status)

    def handle_message(self, message: Union[EmailMessage, Message]) -> None:
        """Append ``message`` to the messages list."""
        self.messages.append(message)

    async def handle_EHLO(
        self,
        server: SMTPD,
        session: Session,
        envelope: Envelope,
        hostname: str,
        responses: list[str],
    ) -> list[str]:
        """
        Store the client hostname on the session and build the EHLO reply.

        When ``server._tls_protocol`` is truthy, "250-AUTH LOGIN" is placed in
        front of the given response lines; otherwise the lines are returned
        unchanged.
        """
        session.host_name = hostname  # type: ignore
        if not server._tls_protocol:
            return responses
        return ["250-AUTH LOGIN", *responses]
class TestSMTPD(SMTPD):
    """
    SMTPD subclass that reports commands and responses to its event handler.

    The event handler is expected to provide ``record_command`` and
    ``record_server_response`` methods.
    """

    transport: Optional[asyncio.BaseTransport]  # type: ignore

    def _getaddr(self, arg: str) -> tuple[Optional[str], Optional[str]]:
        """
        Parse an address via the parent class without raising on bad input.

        Returns ``(None, "")`` when the parent raises ``HeaderParseError``.
        """
        try:
            parsed, remainder = super()._getaddr(arg)
        except HeaderParseError:
            # Unparsable address: report "no address" instead of failing.
            return None, ""
        return parsed, remainder

    async def _call_handler_hook(self, command: str, *args: Any) -> Any:
        """Record the command on the event handler, then run the parent hook."""
        recorder = self.event_handler
        recorder.record_command(command, *args)
        result = await super()._call_handler_hook(command, *args)
        return result

    async def push(self, status: AnyStr) -> None:
        """Send ``status`` via the parent class, then record it."""
        await super().push(status)
        # Recorded only after the parent push has completed.
        self.event_handler.record_server_response(status)

    async def smtp_EXPN(self, arg: str) -> None:
        """
        Hand EXPN to the handler hook and push its result.

        Pushes "502 EXPN not implemented" when the hook result is a
        ``_Missing`` instance.
        """
        status = await self._call_handler_hook("EXPN")
        if isinstance(status, _Missing):
            status = "502 EXPN not implemented"
        await self.push(status)

    async def smtp_HELP(self, arg: str) -> None:
        """
        Hand HELP to the handler hook, falling back to the parent behaviour.

        The parent ``smtp_HELP`` runs only when the hook result is ``MISSING``.
        """
        status = await self._call_handler_hook("HELP")
        if status is not MISSING:
            await self.push(status)
            return
        await super().smtp_HELP(arg)

    async def smtp_STARTTLS(self, arg: str) -> None:
        """Run the parent STARTTLS handling, then record the command."""
        await super().smtp_STARTTLS(arg)
        # NOTE(review): recorded by hand here, presumably because the parent
        # STARTTLS path does not go through _call_handler_hook - confirm.
        self.event_handler.record_command("STARTTLS", arg)
async def mock_response_delayed_ok(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Wait one second, then reply "250 all done".

    Extra positional and keyword arguments are accepted and ignored.
    """
    delay_seconds = 1.0
    await asyncio.sleep(delay_seconds)
    await smtpd.push("250 all done")
async def mock_response_delayed_read(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Reply "220-hi" immediately, then wait one second before returning.

    Extra positional and keyword arguments are accepted and ignored.
    """
    # NOTE(review): the "220-" prefix looks like the first line of a
    # multi-line reply that is never finished - confirm that is the intent.
    await smtpd.push("220-hi")
    pause_seconds = 1.0
    await asyncio.sleep(pause_seconds)
async def mock_response_done(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Reply "250 done".

    When the first positional argument is present and truthy it is stored as
    the session's ``host_name`` before replying.
    """
    hostname = args[0] if args else None
    if hostname:
        smtpd.session.host_name = hostname
    await smtpd.push("250 done")
async def mock_response_done_then_close(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Reply "250 done" followed by "221 bye now", then close the transport.

    When the first positional argument is present and truthy it is stored as
    the session's ``host_name`` before replying.
    """
    hostname = args[0] if args else None
    if hostname:
        smtpd.session.host_name = hostname
    for reply in ("250 done", "221 bye now"):
        await smtpd.push(reply)
    smtpd.transport.close()
async def mock_response_delayed_close(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Reply "250 done" and schedule the transport to close 0.1 seconds later.

    When the first positional argument is present and truthy it is stored as
    the session's ``host_name`` before replying.
    """
    hostname = args[0] if args else None
    if hostname:
        smtpd.session.host_name = hostname
    await smtpd.push("250 done")
    # The close is scheduled on the server's loop rather than done inline.
    loop = smtpd.loop
    loop.call_later(0.1, smtpd.transport.close)
async def mock_response_error_disconnect(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Reply "501 error" and then close the transport.

    Extra positional and keyword arguments are accepted and ignored.
    """
    reply = "501 error"
    await smtpd.push(reply)
    smtpd.transport.close()
async def mock_response_bad_data(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Write a "250" line whose text is four 0xFF bytes directly to the writer.

    The bytes go through ``smtpd._writer`` rather than ``push``. Extra
    positional and keyword arguments are accepted and ignored.
    """
    writer = smtpd._writer
    # 0xFF bytes are not valid UTF-8.
    writer.write(b"250 \xff\xff\xff\xff\r\n")
    await writer.drain()
async def mock_response_gibberish(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Write text with no status code or line ending directly to the writer.

    The text goes through ``smtpd._writer`` rather than ``push``. Extra
    positional and keyword arguments are accepted and ignored.
    """
    writer = smtpd._writer
    # NOTE(review): this passes a str, while asyncio stream writers normally
    # expect bytes-like data - confirm the resulting behaviour is what the
    # tests rely on before changing it.
    writer.write("wefpPSwrsfa2sdfsdf")
    await writer.drain()
async def mock_response_expn(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Reply with a two-line 250 response listing two mailbox addresses.

    The two lines are joined with a bare newline and sent in a single
    ``push`` call. Extra positional and keyword arguments are ignored.
    """
    expansion = "\n".join(
        (
            "250-Joseph Blow <jblow@example.com>",
            "250 Alice Smith <asmith@example.com>",
        )
    )
    await smtpd.push(expansion)
async def mock_response_ehlo_minimal(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Reply with the single line "250 HELP".

    When the first positional argument is present and truthy it is stored as
    the session's ``host_name`` before replying.
    """
    hostname = args[0] if args else None
    if hostname:
        smtpd.session.host_name = hostname
    await smtpd.push("250 HELP")
async def mock_response_ehlo_full(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Reply with a long multi-line 250 response listing many keywords.

    The lines are joined with bare newlines and sent in a single ``push``
    call. When the first positional argument is present and truthy it is
    stored as the session's ``host_name`` before replying.
    """
    hostname = args[0] if args else None
    if hostname:
        smtpd.session.host_name = hostname
    reply_lines = (
        "250-localhost",
        "250-PIPELINING",
        "250-8BITMIME",
        "250-SIZE 512000",
        "250-DSN",
        "250-ENHANCEDSTATUSCODES",
        "250-EXPN",
        "250-HELP",
        "250-SAML",
        "250-SEND",
        "250-SOML",
        "250-TURN",
        "250-XADR",
        "250-XSTA",
        "250-ETRN",
        "250 XGEN",
    )
    await smtpd.push("\n".join(reply_lines))
async def mock_response_unavailable(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Reply "421 retry in 5 minutes" and then close the transport.

    Extra positional and keyword arguments are accepted and ignored.
    """
    reply = "421 retry in 5 minutes"
    await smtpd.push(reply)
    smtpd.transport.close()
async def mock_response_tls_not_available(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Reply "454 please login" and leave the connection open.

    Extra positional and keyword arguments are accepted and ignored.
    """
    reply = "454 please login"
    await smtpd.push(reply)
async def mock_response_tls_ready_disconnect(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Reply "220 go for it" and then close the transport.

    Extra positional and keyword arguments are accepted and ignored.
    """
    reply = "220 go for it"
    await smtpd.push(reply)
    smtpd.transport.close()
async def mock_response_start_data_disconnect(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Reply "354 ok" and then close the transport.

    Extra positional and keyword arguments are accepted and ignored.
    """
    reply = "354 ok"
    await smtpd.push(reply)
    smtpd.transport.close()
async def mock_response_disconnect(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Close the transport without sending any reply.

    Extra positional and keyword arguments are accepted and ignored.
    """
    transport = smtpd.transport
    transport.close()
async def mock_response_eof(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Call ``write_eof`` on the transport without sending any reply.

    Extra positional and keyword arguments are accepted and ignored.
    """
    transport = smtpd.transport
    transport.write_eof()
async def mock_response_mailbox_unavailable(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Reply with ``SMTPStatus.mailbox_unavailable`` followed by " error".

    Extra positional and keyword arguments are accepted and ignored.
    """
    reply = f"{SMTPStatus.mailbox_unavailable} error"
    await smtpd.push(reply)
async def mock_response_unrecognized_command(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Reply with ``SMTPStatus.unrecognized_command`` followed by " error".

    Extra positional and keyword arguments are accepted and ignored.
    """
    reply = f"{SMTPStatus.unrecognized_command} error"
    await smtpd.push(reply)
async def mock_response_bad_command_sequence(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Reply with ``SMTPStatus.bad_command_sequence`` followed by " error".

    Extra positional and keyword arguments are accepted and ignored.
    """
    reply = f"{SMTPStatus.bad_command_sequence} error"
    await smtpd.push(reply)
async def mock_response_syntax_error(smtpd: SMTPD, *args: Any, **kwargs: Any) -> None:
    """
    Reply with ``SMTPStatus.syntax_error`` followed by " error".

    Extra positional and keyword arguments are accepted and ignored.
    """
    reply = f"{SMTPStatus.syntax_error} error"
    await smtpd.push(reply)
async def mock_response_syntax_error_and_cleanup(
    smtpd: SMTPD, *args: Any, **kwargs: Any
) -> None:
    """
    Reply with a syntax-error status, then tear the connection down.

    After pushing ``SMTPStatus.syntax_error`` followed by " error", cancels
    ``smtpd._handler_coroutine`` if it is set and closes the transport if it
    is set. Extra positional and keyword arguments are ignored.
    """
    await smtpd.push(f"{SMTPStatus.syntax_error} error")

    handler_task = smtpd._handler_coroutine
    if handler_task:
        handler_task.cancel()

    # The transport is read only after the cancel call, as in the original.
    transport = smtpd.transport
    if transport:
        transport.close()
|