File: conftest.py

package info (click to toggle)
python-consul 0.7.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 47,052 kB
  • sloc: python: 3,081; makefile: 147
file content (142 lines) | stat: -rw-r--r-- 4,017 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import collections
import subprocess
import platform
import sys
import tempfile
import socket
import shlex
import uuid
import time
import json
import os

import requests
import pytest
import py

collect_ignore = []

if sys.version_info[0] < 3:
    p = os.path.join(os.path.dirname(__file__), 'test_aio.py')
    collect_ignore.append(p)

'''
if sys.version_info[0] == 2 and sys.version_info[1] < 7:
    p = os.path.join(os.path.dirname(__file__), 'test_twisted.py')
    collect_ignore.append(p)
    p = os.path.join(os.path.dirname(__file__), 'test_tornado.py')
    collect_ignore.append(p)
else:
    pytest_plugins = "pytest_twisted"
'''


def get_free_ports(num, host=None):
    if not host:
        host = '127.0.0.1'
    sockets = []
    ret = []
    for i in range(num):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind((host, 0))
        ret.append(s.getsockname()[1])
        sockets.append(s)
    for s in sockets:
        s.close()
    return ret


def start_consul_instance(acl_master_token=None):
    """
    starts a consul instance. if acl_master_token is None, acl will be disabled
    for this server, otherwise it will be enabled and the master token will be
    set to the supplied token

    returns: a tuple of the instances process object and the http port the
             instance is listening on
    """
    ports = dict(zip(
        ['http', 'rpc', 'serf_lan', 'serf_wan', 'server', 'dns'],
        get_free_ports(5) + [-1]))

    config = {'ports': ports, 'performance': {'raft_multiplier': 1}}
    if acl_master_token:
        config['acl_datacenter'] = 'dc1'
        config['acl_master_token'] = acl_master_token

    tmpdir = py.path.local(tempfile.mkdtemp())
    tmpdir.join('config.json').write(json.dumps(config))
    tmpdir.chdir()

    (system, node, release, version, machine, processor) = platform.uname()
    if system == 'Darwin':
        postfix = 'osx'
    else:
        postfix = 'linux64'
    bin = os.path.join(os.path.dirname(__file__), 'consul.'+postfix)
    command = '{bin} agent -server -bootstrap' \
              ' -bind=127.0.0.1' \
              ' -config-dir=. -data-dir=./data'
    command = command.format(bin=bin).strip()
    command = shlex.split(command)

    p = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # wait for consul instance to bootstrap
    base_uri = 'http://127.0.0.1:%s/v1/' % ports['http']

    while True:
        time.sleep(0.1)
        try:
            response = requests.get(base_uri + 'status/leader')
        except requests.ConnectionError:
            continue
        if response.text.strip() != '""':
            break

    requests.put(base_uri + 'agent/service/register', data='{"name": "foo"}')

    while True:
        response = requests.get(base_uri + 'health/service/foo')
        if response.text.strip() != '[]':
            break
        time.sleep(0.1)

    requests.get(base_uri + 'agent/service/deregister/foo')
    # phew
    time.sleep(2)
    return p, ports['http']


@pytest.yield_fixture(scope="session")
def consul_instance():
    p, port = start_consul_instance()
    yield port
    p.terminate()


@pytest.yield_fixture
def consul_port(consul_instance):
    yield consul_instance
    # remove all data from the instance, to have a clean start
    base_uri = 'http://127.0.0.1:%s/v1/' % consul_instance
    requests.delete(base_uri + 'kv/?recurse=1')


@pytest.yield_fixture(scope="session")
def acl_consul_instance():
    acl_master_token = uuid.uuid4().hex
    p, port = start_consul_instance(acl_master_token=acl_master_token)
    yield port, acl_master_token
    p.terminate()


@pytest.yield_fixture
def acl_consul(acl_consul_instance):
    ACLConsul = collections.namedtuple('ACLConsul', ['port', 'token'])
    port, token = acl_consul_instance
    yield ACLConsul(port, token)
    # remove all data from the instance, to have a clean start
    base_uri = 'http://127.0.0.1:%s/v1/' % port
    requests.delete(base_uri + 'kv/?recurse=1')