File: utils.py

package info (click to toggle)
taskd 1.1.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster, stretch
  • size: 1,576 kB
  • ctags: 1,141
  • sloc: cpp: 13,971; python: 1,523; sh: 1,080; perl: 610; ansic: 48; makefile: 21
file content (447 lines) | stat: -rw-r--r-- 13,160 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# -*- coding: utf-8 -*-
from __future__ import division
import os
import sys
import socket
import signal
import functools
from subprocess import Popen, PIPE, STDOUT
from threading import Thread
from Queue import Queue, Empty
from time import sleep
try:
    import simplejson as json
except ImportError:
    import json
from .exceptions import CommandError, TimeoutWaitingFor

USED_PORTS = set()
ON_POSIX = 'posix' in sys.builtin_module_names

# Directory relative to basetest module location
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

# Location of binary files (usually the src/ folder)
BIN_PREFIX = os.path.abspath(
    os.path.join(CURRENT_DIR, "..", "..", "src")
)

# Default location of test certificates
DEFAULT_CERT_PATH = os.path.abspath(
    os.path.join(CURRENT_DIR, "..", "test_certs")
)

# Default location of test hooks
DEFAULT_HOOK_PATH = os.path.abspath(
    os.path.join(CURRENT_DIR, "..", "test_hooks")
)


# Environment flags to control skipping of task and taskd tests
TASKW_SKIP = os.environ.get("TASKW_SKIP", False)
TASKD_SKIP = os.environ.get("TASKD_SKIP", False)
# Environment flags to control use of PATH or in-tree binaries
TASK_USE_PATH = os.environ.get("TASK_USE_PATH", False)
TASKD_USE_PATH = os.environ.get("TASKD_USE_PATH", False)

UUID_REGEXP = ("[0-9A-Fa-f]{8}-" + ("[0-9A-Fa-f]{4}-" * 3) + "[0-9A-Fa-f]{12}")


def task_binary_location(cmd="task"):
    """If TASK_USE_PATH is set rely on PATH to look for task binaries.
    Otherwise ../src/ is used by default.
    """
    return binary_location(cmd, TASK_USE_PATH)


def taskd_binary_location(cmd="taskd"):
    """If TASKD_USE_PATH is set rely on PATH to look for taskd binaries.
    Otherwise ../src/ is used by default.
    """
    return binary_location(cmd, TASKD_USE_PATH)


def binary_location(cmd, USE_PATH=False):
    """If USE_PATH is True rely on PATH to look for taskd binaries.
    Otherwise ../src/ is used by default.
    """
    if USE_PATH:
        return cmd
    else:
        return os.path.join(BIN_PREFIX, cmd)


def wait_condition(cond, timeout=1, sleeptime=.01):
    """Wait for condition to return anything other than None
    """
    # NOTE Increasing sleeptime can dramatically increase testsuite runtime
    # It also reduces CPU load significantly
    if timeout is None:
        timeout = 1

    if timeout < sleeptime:
        print("Warning, timeout cannot be smaller than", sleeptime)
        timeout = sleeptime

    # Max number of attempts until giving up
    tries = int(timeout / sleeptime)

    for i in range(tries):
        val = cond()

        if val is not None:
            break

        sleep(sleeptime)

    return val


def wait_process(pid, timeout=None):
    """Wait for process to finish
    """
    def process():
        try:
            os.kill(pid, 0)
        except OSError:
            # Process is dead
            return True
        else:
            # Process is still ticking
            return None

    return wait_condition(process, timeout)


def _queue_output(arguments, pidq, outputq):
    """Read/Write output/input of given process.
    This function is meant to be executed in a thread as it may block
    """
    kwargs = arguments["process"]
    input = arguments["input"]

    try:
        proc = Popen(**kwargs)
    except OSError as e:
        # pid None is read by the main thread as a crash of the process
        pidq.put(None)

        outputq.put((
            "",
            ("Unexpected exception caught during execution of taskw: '{0}' . "
             "If you are running out-of-tree tests set TASK_USE_PATH=1 or "
             "TASKD_USE_PATH=1 in shell env before execution and add the "
             "location of the task(d) binary to the PATH".format(e)),
            255))  # false exitcode

        return

    # Put the PID in the queue for main process to know.
    pidq.put(proc.pid)

    # Send input and wait for finish
    out, err = proc.communicate(input)

    # Give the output back to the caller
    outputq.put((out, err, proc.returncode))


