File: test_responder_expiration.py

package info (click to toggle)
postfix-mta-sts-resolver 1.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 536 kB
  • sloc: python: 3,069; sh: 226; makefile: 47
file content (77 lines) | stat: -rw-r--r-- 2,417 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import asyncio
import tempfile
import os
import contextlib

import pytest

from postfix_mta_sts_resolver import netstring
from postfix_mta_sts_resolver.responder import STSSocketmapResponder
import postfix_mta_sts_resolver.utils as utils
import postfix_mta_sts_resolver.base_cache as base_cache

@contextlib.contextmanager
def set_env(**environ):
    """Temporarily apply ``environ`` on top of the process environment.

    A snapshot of ``os.environ`` is taken before the overrides are applied.
    When the ``with`` body finishes, whether normally or by raising, the
    environment is wiped and rebuilt from that snapshot, so variables added
    or changed inside the body do not leak out.
    """
    snapshot = os.environ.copy()
    os.environ.update(environ)
    try:
        yield
    finally:
        # Clear first so variables introduced inside the body are dropped too.
        os.environ.clear()
        os.environ.update(snapshot)

@pytest.mark.asyncio
@pytest.mark.timeout(10)
async def test_responder_expiration(event_loop):
    """Expiration test: a pre-seeded cache entry must yield ``NOTFOUND``.

    A policy with ``max_age`` of 1 is stored for ``no-record.loc`` in a
    temporary SQLite cache, the responder is started with ``cache_grace`` of
    0, and a query for that domain is expected to return ``b'NOTFOUND '``.

    Cleanup is layered: the cache is torn down whenever its setup succeeded,
    and the responder is stopped whenever it was started, even if an earlier
    step or the other cleanup step fails.
    """
    async def query(host, port, domain):
        """Send ``test <domain>`` as a netstring and return the decoded reply.

        Reads from the socket in chunks of up to 4096 bytes, feeding them to
        the netstring stream reader until one complete string is decoded.
        """
        reader, writer = await asyncio.open_connection(host, port)
        stream_reader = netstring.StreamReader()
        string_reader = stream_reader.next_string()
        writer.write(netstring.encode(b'test ' + domain.encode('ascii')))
        try:
            res = b''
            while True:
                try:
                    part = string_reader.read()
                except netstring.WantRead:
                    # The parser needs more bytes; an empty read means the
                    # peer closed the connection before the reply completed.
                    data = await reader.read(4096)
                    assert data
                    stream_reader.feed(data)
                else:
                    # An empty part marks the end of the current string.
                    if not part:
                        break
                    res += part
            return res
        finally:
            writer.close()

    with tempfile.NamedTemporaryFile() as cachedb:
        cfg = {}
        cfg["port"] = 18461
        cfg["cache_grace"] = 0
        cfg["shutdown_timeout"] = 1
        cfg["cache"] = {
            "type": "sqlite",
            "options": {
                "filename": cachedb.name,
            },
        }
        # Fill in everything not set above (e.g. cfg['host'] used below).
        cfg = utils.populate_cfg_defaults(cfg)
        cache = utils.create_cache(cfg['cache']['type'],
                                   cfg['cache']['options'])
        await cache.setup()
        try:
            pol_body = {
                "version": "STSv1",
                "mode": "enforce",
                "mx": [ "mail.loc" ],
                "max_age": 1,
            }
            # NOTE(review): the first CacheEntry argument is presumably the
            # fetch timestamp (0), which together with max_age=1 would make
            # the entry long expired -- confirm against base_cache.
            await cache.set("no-record.loc",
                            base_cache.CacheEntry(0, "0", pol_body))

            resp = STSSocketmapResponder(cfg, event_loop, cache)
            await resp.start()
            try:
                result = await query(cfg['host'], cfg['port'], 'no-record.loc')
                assert result == b'NOTFOUND '
            finally:
                await resp.stop()
        finally:
            # Runs even if seeding the cache, starting the responder or
            # stopping it raised, so the cache backend is never left open.
            await cache.teardown()