1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
|
import sys
import asyncio
import itertools
import socket
import pytest
from postfix_mta_sts_resolver import netstring
from postfix_mta_sts_resolver.responder import STSSocketmapResponder
import postfix_mta_sts_resolver.utils as utils
@pytest.fixture
async def responder(event_loop):
    """Run an ``STSSocketmapResponder`` for the duration of one test.

    Starts from ``utils.populate_cfg_defaults(None)`` and then overrides the
    listening port, ``shutdown_timeout`` and ``cache_grace``, and registers an
    additional zone ``test2`` that reuses the default zone settings.

    Args:
        event_loop: event loop handed to the responder (supplied by the
            test framework's ``event_loop`` fixture).

    Yields:
        tuple: ``(responder, host, port)`` for the started responder.
    """
    cfg = utils.populate_cfg_defaults(None)
    cfg["port"] = 38461
    cfg["shutdown_timeout"] = 1
    cfg["cache_grace"] = 0
    cfg["zones"]["test2"] = cfg["default_zone"]
    cache = utils.create_cache(cfg['cache']['type'],
                               cfg['cache']['options'])
    await cache.setup()
    try:
        resp = STSSocketmapResponder(cfg, event_loop, cache)
        await resp.start()
        try:
            yield resp, cfg['host'], cfg['port']
        finally:
            # Some tests stop the responder themselves; it is stopped here
            # again unconditionally, exactly as before.
            await resp.stop()
    finally:
        # Release the cache even if the responder failed to start or stop.
        await cache.teardown()
@pytest.mark.asyncio
@pytest.mark.timeout(5)
async def test_hanging_stop(responder):
    """Stop the responder while a client is connected but has sent nothing.

    After ``resp.stop()`` the client must observe end-of-stream without
    receiving any data.
    """
    resp, host, port = responder
    reader, writer = await asyncio.open_connection(host, port)
    try:
        await resp.stop()
        # read() without a size returns only at EOF; b'' therefore means the
        # server closed the connection without sending a single byte.
        assert await reader.read() == b''
    finally:
        # Close the client socket even when stop() or the assertion fails.
        writer.close()
@pytest.mark.asyncio
@pytest.mark.timeout(5)
async def test_inprogress_stop(responder):
    """Stop the responder shortly after a single request has been sent.

    One ``test blackhole.loc`` request is written, the test waits 0.2 s and
    then stops the responder; the client must see end-of-stream with no
    reply data.
    """
    resp, host, port = responder
    reader, writer = await asyncio.open_connection(host, port)
    try:
        writer.write(netstring.encode(b'test blackhole.loc'))
        await writer.drain()
        # NOTE(review): presumably the pause lets the server pick up the
        # request so that stop() interrupts it mid-flight -- confirm.
        await asyncio.sleep(0.2)
        await resp.stop()
        # read() without a size returns only at EOF; b'' means no reply was
        # delivered before the connection was closed.
        assert await reader.read() == b''
    finally:
        # Close the client socket even when a step above fails.
        writer.close()
@pytest.mark.asyncio
@pytest.mark.timeout(5)
async def test_extended_stop(responder):
    """Stop the responder shortly after three requests have been sent.

    Three identical ``test blackhole.loc`` requests are written back to back,
    the test waits 0.2 s and then stops the responder; the client must see
    end-of-stream with no reply data.
    """
    resp, host, port = responder
    reader, writer = await asyncio.open_connection(host, port)
    try:
        writer.write(netstring.encode(b'test blackhole.loc'))
        writer.write(netstring.encode(b'test blackhole.loc'))
        writer.write(netstring.encode(b'test blackhole.loc'))
        await writer.drain()
        # NOTE(review): presumably the pause lets the server pick up the
        # queued requests before stop() is issued -- confirm.
        await asyncio.sleep(0.2)
        await resp.stop()
        # read() without a size returns only at EOF; b'' means no reply was
        # delivered before the connection was closed.
        assert await reader.read() == b''
    finally:
        # Close the client socket even when a step above fails.
        writer.close()
@pytest.mark.asyncio
@pytest.mark.timeout(7)
async def test_grace_expired(responder):
    """Query ``good.loc`` twice, two seconds apart, and expect equal replies.

    NOTE(review): the ``responder`` fixture sets ``cfg["cache_grace"] = 0``;
    presumably the 2 s pause is there so the second query hits an entry whose
    grace period has already elapsed -- confirm against the responder code.
    """
    _, host, port = responder
    sock_reader, sock_writer = await asyncio.open_connection(host, port)
    decoder = netstring.StreamReader()

    async def read_reply():
        """Collect one complete netstring from the connection."""
        pieces = []
        current = decoder.next_string()
        while True:
            try:
                piece = current.read()
            except netstring.WantRead:
                # The decoder needs more bytes: fetch them from the socket.
                incoming = await sock_reader.read(4096)
                assert incoming
                decoder.feed(incoming)
                continue
            if not piece:
                # An empty chunk ends the loop; hand back what was gathered.
                return b''.join(pieces)
            pieces.append(piece)

    def send_query():
        sock_writer.write(netstring.encode(b'test good.loc'))

    try:
        send_query()
        first = await read_reply()
        await asyncio.sleep(2)
        send_query()
        second = await read_reply()
        assert first == second
    finally:
        sock_writer.close()
@pytest.mark.asyncio
@pytest.mark.timeout(7)
async def test_fast_expire(responder):
    """Query ``fast-expire.loc`` twice, two seconds apart.

    Both replies must be equal to each other and to the expected secure
    policy answer.

    NOTE(review): the domain name suggests a policy with a very short
    lifetime, so the second query would be served after expiry -- confirm
    against the test environment's policy data.
    """
    _, host, port = responder
    conn_reader, conn_writer = await asyncio.open_connection(host, port)
    parser = netstring.StreamReader()

    async def next_reply():
        """Collect one complete netstring from the connection."""
        collected = []
        pending = parser.next_string()
        while True:
            try:
                fragment = pending.read()
            except netstring.WantRead:
                # The parser needs more bytes: fetch them from the socket.
                chunk = await conn_reader.read(4096)
                assert chunk
                parser.feed(chunk)
                continue
            if not fragment:
                # An empty fragment ends the loop; return what was gathered.
                return b''.join(collected)
            collected.append(fragment)

    def ask():
        conn_writer.write(netstring.encode(b'test fast-expire.loc'))

    try:
        ask()
        reply_one = await next_reply()
        await asyncio.sleep(2)
        ask()
        reply_two = await next_reply()
        assert reply_one == reply_two == b'OK secure match=mail.loc servername=hostname'
    finally:
        conn_writer.close()
|