File: test_asdnotify.py

package info (click to toggle)
postfix-mta-sts-resolver 1.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 536 kB
  • sloc: python: 3,069; sh: 226; makefile: 47
file content (121 lines) | stat: -rw-r--r-- 3,665 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import contextlib
import socket
import asyncio
import os
import sys

import pytest

from postfix_mta_sts_resolver.asdnotify import AsyncSystemdNotifier

@contextlib.contextmanager
def set_env(**environ):
    """Temporarily add or override process environment variables.

    Every keyword argument is written into ``os.environ`` for the duration
    of the ``with`` body. On exit the environment is reset to the snapshot
    taken on entry, so anything the body itself added or changed is rolled
    back as well.

    Args:
        **environ: Variable names mapped to the string values to set.

    Yields:
        None.

    Raises:
        TypeError: If a value is not a string (raised by ``os.environ``).
    """
    old_environ = dict(os.environ)
    try:
        # Apply the overrides inside the ``try`` so that a failure part-way
        # through (e.g. a non-string value) still rolls back the variables
        # that were already written.
        os.environ.update(environ)
        yield
    finally:
        os.environ.clear()
        os.environ.update(old_environ)

class UnixDatagramReceiver:
    """Non-blocking AF_UNIX datagram socket that queues what it receives.

    The socket is registered as a reader on the supplied event loop; every
    datagram read by the callback is pushed onto an ``asyncio.Queue`` that
    ``recvmsg`` awaits.
    """

    def __init__(self, loop):
        """Create, bind and register the receiving socket.

        Args:
            loop: Event loop on which the reader callback is installed.
        """
        self._sock = sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        sock.setblocking(0)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # An empty address leaves the choice of name to the OS; the result is
        # read back right below. NOTE(review): on Linux this is presumably an
        # abstract-namespace autobind, which would explain why ``asciiname``
        # handles a leading NUL -- confirm.
        sock.bind('')
        self._name = sock.getsockname()
        self._incoming = asyncio.Queue()
        self._loop = loop
        loop.add_reader(sock.fileno(), self._read_handler)

    def _read_handler(self):
        """Move every datagram that is currently readable into the queue."""
        receive = self._sock.recv
        enqueue = self._incoming.put_nowait
        # The non-blocking socket raises BlockingIOError once it is drained,
        # which is the normal way out of the loop.
        with contextlib.suppress(BlockingIOError):
            while True:
                enqueue(receive(4096))

    async def recvmsg(self):
        """Wait for and return the next queued datagram."""
        message = await self._incoming.get()
        return message

    @property
    def name(self):
        """Socket name exactly as reported by ``getsockname()``."""
        return self._name

    @property
    def asciiname(self):
        """Socket name as ``str``, with a leading NUL rewritten to ``'@'``."""
        text = self.name
        if isinstance(text, bytes):
            text = text.decode('ascii')
        return '@' + text[1:] if text.startswith('\x00') else text

    def close(self):
        """Unregister the reader callback and close the socket."""
        sock = self._sock
        self._loop.remove_reader(sock.fileno())
        sock.close()
        self._sock = None

# Module-level pytest mark: skip every test in this file when running on Windows.
pytestmark = pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows")

@pytest.fixture(scope="module")
def unix_dgram_receiver(event_loop):
    """Provide a ``UnixDatagramReceiver`` registered on ``event_loop``.

    The receiver is closed when the fixture is torn down.
    """
    receiver = UnixDatagramReceiver(event_loop)
    yield receiver
    receiver.close()

@pytest.mark.timeout(5)
@pytest.mark.asyncio
async def test_message_sent(unix_dgram_receiver):
    """A single notification arrives unchanged at the NOTIFY_SOCKET receiver."""
    payload = b"READY=1"
    with set_env(NOTIFY_SOCKET=unix_dgram_receiver.asciiname):
        async with AsyncSystemdNotifier() as notifier:
            await notifier.notify(payload)
            received = await unix_dgram_receiver.recvmsg()
            assert received == payload

@pytest.mark.timeout(5)
@pytest.mark.asyncio
async def test_message_flow(unix_dgram_receiver):
    """1000 alternating notifications each arrive intact and in order.

    Every message is sent and then awaited on the receiver before the next
    one goes out. The Windows skip comes from the module-level
    ``pytestmark``, so no per-test ``skipif`` is repeated here.
    """
    sockname = unix_dgram_receiver.asciiname
    msgs = [b"READY=1", b"STOPPING=1"] * 500
    with set_env(NOTIFY_SOCKET=sockname):
        async with AsyncSystemdNotifier() as notifier:
            for msg in msgs:
                await notifier.notify(msg)
                assert await unix_dgram_receiver.recvmsg() == msg

@pytest.mark.timeout(5)
@pytest.mark.asyncio
async def test_not_started(monkeypatch):
    """Without NOTIFY_SOCKET in the environment the notifier is not started."""
    # Drop any NOTIFY_SOCKET inherited from the process running the suite
    # (e.g. pytest launched from a systemd unit) so the outcome does not
    # depend on the ambient environment; monkeypatch restores it afterwards.
    monkeypatch.delenv("NOTIFY_SOCKET", raising=False)
    async with AsyncSystemdNotifier() as notifier:
        assert not notifier.started

@pytest.mark.timeout(5)
@pytest.mark.asyncio
async def test_started(unix_dgram_receiver):
    """With NOTIFY_SOCKET naming the receiver, the notifier reports started."""
    sockname = unix_dgram_receiver.asciiname
    with set_env(NOTIFY_SOCKET=sockname):
        async with AsyncSystemdNotifier() as sd_notifier:
            is_started = sd_notifier.started
            assert is_started

@pytest.mark.timeout(5)
@pytest.mark.asyncio
async def test_send_never_fails():
    """``notify`` must not raise when NOTIFY_SOCKET is set to ``'abc'``.

    The test passes as long as no exception propagates out of the block.
    """
    payload = b'!!!'
    with set_env(NOTIFY_SOCKET='abc'):
        async with AsyncSystemdNotifier() as sd_notifier:
            await sd_notifier.notify(payload)


@pytest.mark.timeout(5)
@pytest.mark.asyncio
async def test_socket_create_failure(monkeypatch):
    """The notifier must not raise when ``socket.socket`` raises ``OSError``.

    ``socket.socket`` is replaced for the duration of the test by a class
    whose constructor always raises; the test passes as long as entering the
    notifier and calling ``notify`` do not propagate an exception.
    """
    class RaisingSocket:
        """Stand-in for ``socket.socket`` that cannot be instantiated."""

        def __init__(self, *args, **kwargs):
            raise OSError()

    monkeypatch.setattr(socket, "socket", RaisingSocket)
    payload = b'!!!'
    with set_env(NOTIFY_SOCKET='abc'):
        async with AsyncSystemdNotifier() as sd_notifier:
            await sd_notifier.notify(payload)