File: conftest.py

package info (click to toggle)
poolcounter 1.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 244 kB
  • sloc: ansic: 1,065; python: 458; makefile: 33
file content (141 lines) | stat: -rw-r--r-- 4,005 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import os
import pytest
import socket
import subprocess
import time

# Give every socket created after this point a 5 second default timeout,
# presumably so a stuck connect/recv fails the test instead of hanging it.
socket.setdefaulttimeout(5)


class ClientPool:
    """Hands out Client instances and tears them down.

    Local ports returned by closed clients are remembered in ``local_ports``
    and handed to the next clients that are requested.
    """

    def __init__(self):
        # Local ports reported by previously closed clients.
        self.local_ports = set()
        # Clients handed out since the last teardown.
        self.clients = set()

    def get(self, count):
        """Create ``count`` clients named "1".."count".

        Each client gets a recycled local port when one is available and
        0 (let the OS choose) otherwise.

        Returns a single Client when ``count == 1``, otherwise a list of
        clients (empty for ``count <= 0``).
        """
        ret = []
        for i in range(1, count + 1):
            local_port = self.local_ports.pop() if self.local_ports else 0
            client = Client(str(i), local_port)
            ret.append(client)
            self.clients.add(client)

        return ret[0] if count == 1 else ret

    def teardown(self):
        """Close every handed-out client and recycle their local ports.

        All clients are closed and forgotten even when closing one of them
        fails; the first OSError encountered is re-raised afterwards so the
        failure is still reported.
        """
        # Snapshot and clear first: a failing close() must not leave stale
        # clients behind to break every later teardown as well.
        pending = list(self.clients)
        self.clients.clear()

        first_error = None
        for client in pending:
            try:
                port = client.close()
            except OSError as err:
                if first_error is None:
                    first_error = err
                continue
            if port:
                # Recycle!
                self.local_ports.add(port)

        if first_error is not None:
            raise first_error


class Client:
    """Wrapper around a TCP socket talking to poolcounterd.

    The connection is opened lazily on first access to ``socket`` and is
    bound to ``local_port`` (0 lets the OS pick one).
    """
    host = '127.0.0.1'
    port = 7531
    # Number of connection attempts and the pause (seconds) between them.
    connect_tries = 5
    retry_delay = 2

    def __init__(self, name, local_port=0):
        self.name = name
        self.local_port = local_port
        self._socket = None

    @property
    def socket(self):
        """The connected socket, connecting first if necessary.

        Raises OSError (e.g. ConnectionRefusedError) when every connection
        attempt failed; in that case no socket is kept, so a later access
        starts over.
        """
        if self._socket is None:
            self._socket = self._connect()
        return self._socket

    def _connect(self):
        """Open, bind and connect a new socket, retrying on failure."""
        # TODO: is the getaddrinfo indirection necessary?
        remote = socket.getaddrinfo(self.host, self.port)[0][4]
        local = socket.getaddrinfo(self.host, self.local_port)[0][4]

        tries = self.connect_tries
        while True:
            sock = socket.socket()
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                # We try and reuse the local port if we can. The only
                # way to do that is to bind on 0, copy the ephemeral
                # port we're given, and then rebind on that the next
                # time we open the socket.
                sock.bind(local)
                sock.connect(remote)
            except OSError:
                # Don't leak the half-set-up socket from a failed attempt.
                sock.close()
                tries -= 1
                if tries <= 0:
                    raise
                # Wait for poolcounter to start?
                time.sleep(self.retry_delay)
            else:
                return sock

    def send(self, command):
        """Send ``command`` followed by a newline."""
        print('%s -> ' % self.name + command)
        # sendall: plain send() may transmit only part of the buffer.
        self.socket.sendall(command.encode() + b'\n')

    def receive(self):
        """Read up to 4096 bytes and return them stripped and decoded."""
        recv = self.socket.recv(4096).strip().decode()
        print('%s <- ' % self.name + recv)
        return recv

    def close(self):
        """Close the socket (if any) and return the local port it used.

        When no socket is open, the previously known ``local_port`` is
        returned unchanged.
        """
        sock = self._socket
        if sock is not None:
            self._socket = None
            try:
                # Save the local_port for next time
                self.local_port = sock.getsockname()[1]
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                # The socket was never connected or the peer is already
                # gone: there is nothing to shut down, just release it.
                pass
            finally:
                sock.close()
        return self.local_port


def find_poolcounterd():
    """Return the path of the poolcounterd binary to test against.

    The POOLCOUNTERD environment variable wins when set (handy for testing
    an installed version); otherwise the binary is expected one directory
    above the one containing this file. The current test is skipped when
    the chosen path does not exist.
    """
    parent_dir = os.path.dirname(os.path.dirname(__file__))
    bundled = os.path.join(parent_dir, 'poolcounterd')
    binary = os.environ.get('POOLCOUNTERD', bundled)
    if os.path.exists(binary):
        return binary
    pytest.skip('%s does not exist' % binary)
    return binary


@pytest.fixture(scope='session')
def poolcounter():
    """Run poolcounterd for the whole test session.

    Yields the ``subprocess.Popen`` handle of the daemon. On teardown the
    daemon is terminated and reaped; if it has not exited within 5 seconds
    it is killed.
    """
    path = find_poolcounterd()
    daemon = subprocess.Popen([path])
    try:
        yield daemon
    finally:
        daemon.terminate()
        try:
            # Reap the child so it does not linger as a zombie.
            daemon.wait(timeout=5)
        except subprocess.TimeoutExpired:
            daemon.kill()
            daemon.wait()


@pytest.fixture()
def poolcounter_path():
    """Provide the path to the poolcounterd binary (skips if missing)."""
    binary_path = find_poolcounterd()
    return binary_path


# Use a singleton ClientPool to keep local_ports global: the same instance is
# handed to every test, so ports recycled by one teardown can be reused later.
pool = ClientPool()


@pytest.fixture()
def clients():
    """Hand the shared ClientPool to a test and tear it down afterwards."""
    shared_pool = pool
    yield shared_pool
    # Runs after the test: close its clients and recycle their ports.
    shared_pool.teardown()


def pytest_generate_tests(metafunc):
    """Run tests that request ``lock_type`` once per lock command."""
    if 'lock_type' not in metafunc.fixturenames:
        return
    lock_commands = ['ACQ4ME', 'ACQ4ANY']
    metafunc.parametrize('lock_type', lock_commands)