1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
|
# Copyright 2014-2021 The aiosmtpd Developers
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
import multiprocessing as MP
import os
import time
from contextlib import contextmanager
from multiprocessing.synchronize import Event as MP_Event
from smtplib import SMTP as SMTPClient
from smtplib import SMTP_SSL
from typing import Generator
import pytest
from pytest_mock import MockFixture
from aiosmtpd import __version__
from aiosmtpd.handlers import Debugging
from aiosmtpd.main import main, parseargs
from aiosmtpd.testing.helpers import catchup_delay
from aiosmtpd.testing.statuscodes import SMTP_STATUS_CODES as S
from aiosmtpd.tests.conftest import AUTOSTOP_DELAY, SERVER_CRT, SERVER_KEY
try:
import pwd
except ImportError:
pwd = None
HAS_SETUID = hasattr(os, "setuid")
MAIL_LOG = logging.getLogger("mail.log")
# region ##### Custom Handlers ########################################################
class FromCliHandler:
def __init__(self, called: bool):
self.called = called
@classmethod
def from_cli(cls, parser, *args):
return cls(*args)
class NullHandler:
pass
# endregion
# region ##### Fixtures ###############################################################
@pytest.fixture
def nobody_uid() -> Generator[int, None, None]:
if pwd is None:
pytest.skip("No pwd module available")
try:
pw = pwd.getpwnam("nobody")
except KeyError:
pytest.skip("'nobody' not available")
else:
yield pw.pw_uid
@pytest.fixture
def setuid(mocker: MockFixture):
if not HAS_SETUID:
pytest.skip("setuid is unavailable")
mocker.patch("aiosmtpd.main.pwd", None)
mocker.patch("os.setuid", side_effect=PermissionError)
mocker.patch("aiosmtpd.main.partial", side_effect=RuntimeError)
# endregion
# region ##### Helper Funcs ###########################################################
def watch_for_tls(ready_flag: MP_Event, retq: MP.Queue):
has_tls = False
req_tls = False
ready_flag.set()
start = time.monotonic()
delay = AUTOSTOP_DELAY * 4
while (time.monotonic() - start) <= delay:
try:
with SMTPClient("localhost", 8025, timeout=0.1) as client:
resp = client.docmd("HELP", "HELO")
if resp == S.S530_STARTTLS_FIRST:
req_tls = True
client.ehlo("exemple.org")
if "starttls" in client.esmtp_features:
has_tls = True
break
except Exception:
time.sleep(0.05)
retq.put(has_tls)
retq.put(req_tls)
def watch_for_smtps(ready_flag: MP_Event, retq: MP.Queue):
has_smtps = False
ready_flag.set()
start = time.monotonic()
delay = AUTOSTOP_DELAY * 1.5
while (time.monotonic() - start) <= delay:
try:
with SMTP_SSL("localhost", 8025, timeout=0.1) as client:
client.ehlo("exemple.org")
has_smtps = True
break
except Exception:
time.sleep(0.05)
retq.put(has_smtps)
def main_n(*args):
main(("-n",) + args)
@contextmanager
def watcher_process(func):
redy = MP.Event()
retq = MP.Queue()
proc = MP.Process(target=func, args=(redy, retq))
proc.start()
redy.wait()
yield retq
proc.join()
# endregion
@pytest.mark.usefixtures("autostop_loop")
class TestMain:
def test_setuid(self, nobody_uid, mocker):
mock = mocker.patch("os.setuid")
main(args=())
mock.assert_called_with(nobody_uid)
def test_setuid_permission_error(self, nobody_uid, mocker, capsys):
mock = mocker.patch("os.setuid", side_effect=PermissionError)
with pytest.raises(SystemExit) as excinfo:
main(args=())
assert excinfo.value.code == 1
mock.assert_called_with(nobody_uid)
assert (
capsys.readouterr().err
== 'Cannot setuid "nobody"; try running with -n option.\n'
)
def test_setuid_no_pwd_module(self, nobody_uid, mocker, capsys):
mocker.patch("aiosmtpd.main.pwd", None)
with pytest.raises(SystemExit) as excinfo:
main(args=())
assert excinfo.value.code == 1
# On Python 3.8 on Linux, a bunch of "RuntimeWarning: coroutine
# 'AsyncMockMixin._execute_mock_call' was never awaited" messages
# gets mixed up into stderr causing test fail.
# Therefore, we use assertIn instead of assertEqual here, because
# the string DOES appear in stderr, just buried.
assert (
'Cannot import module "pwd"; try running with -n option.\n'
in capsys.readouterr().err
)
def test_n(self, setuid):
with pytest.raises(RuntimeError):
main_n()
def test_nosetuid(self, setuid):
with pytest.raises(RuntimeError):
main(("--nosetuid",))
def test_debug_0(self):
# For this test, the test runner likely has already set the log level
# so it may not be logging.ERROR.
default_level = MAIL_LOG.getEffectiveLevel()
main_n()
assert MAIL_LOG.getEffectiveLevel() == default_level
def test_debug_1(self):
main_n("-d")
assert MAIL_LOG.getEffectiveLevel() == logging.INFO
def test_debug_2(self):
main_n("-dd")
assert MAIL_LOG.getEffectiveLevel() == logging.DEBUG
def test_debug_3(self):
main_n("-ddd")
assert MAIL_LOG.getEffectiveLevel() == logging.DEBUG
assert asyncio.get_event_loop().get_debug()
class TestMainByWatcher:
def test_tls(self, temp_event_loop):
with watcher_process(watch_for_tls) as retq:
temp_event_loop.call_later(AUTOSTOP_DELAY, temp_event_loop.stop)
main_n("--tlscert", str(SERVER_CRT), "--tlskey", str(SERVER_KEY))
catchup_delay()
has_starttls = retq.get()
assert has_starttls is True
require_tls = retq.get()
assert require_tls is True
def test_tls_noreq(self, temp_event_loop):
with watcher_process(watch_for_tls) as retq:
temp_event_loop.call_later(AUTOSTOP_DELAY, temp_event_loop.stop)
main_n(
"--tlscert",
str(SERVER_CRT),
"--tlskey",
str(SERVER_KEY),
"--no-requiretls",
)
catchup_delay()
has_starttls = retq.get()
assert has_starttls is True
require_tls = retq.get()
assert require_tls is False
def test_smtps(self, temp_event_loop):
with watcher_process(watch_for_smtps) as retq:
temp_event_loop.call_later(AUTOSTOP_DELAY, temp_event_loop.stop)
main_n("--smtpscert", str(SERVER_CRT), "--smtpskey", str(SERVER_KEY))
catchup_delay()
has_smtps = retq.get()
assert has_smtps is True
class TestParseArgs:
def test_defaults(self):
parser, args = parseargs(tuple())
assert args.classargs == tuple()
assert args.classpath == "aiosmtpd.handlers.Debugging"
assert args.debug == 0
assert isinstance(args.handler, Debugging)
assert args.host == "localhost"
assert args.listen is None
assert args.port == 8025
assert args.setuid is True
assert args.size is None
assert args.smtputf8 is False
assert args.smtpscert is None
assert args.smtpskey is None
assert args.tlscert is None
assert args.tlskey is None
assert args.requiretls is True
def test_handler_from_cli(self):
parser, args = parseargs(
("-c", "aiosmtpd.tests.test_main.FromCliHandler", "--", "FOO")
)
assert isinstance(args.handler, FromCliHandler)
assert args.handler.called == "FOO"
def test_handler_no_from_cli(self):
parser, args = parseargs(("-c", "aiosmtpd.tests.test_main.NullHandler"))
assert isinstance(args.handler, NullHandler)
def test_handler_from_cli_exception(self):
with pytest.raises(TypeError):
parseargs(("-c", "aiosmtpd.tests.test_main.FromCliHandler", "FOO", "BAR"))
def test_handler_no_from_cli_exception(self, capsys):
with pytest.raises(SystemExit) as excinfo:
parseargs(("-c", "aiosmtpd.tests.test_main.NullHandler", "FOO", "BAR"))
assert excinfo.value.code == 2
assert (
"Handler class aiosmtpd.tests.test_main takes no arguments"
in capsys.readouterr().err
)
@pytest.mark.parametrize(
("args", "exp_host", "exp_port"),
[
((), "localhost", 8025),
(("-l", "foo:25"), "foo", 25),
(("--listen", "foo:25"), "foo", 25),
(("-l", "foo"), "foo", 8025),
(("-l", ":25"), "localhost", 25),
(("-l", "::0:25"), "::0", 25),
],
)
def test_host_port(self, args, exp_host, exp_port):
parser, args_ = parseargs(args=args)
assert args_.host == exp_host
assert args_.port == exp_port
def test_bad_port_number(self, capsys):
with pytest.raises(SystemExit) as excinfo:
parseargs(("-l", ":foo"))
assert excinfo.value.code == 2
assert "Invalid port number: foo" in capsys.readouterr().err
@pytest.mark.parametrize("opt", ["--version", "-v"])
def test_version(self, capsys, mocker, opt):
mocker.patch("aiosmtpd.main.PROGRAM", "smtpd")
with pytest.raises(SystemExit) as excinfo:
parseargs((opt,))
assert excinfo.value.code == 0
assert capsys.readouterr().out == f"smtpd {__version__}\n"
@pytest.mark.parametrize("args", [("--smtpscert", "x"), ("--smtpskey", "x")])
def test_smtps(self, capsys, mocker, args):
mocker.patch("aiosmtpd.main.PROGRAM", "smtpd")
with pytest.raises(SystemExit) as exc:
parseargs(args)
assert exc.value.code == 2
assert (
"--smtpscert and --smtpskey must be specified together"
in capsys.readouterr().err
)
@pytest.mark.parametrize("args", [("--tlscert", "x"), ("--tlskey", "x")])
def test_tls(self, capsys, mocker, args):
mocker.patch("aiosmtpd.main.PROGRAM", "smtpd")
with pytest.raises(SystemExit) as exc:
parseargs(args)
assert exc.value.code == 2
assert (
"--tlscert and --tlskey must be specified together"
in capsys.readouterr().err
)
def test_norequiretls(self, capsys, mocker):
mocker.patch("aiosmtpd.main.PROGRAM", "smtpd")
parser, args = parseargs(("--no-requiretls",))
assert args.requiretls is False
@pytest.mark.parametrize(
("certfile", "keyfile", "expect"),
[
("x", "x", "Cert file x not found"),
(SERVER_CRT, "x", "Key file x not found"),
("x", SERVER_KEY, "Cert file x not found"),
],
ids=["x-x", "cert-x", "x-key"],
)
@pytest.mark.parametrize("meth", ["smtps", "tls"])
def test_ssl_files_err(self, capsys, mocker, meth, certfile, keyfile, expect):
mocker.patch("aiosmtpd.main.PROGRAM", "smtpd")
with pytest.raises(SystemExit) as exc:
parseargs((f"--{meth}cert", certfile, f"--{meth}key", keyfile))
assert exc.value.code == 2
assert expect in capsys.readouterr().err
class TestSigint:
def test_keyboard_interrupt(self, temp_event_loop):
"""main() must close loop gracefully on KeyboardInterrupt."""
def interrupt():
raise KeyboardInterrupt
temp_event_loop.call_later(1.0, interrupt)
try:
main_n()
except Exception:
pytest.fail("main() should've closed cleanly without exceptions!")
else:
assert not temp_event_loop.is_running()
|