File: utils.py

package info (click to toggle)
task 3.4.2%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 7,332 kB
  • sloc: cpp: 42,567; python: 12,689; sh: 775; perl: 189; makefile: 35
file content (335 lines) | stat: -rw-r--r-- 8,989 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
import errno
import os
import sys
import socket
import signal
import functools
import atexit
import tempfile
from subprocess import Popen, PIPE, STDOUT
from threading import Thread

from queue import Queue, Empty
from time import sleep
import json
from .exceptions import CommandError, TimeoutWaitingFor

USED_PORTS = set()
ON_POSIX = "posix" in sys.builtin_module_names

# Directory relative to basetest module location
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

# From the CMAKE value of the same name. This is substituted at configure.
CMAKE_BINARY_DIR = os.path.abspath("${CMAKE_BINARY_DIR}")

# Location of binary files (usually the src/ folder)
BIN_PREFIX = os.path.abspath(os.path.join(CMAKE_BINARY_DIR, "src"))

# Default location of test hooks
DEFAULT_HOOK_PATH = os.path.abspath(
    os.path.join("${CMAKE_SOURCE_DIR}", "test", "test_hooks")
)

# Source directory
SOURCE_DIR = os.path.abspath("${CMAKE_SOURCE_DIR}")


# Environment flags to control skipping of task tests
TASKW_SKIP = os.environ.get("TASKW_SKIP", False)
# Environment flags to control use of PATH or in-tree binaries
TASK_USE_PATH = os.environ.get("TASK_USE_PATH", False)

UUID_REGEXP = "[0-9A-Fa-f]{8}-" + ("[0-9A-Fa-f]{4}-" * 3) + "[0-9A-Fa-f]{12}"


def task_binary_location(cmd="task"):
    """If TASK_USE_PATH is set rely on PATH to look for task binaries.
    Otherwise ../src/ is used by default.
    """
    return binary_location(cmd, TASK_USE_PATH)


def binary_location(cmd, USE_PATH=False):
    """If USE_PATH is True rely on PATH to look for binaries.
    Otherwise ../src/ is used by default.
    """
    if USE_PATH:
        return cmd
    else:
        return os.path.join(BIN_PREFIX, cmd)


def wait_condition(cond, timeout=10, sleeptime=0.01):
    """Wait for condition to return anything other than None"""
    # NOTE Increasing sleeptime can dramatically increase testsuite runtime
    # It also reduces CPU load significantly
    if timeout is None:
        timeout = 10

    if timeout < sleeptime:
        print(("Warning, timeout cannot be smaller than", sleeptime))
        timeout = sleeptime

    # Max number of attempts until giving up
    tries = int(timeout / sleeptime)

    for i in range(tries):
        val = cond()

        if val is not None:
            break

        sleep(sleeptime)

    return val


def wait_process(pid, timeout=None):
    """Wait for process to finish"""

    def process():
        try:
            os.kill(pid, 0)
        except OSError:
            # Process is dead
            return True
        else:
            # Process is still ticking
            return None

    return wait_condition(process, timeout)


def _queue_output(arguments, pidq, outputq):
    """Read/Write output/input of given process.
    This function is meant to be executed in a thread as it may block
    """
    kwargs = arguments["process"]
    input_data = arguments["input"].encode("utf-8") if arguments["input"] else None

    try:
        proc = Popen(**kwargs)
    except OSError as e:
        # pid None is read by the main thread as a crash of the process
        pidq.put(None)

        outputq.put(
            (
                "",
                (
                    "Unexpected exception caught during execution of taskw: '{0}' . "
                    "If you are running out-of-tree tests set TASK_USE_PATH=1 "
                    "in shell env before execution and add the "
                    "location of the task(d) binary to the PATH".format(e)
                ),
                255,
            )
        )  # false exitcode

        return

    # Put the PID in the queue for main process to know.
    pidq.put(proc.pid)

    # Send input and wait for finish
    out, err = proc.communicate(input_data)

    out, err = out.decode('utf-8'), err.decode('utf-8')

    # Give the output back to the caller
    outputq.put((out, err, proc.returncode))


def _retrieve_output(thread, timeout, queue, thread_error):
    """Fetch output from taskw subprocess queues"""
    # Try to join the thread on failure abort
    thread.join(timeout)
    if thread.is_alive():
        # Join should have killed the thread. This is unexpected
        raise TimeoutWaitingFor(thread_error + ". Unexpected error")

    # Thread died so we should have output
    try:
        # data = (stdout, stderr, exitcode)
        data = queue.get(timeout=timeout)
    except Empty:
        data = TimeoutWaitingFor("streams from TaskWarrior")

    return data