def _retrieve_output(thread, timeout, queue, thread_error):
    """Fetch output from taskw subprocess queues
    """
    # Try to join the thread on failure abort
    thread.join(timeout)
    if thread.isAlive():
        # Join should have killed the thread. This is unexpected
        raise TimeoutWaitingFor(thread_error + ". Unexpected error")

    # Thread died so we should have output
    try:
        # data = (stdout, stderr, exitcode)
        data = queue.get(timeout=timeout)
    except Empty:
        data = TimeoutWaitingFor("streams from TaskWarrior")

    return data


def _get_output(arguments, timeout=None):
    """Collect output from the subprocess without blocking the main process if
    subprocess hangs.
    """
    # NOTE Increase this value if tests fail with None being received as
    # stdout/stderr instead of the expected content
    output_timeout = 0.1  # seconds

    pidq = Queue()
    outputq = Queue()

    t = Thread(target=_queue_output, args=(arguments, pidq, outputq))
    t.daemon = True
    t.start()

    try:
        pid = pidq.get(timeout=timeout)
    except Empty:
        pid = None

    # Process crashed or timed out for some reason
    if pid is None:
        return _retrieve_output(t, output_timeout, outputq,
                                "TaskWarrior to start")

    # Wait for process to finish (normal execution)
    state = wait_process(pid, timeout)

    if state:
        # Process finished
        return _retrieve_output(t, output_timeout, outputq,
                                "TaskWarrior thread to join")

    # If we reach this point we assume the process got stuck or timed out
    for sig in (signal.SIGABRT, signal.SIGTERM, signal.SIGKILL):
        # Start with lower signals and escalate if process ignores them
        try:
            os.kill(pid, signal.SIGABRT)
        except OSError as e:
            # 3 means the process finished/died between last check and now
            if e.errno != 3:
                raise

        # Wait for process to finish (should die/exit after signal)
        state = wait_process(pid, timeout)

        if state:
            # Process finished
            return _retrieve_output(t, output_timeout, outputq,
                                    "TaskWarrior to die")

    # This should never happen but in case something goes really bad
    raise OSError("TaskWarrior stopped responding and couldn't be killed")


def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE,
                 merge_streams=False, env=os.environ, timeout=None):
    "Run a subprocess and wait for it to finish"

    if input is None:
        stdin = None
    else:
        stdin = PIPE

    if merge_streams:
        stderr = STDOUT
    else:
        stderr = PIPE

    arguments = {
        "process": {
            "args": cmd,
            "stdin": stdin,
            "stdout": stdout,
            "stderr": stderr,
            "bufsize": 1,
            "close_fds": ON_POSIX,
            "env": env,
        },
        "input": input,
    }
    out, err, exit = _get_output(arguments, timeout)

    if merge_streams:
        if exit != 0:
            raise CommandError(cmd, exit, out)
        else:
            return exit, out
    else:
        if exit != 0:
            raise CommandError(cmd, exit, out, err)
        else:
            return exit, out, err


def run_cmd_wait_nofail(*args, **kwargs):
    "Same as run_cmd_wait but silence the exception if it happens"
    try:
        return run_cmd_wait(*args, **kwargs)
    except CommandError as e:
        return e.code, e.out, e.err


def get_IPs(hostname):
    output = {}
    addrs = socket.getaddrinfo(hostname, 0, 0, 0, socket.IPPROTO_TCP)

    for family, socktype, proto, canonname, sockaddr in addrs:
        addr = sockaddr[0]
        output[family] = addr

    return output


