File: influxdb_instance.py

package info (click to toggle)
influxdb-python 3.0.0-1~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 456 kB
  • sloc: python: 5,140; makefile: 7
file content (194 lines) | stat: -rw-r--r-- 6,839 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import datetime
import os
import tempfile
import distutils
import time
import shutil
import subprocess
import sys
import unittest

from influxdb.tests.misc import is_port_open, get_free_ports

# Backport: some interpreters (e.g. python 2.6) ship a subprocess module
# without check_output, so install a minimal equivalent on the module.
if not hasattr(subprocess, "check_output"):
    def f(*popenargs, **kwargs):
        """Run a command and return what it wrote to stdout.

        Arguments are forwarded to ``subprocess.Popen``; ``stdout`` may not
        be supplied because it is always redirected to a pipe.

        Raises ValueError if ``stdout`` is passed, and
        subprocess.CalledProcessError if the command exits with a
        non-zero return code.
        """
        if 'stdout' in kwargs:
            raise ValueError(
                'stdout argument not allowed, it will be overridden.'
            )
        child = subprocess.Popen(*popenargs,
                                 stdout=subprocess.PIPE,
                                 **kwargs)
        captured = child.communicate()[0]
        status = child.poll()
        if not status:
            return captured
        # Report the command the same way it was handed to Popen.
        command = kwargs.get("args")
        if command is None:
            command = popenargs[0]
        raise subprocess.CalledProcessError(status, command)
    subprocess.check_output = f


