File: test_responder_nosni.py

package info (click to toggle)
postfix-mta-sts-resolver 1.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 536 kB
  • sloc: python: 3,069; sh: 226; makefile: 47
file content (57 lines) | stat: -rw-r--r-- 1,829 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import sys
import asyncio
import itertools
import socket

import pytest

from postfix_mta_sts_resolver import netstring
from postfix_mta_sts_resolver.responder import STSSocketmapResponder
import postfix_mta_sts_resolver.utils as utils

from testdata import load_testdata

@pytest.fixture(scope="module")
async def responder(event_loop):
    import postfix_mta_sts_resolver.utils as utils
    cfg = utils.populate_cfg_defaults({"default_zone": {"require_sni": False}})
    cfg["zones"]["test2"] = cfg["default_zone"]
    cfg["port"] = 28461
    cache = utils.create_cache(cfg['cache']['type'],
                               cfg['cache']['options'])
    await cache.setup()
    resp = STSSocketmapResponder(cfg, event_loop, cache)
    await resp.start()
    result = resp, cfg['host'], cfg['port']
    yield result
    await resp.stop()
    await cache.teardown()

buf_sizes = [4096, 128, 16, 1]
reqresps = list(load_testdata('refdata_nosni'))
@pytest.mark.parametrize("params", tuple(itertools.product(reqresps, buf_sizes)))
@pytest.mark.asyncio
@pytest.mark.timeout(5)
async def test_responder(responder, params):
    (request, response), bufsize = params
    resp, host, port = responder
    reader, writer = await asyncio.open_connection(host, port)
    stream_reader = netstring.StreamReader()
    string_reader = stream_reader.next_string()
    try:
        writer.write(netstring.encode(request))
        res = b''
        while True:
            try:
                part = string_reader.read()
            except netstring.WantRead:
                data = await reader.read(bufsize)
                assert data
                stream_reader.feed(data)
            else:
                if not part:
                    break
                res += part
        assert res == response
    finally:
        writer.close()