def port_used(addr="localhost", port=None):
    "Return True if port is in use, False otherwise"
    if port is None:
        raise TypeError("Argument 'port' may not be None")

    # If we got an address name, resolve it both to IPv6 and IPv4.
    IPs = get_IPs(addr)

    # Taskd seems to prefer IPv6 so we do it first
    for family in (socket.AF_INET6, socket.AF_INET):
        try:
            addr = IPs[family]
        except KeyError:
            continue

        s = socket.socket(family, socket.SOCK_STREAM)
        result = s.connect_ex((addr, port))
        s.close()
        if result == 0:
            # connection was successful
            return True
    else:
        return False


def find_unused_port(addr="localhost", start=53589, track=True):
    """Find an unused port starting at `start` port

    If track=False the returned port will not be marked as in-use and the code
    will rely entirely on the ability to connect to addr:port as detection
    mechanism. Note this may cause problems if ports are assigned but not used
    immediately
    """
    maxport = 65535
    unused = None

    for port in xrange(start, maxport):
        if not port_used(addr, port):
            if track and port in USED_PORTS:
                continue

            unused = port
            break

    if unused is None:
        raise ValueError("No available port in the range {0}-{1}".format(
            start, maxport))

    if track:
        USED_PORTS.add(unused)

    return unused


def release_port(port):
    """Forget that given port was marked as'in-use
    """
    try:
        USED_PORTS.remove(port)
    except KeyError:
        pass


def memoize(obj):
    """Keep an in-memory cache of function results given it's inputs
    """
    cache = obj.cache = {}

    @functools.wraps(obj)
    def memoizer(*args, **kwargs):
        key = str(args) + str(kwargs)
        if key not in cache:
            cache[key] = obj(*args, **kwargs)
        return cache[key]
    return memoizer


try:
    from shutil import which
    which = memoize(which)
except ImportError:
    # NOTE: This is shutil.which backported from python-3.3.3
    @memoize
    def which(cmd, mode=os.F_OK | os.X_OK, path=None):
        """Given a command, mode, and a PATH string, return the path which
        conforms to the given mode on the PATH, or None if there is no such
        file.

        `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
        of os.environ.get("PATH"), or can be overridden with a custom search
        path.

        """
        # Check that a given file can be accessed with the correct mode.
        # Additionally check that `file` is not a directory, as on Windows
        # directories pass the os.access check.
        def _access_check(fn, mode):
            return (os.path.exists(fn) and os.access(fn, mode) and
                    not os.path.isdir(fn))

        # If we're given a path with a directory part, look it up directly
        # rather than referring to PATH directories. This includes checking
        # relative to the current directory, e.g. ./script
        if os.path.dirname(cmd):
            if _access_check(cmd, mode):
                return cmd
            return None

        if path is None:
            path = os.environ.get("PATH", os.defpath)
        if not path:
            return None
        path = path.split(os.pathsep)

        if sys.platform == "win32":
            # The current directory takes precedence on Windows.
            if os.curdir not in path:
                path.insert(0, os.curdir)

            # PATHEXT is necessary to check on Windows.
            pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
            # See if the given file matches any of the expected path
            # extensions. This will allow us to short circuit when given
            # "python.exe". If it does match, only test that one, otherwise we
            # have to try others.
            if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
                files = [cmd]
            else:
                files = [cmd + ext for ext in pathext]
        else:
            # On other platforms you don't have things like PATHEXT to tell you
            # what file suffixes are executable, so just pass on cmd as-is.
            files = [cmd]

        seen = set()
        for dir in path:
            normdir = os.path.normcase(dir)
            if normdir not in seen:
                seen.add(normdir)
                for thefile in files:
                    name = os.path.join(dir, thefile)
                    if _access_check(name, mode):
                        return name
        return None


def parse_datafile(file):
    """Parse .data files on the client and server treating files as JSON
    """
    data = []
    with open(file) as fh:
        for line in fh:
            line = line.rstrip("\n")

            # Turn [] strings into {} to be treated properly as JSON hashes
            if line.startswith('[') and line.endswith(']'):
                line = '{' + line[1:-1] + '}'

            if line.startswith("{"):
                data.append(json.loads(line))
            else:
                data.append(line)
    return data


# vim: ai sts=4 et sw=4