File: base.py

package info (click to toggle)
custodia 0.6.0-5.2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 880 kB
  • sloc: python: 6,074; makefile: 317; sh: 62
file content (128 lines) | stat: -rw-r--r-- 4,025 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Copyright (C) 2017  Custodia Project Contributors - see LICENSE file
from __future__ import absolute_import

import os
import shutil
import socket
import subprocess
import sys
import time
from string import Template

import pytest

from custodia.client import CustodiaHTTPClient
from custodia.compat import url_escape
from custodia.server.args import parse_args
from custodia.server.config import parse_config


@pytest.mark.servertest
class CustodiaServerRunner(object):
    """Base class for functional tests that need a running Custodia server.

    The class-level hooks create and remove ``test_dir``.  The fixtures
    render a configuration file from a template, start
    ``python -m custodia.server`` as a child process, wait until its Unix
    socket accepts connections and hand a ``CustodiaHTTPClient`` to the
    tests.
    """

    request_headers = {'REMOTE_USER': 'me'}
    test_dir = 'tests/functional/tmp'
    custodia_client = None
    env = None
    process = None
    args = None
    config = None
    custodia_conf = None
    # Counter stored on CustodiaServerRunner itself; see get_unique_number().
    unique_number = 0

    @classmethod
    def setup_class(cls):
        """Start every test class with a fresh, empty ``test_dir``."""
        if os.path.isdir(cls.test_dir):
            shutil.rmtree(cls.test_dir)
        os.makedirs(cls.test_dir)

    @classmethod
    def teardown_class(cls):
        """Remove ``test_dir`` and everything created inside it."""
        shutil.rmtree(cls.test_dir)

    def _wait_pid(self, process, wait):
        """Wait up to *wait* seconds for *process* to exit.

        The child is polled through the ``Popen`` object instead of a raw
        ``os.waitpid()`` call, so the exit status is recorded on *process*
        and calling this again for an already reaped child does not raise.

        :param process: the ``subprocess.Popen`` object to watch
        :param wait: maximum time to wait, in seconds
        :returns: True if the process has exited, False on timeout
        """
        timeout = time.time() + wait
        while time.time() < timeout:
            if process.poll() is not None:
                return True
            time.sleep(0.1)
        # The child may have exited during the last sleep.
        return process.poll() is not None

    def _wait_socket(self, process, wait):
        """Wait until the socket in ``self.env['CUSTODIA_SOCKET']`` accepts
        a connection.

        :param process: the ``subprocess.Popen`` object of the server
        :param wait: maximum time to wait, in seconds
        :returns: True once a connection attempt succeeded
        :raises AssertionError: if *process* exits while waiting
        :raises OSError: if no connection succeeded within *wait* seconds
        """
        timeout = time.time() + wait
        while time.time() < timeout:
            if process.poll() is not None:
                raise AssertionError(
                    "Premature termination of Custodia server")
            try:
                s = socket.socket(family=socket.AF_UNIX)
                try:
                    s.connect(self.env['CUSTODIA_SOCKET'])
                finally:
                    # The socket is only a probe; never leak it, whether
                    # the connection attempt worked or not.
                    s.close()
            except OSError:
                # Connection attempt failed; try again until the deadline.
                pass
            else:
                return True
            time.sleep(0.1)
        raise OSError('Timeout error')

    def get_unique_number(self):
        """Increment the counter on ``CustodiaServerRunner`` and return it."""
        CustodiaServerRunner.unique_number = self.unique_number + 1
        return CustodiaServerRunner.unique_number

    @pytest.fixture(scope="class")
    def simple_configuration(self):
        """Write ``custodia.conf`` into ``test_dir`` and parse it.

        The template ``tests/functional/conf/template_simple.conf`` is
        rendered with ``TEST_DIR`` set to ``test_dir``.  Afterwards
        ``self.args`` and ``self.config`` hold the parsed arguments and
        configuration, and ``self.env`` is a copy of ``os.environ`` with
        ``CUSTODIA_SOCKET`` set to the configured ``server_socket``.
        """
        with open('tests/functional/conf/template_simple.conf') as f:
            configstr = f.read()

        self.custodia_conf = os.path.join(self.test_dir, 'custodia.conf')
        with open(self.custodia_conf, 'w+') as conffile:
            t = Template(configstr)
            conf = t.substitute({'TEST_DIR': self.test_dir})
            conffile.write(conf)

        self.args = parse_args([self.custodia_conf])
        _, self.config = parse_config(self.args)
        self.env = os.environ.copy()
        self.env['CUSTODIA_SOCKET'] = self.config['server_socket']

    @pytest.fixture(scope="session")
    def dev_null(self, request):
        """Return a read/write file descriptor for ``os.devnull``.

        The descriptor is closed by a finalizer registered on *request*.
        """
        fd = os.open(os.devnull, os.O_RDWR)

        def close_dev_null():
            os.close(fd)

        request.addfinalizer(close_dev_null)
        return fd

    @pytest.fixture(scope="class")
    def custodia_server(self, simple_configuration, request, dev_null):
        """Start the Custodia server and return a client connected to it.

        :returns: a ``CustodiaHTTPClient`` for ``custodia.sock`` in
            ``test_dir``
        :raises AssertionError: if the server exits during start-up
        :raises OSError: if the server socket does not come up in time
        """
        # Don't write server messages to stdout unless we are in debug mode.
        # The options are read from request.config because the global
        # pytest.config object is not available in current pytest releases.
        pytest_config = request.config
        if pytest_config.getoption('debug') or \
                pytest_config.getoption('verbose'):
            stdout = stderr = None
        else:
            stdout = stderr = dev_null

        self.process = subprocess.Popen(
            [sys.executable, '-m', 'custodia.server', self.custodia_conf],
            stdout=stdout, stderr=stderr
        )

        def fin():
            # Nothing to stop if the server has already exited.
            if self.process.poll() is not None:
                return
            self.process.terminate()
            if not self._wait_pid(self.process, 2):
                self.process.kill()
                if not self._wait_pid(self.process, 2):
                    raise AssertionError("Hard kill failed")

        # Register the cleanup before waiting for the server, so that a
        # failed or timed-out start-up does not leave the process running.
        request.addfinalizer(fin)

        # Returns early if the server exits within two seconds; in that
        # case _wait_socket() reports the premature termination.
        self._wait_pid(self.process, 2)
        self._wait_socket(self.process, 5)

        arg = '{}/custodia.sock'.format(CustodiaServerRunner.test_dir)
        url = 'http+unix://{}'.format(url_escape(arg, ''))
        self.custodia_client = CustodiaHTTPClient(url)

        return self.custodia_client