class InfluxDbInstance(object):
    """Launch a fresh influxdb server instance in a temporary place.

    The server configuration is rendered from a config file template, and
    all files of the instance (config, logs, databases) live under a
    temporary directory created for it.
    """

    def __init__(self,
                 conf_template,
                 udp_enabled=False):
        """Locate the influxd binary and start a server instance.

        :param conf_template: path of the config file template; it is
            rendered with ``str.format`` using the generated directories
            and ports.
        :param udp_enabled: whether the UDP service gets a real port.
        :raises unittest.SkipTest: if the environment variable
            INFLUXDB_PYTHON_SKIP_SERVER_TESTS equals 'True', or if no
            influxd binary can be found.
        :raises RuntimeError: if the server failed to start three times.
        """
        if os.environ.get("INFLUXDB_PYTHON_SKIP_SERVER_TESTS", None) == 'True':
            raise unittest.SkipTest(
                "Skipping server test (INFLUXDB_PYTHON_SKIP_SERVER_TESTS)"
            )

        self.influxd_path = self.find_influxd_path()

        errors = 0
        while True:
            try:
                self._start_server(conf_template, udp_enabled)
                break
            # Happens when the ports are already in use.
            except RuntimeError:
                errors += 1
                if errors > 2:
                    # bare raise keeps the original traceback intact
                    raise

    def _start_server(self, conf_template, udp_enabled):
        """Render the config, spawn influxd and wait until it listens.

        :raises RuntimeError: if the process exits prematurely or does not
            listen on its http and admin ports within 10 seconds. The
            message carries the return code, stdout, stderr and logs; the
            temporary files of the failed attempt are removed.
        """

        # create a temporary dir to store all needed files
        # for the influxdb server instance :
        self.temp_dir_base = tempfile.mkdtemp()

        # "temp_dir_base" will be used for conf file and logs,
        # while "temp_dir_influxdb" is for the databases files/dirs :
        tempdir = self.temp_dir_influxdb = tempfile.mkdtemp(
            dir=self.temp_dir_base)

        # find a couple free ports :
        free_ports = get_free_ports(4)
        ports = {}
        for service in 'http', 'admin', 'meta', 'udp':
            ports[service + '_port'] = free_ports.pop()
        if not udp_enabled:
            ports['udp_port'] = -1

        conf_data = dict(
            meta_dir=os.path.join(tempdir, 'meta'),
            data_dir=os.path.join(tempdir, 'data'),
            wal_dir=os.path.join(tempdir, 'wal'),
            cluster_dir=os.path.join(tempdir, 'state'),
            handoff_dir=os.path.join(tempdir, 'handoff'),
            logs_file=os.path.join(self.temp_dir_base, 'logs.txt'),
            udp_enabled='true' if udp_enabled else 'false',
        )
        conf_data.update(ports)
        # expose every config value (dirs, logs_file, *_port, ...) as an
        # attribute of the instance:
        self.__dict__.update(conf_data)

        conf_file = os.path.join(self.temp_dir_base, 'influxdb.conf')
        with open(conf_file, "w") as fh:
            with open(conf_template) as fh_template:
                fh.write(fh_template.read().format(**conf_data))

        # now start the server instance:
        self.proc = subprocess.Popen(
            [self.influxd_path, '-config', conf_file],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        print(
            "%s > Started influxdb bin in %r with ports %s and %s.." % (
                datetime.datetime.now(),
                self.temp_dir_base,
                self.admin_port,
                self.http_port
            )
        )

        # wait for it to listen on the broker and admin ports:
        # usually a fresh instance is ready in less than 1 sec ..
        timeout = time.time() + 10  # so 10 secs should be enough,
        # otherwise either your system load is high,
        # or you run a 286 @ 1Mhz ?
        try:
            while time.time() < timeout:
                if (is_port_open(self.http_port) and
                        is_port_open(self.admin_port)):
                    # it's hard to check if a UDP port is open..
                    if udp_enabled:
                        # so let's just sleep 0.5 sec in this case
                        # to be sure that the server has open the port
                        time.sleep(0.5)
                    break
                time.sleep(0.5)
                if self.proc.poll() is not None:
                    raise RuntimeError('influxdb prematurely exited')
            else:
                # the loop ran out of time without seeing both ports open
                self.proc.terminate()
                self.proc.wait()
                raise RuntimeError('Timeout waiting for influxdb to listen'
                                   ' on its ports (%s)' % ports)
        except RuntimeError as err:
            # collect the diagnostics first: they read the pipes and the
            # log file that are released just below.
            data = self.get_logs_and_output()
            data['reason'] = str(err)
            data['now'] = datetime.datetime.now()
            # the process is gone at this point; without this every failed
            # attempt would leave a temporary directory and two open pipes
            # behind.
            self._discard_failed_start()
            raise RuntimeError("%(now)s > %(reason)s. RC=%(rc)s\n"
                               "stdout=%(out)s\nstderr=%(err)s\nlogs=%(logs)r"
                               % data)

    def _discard_failed_start(self):
        """Close the pipes and remove the temp dir of a failed start."""
        for pipe in (self.proc.stdout, self.proc.stderr):
            if pipe is not None:
                pipe.close()
        shutil.rmtree(self.temp_dir_base, ignore_errors=True)

    @staticmethod
    def _which(name):
        """Return the path of executable `name` found on PATH, or None."""
        finder = getattr(shutil, 'which', None)
        if finder is None:
            # Older interpreters have no shutil.which; the submodule has to
            # be imported explicitly, "import distutils" does not load it.
            from distutils.spawn import find_executable as finder
        return finder(name)

    def find_influxd_path(self):
        """Return the path of the influxd binary to run.

        The environment variable INFLUXDB_PYTHON_INFLUXD_PATH wins if set;
        otherwise PATH is searched, then the ``which`` command is tried,
        and finally '/opt/influxdb/influxd' is used as a fallback.

        :raises unittest.SkipTest: if the resulting path is not a file.
        """
        influxdb_bin_path = os.environ.get(
            'INFLUXDB_PYTHON_INFLUXD_PATH',
            None
        )

        if influxdb_bin_path is None:
            influxdb_bin_path = self._which('influxd')
            if not influxdb_bin_path:
                try:
                    influxdb_bin_path = subprocess.check_output(
                        ['which', 'influxd']
                    ).strip()
                except (subprocess.CalledProcessError, OSError):
                    # CalledProcessError: "which" found nothing;
                    # OSError: there is no "which" command to run at all.
                    # fallback on :
                    influxdb_bin_path = '/opt/influxdb/influxd'

        if not os.path.isfile(influxdb_bin_path):
            raise unittest.SkipTest("Could not find influxd binary")

        version = subprocess.check_output([influxdb_bin_path, 'version'])
        if isinstance(version, bytes):
            # avoid printing a b'...' repr on python 3
            version = version.decode('utf-8', 'replace')
        print("InfluxDB version: %s" % version, file=sys.stderr)

        return influxdb_bin_path

    def get_logs_and_output(self):
        """Return a dict with the server's return code, output and logs.

        Keys: 'rc' (return code of the process), 'out' and 'err' (what is
        read from the stdout / stderr pipes of the process) and 'logs'
        (content of the logs file, or a message saying why it could not
        be read).
        """
        proc = self.proc
        try:
            with open(self.logs_file) as fh:
                logs = fh.read()
        except IOError as err:
            logs = "Couldn't read logs: %s" % err
        return {
            'rc': proc.returncode,
            'out': proc.stdout.read(),
            'err': proc.stderr.read(),
            'logs': logs
        }

    def close(self, remove_tree=True):
        """Stop the server and, by default, delete its temporary files.

        :param remove_tree: when true, the temporary directory of the
            instance is removed once the process has been waited for.
        """
        # only signal a process that is still running: on older pythons
        # terminate() raises OSError once the process has been reaped.
        if self.proc.poll() is None:
            self.proc.terminate()
        self.proc.wait()
        if remove_tree:
            shutil.rmtree(self.temp_dir_base)