File: taskd.py

package info (click to toggle)
taskd 1.1.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster, stretch
  • size: 1,576 kB
  • ctags: 1,141
  • sloc: cpp: 13,971; python: 1,523; sh: 1,080; perl: 610; ansic: 48; makefile: 21
file content (370 lines) | stat: -rw-r--r-- 12,382 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# -*- coding: utf-8 -*-

from __future__ import division, print_function
import os
import tempfile
import shutil
import signal
import atexit
from time import sleep
from subprocess import Popen, PIPE
from .utils import (find_unused_port, release_port, port_used, run_cmd_wait,
                    which, parse_datafile, DEFAULT_CERT_PATH,
                    taskd_binary_location)
from .exceptions import CommandError

try:
    from subprocess import DEVNULL
except ImportError:
    DEVNULL = open(os.devnull, 'w')


class Taskd(object):
    """Manage a taskd instance

    A temporary folder is used as data store of taskd.
    This class can be instanciated multiple times if multiple taskd servers are
    needed.

    This class implements mechanisms to automatically select an available port
    and prevent assigning the same port to different instances.

    A server can be stopped and started multiple times, but should not be
    started or stopped after being destroyed.
    """
    DEFAULT_TASKD = taskd_binary_location()
    TASKD_RUNNING = 0
    TASKD_NEVER_STARTED = 1
    TASKD_EXITED = 2
    TASKD_NOT_LISTENING = 3

    def __init__(self, taskd=DEFAULT_TASKD, certpath=None,
                 address="localhost"):
        """Initialize a Task server that runs in the background and stores data
        in a temporary folder

        :arg taskd: Taskd binary to launch the server (defaults: taskd in PATH)
        :arg certpath: Folder where to find all certificates needed for taskd
        :arg address: Address to bind to
        """
        self.taskd = taskd
        self.usercount = 0

        # Will hold the taskd subprocess if it's running
        self.proc = None
        self.datadir = tempfile.mkdtemp(prefix="taskd_")
        self.tasklog = os.path.join(self.datadir, "taskd.log")
        self.taskpid = os.path.join(self.datadir, "taskd.pid")

        # Ensure any instance is properly destroyed at session end
        atexit.register(lambda: self.destroy())

        self.reset_env()

        if certpath is None:
            certpath = DEFAULT_CERT_PATH
        self.certpath = certpath

        self.address = address
        self.port = find_unused_port(self.address)

        # Keep all certificate paths public for access by TaskClients
        self.client_cert = os.path.join(self.certpath, "client.cert.pem")
        self.client_key = os.path.join(self.certpath, "client.key.pem")
        self.server_cert = os.path.join(self.certpath, "server.cert.pem")
        self.server_key = os.path.join(self.certpath, "server.key.pem")
        self.server_crl = os.path.join(self.certpath, "server.crl.pem")
        self.ca_cert = os.path.join(self.certpath, "ca.cert.pem")

        # Initialize taskd
        cmd = (self.taskd, "init", "--data", self.datadir)
        run_cmd_wait(cmd, env=self.env)

        self.config("server", "{0}:{1}".format(self.address, self.port))
        self.config("family", "IPv4")
        self.config("log", self.tasklog)
        self.config("pid.file", self.taskpid)
        self.config("root", self.datadir)
        self.config("client.allow", "^task [2-9]")

        # Setup all necessary certificates
        self.config("client.cert", self.client_cert)
        self.config("client.key", self.client_key)
        self.config("server.cert", self.server_cert)
        self.config("server.key", self.server_key)
        self.config("server.crl", self.server_crl)
        self.config("ca.cert", self.ca_cert)

        self.default_user = self.create_user()

    def __repr__(self):
        txt = super(Taskd, self).__repr__()
        return "{0} running from {1}>".format(txt[:-1], self.datadir)

    def reset_env(self):
        """Set a new environment derived from the one used to launch the test
        """
        # Copy all env variables to avoid clashing subprocess environments
        self.env = os.environ.copy()

        # Make sure TASKDDATA points to the temporary folder
        self.env["TASKDATA"] = self.datadir

    def create_user(self, user=None, group=None, org=None):
        """Create a user/group in the server and return the user
        credentials to use in a taskw client.
        """
        if user is None:
            # Create a unique user ID
            uid = self.usercount
            user = "test_user_{0}".format(uid)

            # Increment the user_id
            self.usercount += 1

        if group is None:
            group = "default_group"

        if org is None:
            org = "default_org"

        self._add_entity("org", org, ignore_exists=True)
        self._add_entity("group", org, group, ignore_exists=True)
        userkey = self._add_entity("user", org, user)

        return user, group, org, userkey

    def _add_entity(self, keyword, org, value=None, ignore_exists=False):
        """Add an organization, group or user to the current server

        If a user creation is requested, the user unique ID is returned
        """
        cmd = (self.taskd, "add", "--data", self.datadir, keyword, org)

        if value is not None:
            cmd += (value,)

        try:
            code, out, err = run_cmd_wait(cmd, env=self.env)
        except CommandError as e:
            match = False
            for line in e.out.splitlines():
                if line.endswith("already exists.") and ignore_exists:
                    match = True
                    break

            # If the error was not "Already exists" report it
            if not match:
                raise

        if keyword == "user":
            expected = "New user key: "
            for line in out.splitlines():
                if line.startswith(expected):
                    return line.replace(expected, '')

    def config(self, var, value):
        """Run setup `var` as `value` in taskd config
        """
        cmd = (self.taskd, "config", "--force", "--data", self.datadir, var,
               value)
        run_cmd_wait(cmd, env=self.env)

        # If server is running send a SIGHUP to force config reload
        if self.proc is not None:
            try:
                self.proc.send_signal(signal.SIGHUP)
            except:
                pass

    def status(self):
        """Check the status of the server by checking if it's still running and
        listening for connections
        :returns: Taskd.TASKD_[NEVER_STARTED/EXITED/NOT_LISTENING/RUNNING]
        """
        if self.proc is None:
            return self.TASKD_NEVER_STARTED

        if self.returncode() is not None:
            return self.TASKD_EXITED

        if not port_used(addr=self.address, port=self.port):
            return self.TASKD_NOT_LISTENING

        return self.TASKD_RUNNING

    def returncode(self):
        """If taskd finished, return its exit code, otherwise return None.
        :returns: taskd's exit code or None
        """
        return self.proc.poll()

    def start(self, minutes=5, tries_per_minute=2):
        """Start the taskd server if it's not running.
        If it's already running OSError will be raised
        """
        if self.proc is None:
            cmd = (self.taskd, "server", "--data", self.datadir)
            self.proc = Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL,
                              env=self.env)
        else:
            self.show_log_contents()

            raise OSError("Taskd server is still running or crashed")

        # Wait for server to listen by checking connectivity in the port
        # Default is to wait up to 5 minutes checking once every 500ms
        for i in range(minutes * 60 * tries_per_minute):
            status = self.status()

            if status == self.TASKD_RUNNING:
                return

            elif status == self.TASKD_NEVER_STARTED:
                self.show_log_contents()

                raise OSError("Task server was never started. "
                              "This shouldn't happen!!")

            elif status == self.TASKD_EXITED:
                # Collect output logs
                out, err = self.proc.communicate()

                self.show_log_contents()

                raise OSError(
                    "Task server launched with '{0}' crashed or exited "
                    "prematurely. Exit code: {1}. "
                    "Listening on port: {2}. "
                    "Stdout: {3!r}, "
                    "Stderr: {4!r}.".format(
                        self.taskd,
                        self.returncode(),
                        self.port,
                        out,
                        err,
                    ))

            elif status == self.TASKD_NOT_LISTENING:
                sleep(1 / tries_per_minute)

            else:
                self.show_log_contents()

                raise OSError("Unknown running status for taskd '{0}'".format(
                    status))

        # Force stop so we can collect output
        proc = self.stop()

        # Collect output logs
        out, err = proc.communicate()

        self.show_log_contents()

        raise OSError("Task server didn't start and listen on port {0} after "
                      "{1} minutes. Stdout: {2!r}. Stderr: {3!r}.".format(
                          self.port, minutes, out, err))

    def stop(self):
        """Stop the server by sending a SIGTERM and SIGKILL if fails to
        terminate.
        If it's already stopped OSError will be raised

        Returns: a reference to the old process object
        """
        if self.proc is None:
            raise OSError("Taskd server is not running")

        if self._check_pid():
            self.proc.send_signal(signal.SIGTERM)

        if self._check_pid():
            self.proc.kill()

        # Wait for process to end to avoid zombies
        self.proc.wait()

        # Keep a reference to the old process
        proc = self.proc

        # Unset the process to inform that no process is running
        self.proc = None

        return proc

    def _check_pid(self):
        "Check if self.proc is still running and a PID still exists"
        # Wait ~1 sec for taskd to finish
        signal = True
        for i in range(10):
            sleep(0.1)
            if self.proc.poll() is not None:
                signal = False
                break

        return signal

    def destroy(self):
        """Cleanup the data folder and release server port for other instances
        """
        # Ensure server is stopped first
        if self.proc is not None:
            self.stop()

        try:
            shutil.rmtree(self.datadir)
        except OSError as e:
            if e.errno == 2:
                # Directory no longer exists
                pass
            else:
                raise

        release_port(self.port)

        # Prevent future reuse of this instance
        self.start = self.__destroyed
        self.config = self.__destroyed
        self.stop = self.__destroyed

        # self.destroy will get called when the python session closes.
        # If self.destroy was already called, turn the action into a noop
        self.destroy = lambda: None

    def __destroyed(self, *args, **kwargs):
        raise AttributeError("Taskd instance has been destroyed. "
                             "Create a new instance if you need a new server.")

    @classmethod
    def not_available(cls):
        """Check if the taskd binary is available in the path"""
        if which(cls.DEFAULT_TASKD):
            return False
        else:
            return True

    def client_data(self, client):
        """Return a python list with the content of tx.data matching the given
        task client. tx.data will be parsed to string and JSON.
        """
        file = os.path.join(self.datadir,
                            "orgs",
                            client.credentials["org"],
                            "users",
                            client.credentials["userkey"],
                            "tx.data")

        return parse_datafile(file)

    def show_log_contents(self):
        """Print to to STDOUT the contents of taskd.log
        """
        if os.path.isfile(self.tasklog):
            with open(self.tasklog) as fh:
                print("#### Start taskd.log ####")
                for line in fh:
                    print(line, end='')
                print("#### End taskd.log ####")

# vim: ai sts=4 et sw=4