def _get_output(arguments, timeout=None):
    """Collect output from the subprocess without blocking the main process if
    subprocess hangs.
    """
    # NOTE Increase this value if tests fail with None being received as
    # stdout/stderr instead of the expected content
    output_timeout = 0.1  # seconds

    pidq = Queue()
    outputq = Queue()

    t = Thread(target=_queue_output, args=(arguments, pidq, outputq))
    t.daemon = True
    t.start()

    try:
        pid = pidq.get(timeout=timeout)
    except Empty:
        pid = None

    # Process crashed or timed out for some reason
    if pid is None:
        return _retrieve_output(t, output_timeout, outputq, "TaskWarrior to start")

    # Wait for process to finish (normal execution)
    state = wait_process(pid, timeout)

    if state:
        # Process finished
        return _retrieve_output(
            t, output_timeout, outputq, "TaskWarrior thread to join"
        )

    # If we reach this point we assume the process got stuck or timed out
    for sig in (signal.SIGABRT, signal.SIGTERM, signal.SIGKILL):
        # Start with lower signals and escalate if process ignores them
        try:
            os.kill(pid, signal.SIGABRT)
        except OSError as e:
            # ESRCH means the process finished/died between last check and now
            if e.errno != errno.ESRCH:
                raise

        # Wait for process to finish (should die/exit after signal)
        state = wait_process(pid, timeout)

        if state:
            # Process finished
            return _retrieve_output(t, output_timeout, outputq, "TaskWarrior to die")

    # This should never happen but in case something goes really bad
    raise OSError("TaskWarrior stopped responding and couldn't be killed")


def run_cmd_wait(
    cmd,
    input=None,
    stdout=PIPE,
    stderr=PIPE,
    merge_streams=False,
    env=os.environ,
    timeout=None,
):
    "Run a subprocess and wait for it to finish"

    if input is None:
        stdin = None
    else:
        stdin = PIPE

    if merge_streams:
        stderr = STDOUT
    else:
        stderr = PIPE

    arguments = {
        "process": {
            "args": cmd,
            "stdin": stdin,
            "stdout": stdout,
            "stderr": stderr,
            "close_fds": ON_POSIX,
            "env": env,
        },
        "input": input,
    }
    out, err, exit = _get_output(arguments, timeout)

    if merge_streams:
        if exit != 0:
            raise CommandError(cmd, exit, out)
        else:
            return exit, out
    else:
        if exit != 0:
            raise CommandError(cmd, exit, out, err)
        else:
            return exit, out, err


def run_cmd_wait_nofail(*args, **kwargs):
    "Same as run_cmd_wait but silence the exception if it happens"
    try:
        return run_cmd_wait(*args, **kwargs)
    except CommandError as e:
        return e.code, e.out, e.err


def memoize(obj):
    """Keep an in-memory cache of function results given its inputs"""
    cache = obj.cache = {}

    @functools.wraps(obj)
    def memoizer(*args, **kwargs):
        key = str(args) + str(kwargs)
        if key not in cache:
            cache[key] = obj(*args, **kwargs)
        return cache[key]

    return memoizer


from shutil import which
which = memoize(which)


def parse_datafile(file):
    """Parse .data files on the client and server treating files as JSON"""
    data = []
    with open(file) as fh:
        for line in fh:
            line = line.rstrip("\n")

            # Turn [] strings into {} to be treated properly as JSON hashes
            if line.startswith("[") and line.endswith("]"):
                line = "{" + line[1:-1] + "}"

            if line.startswith("{"):
                data.append(json.loads(line))
            else:
                data.append(line)
    return data


def mkstemp(data):
    """
    Create a temporary file that is removed at process exit
    """

    def rmtemp(name):
        try:
            os.remove(name)
        except OSError:
            pass

    f = tempfile.NamedTemporaryFile(delete=False)
    f.write(data.encode("utf-8") if not isinstance(data, bytes) else data)
    f.close()

    # Ensure removal at end of python session
    atexit.register(rmtemp, f.name)

    return f.name


def mkstemp_exec(data):
    """Create a temporary executable file that is removed at process exit"""
    name = mkstemp(data)
    os.chmod(name, 0o755)

    return name


# vim: ai sts=4 et sw=4