File: task.py

package info (click to toggle)
taskd 1.1.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster, stretch
  • size: 1,576 kB
  • ctags: 1,141
  • sloc: cpp: 13,971; python: 1,523; sh: 1,080; perl: 610; ansic: 48; makefile: 21
file content (301 lines) | stat: -rw-r--r-- 10,486 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# -*- coding: utf-8 -*-

import os
import tempfile
import shutil
import atexit
import unittest
from .utils import (run_cmd_wait, run_cmd_wait_nofail, which,
                    task_binary_location)
from .exceptions import CommandError
from .hooks import Hooks


class Task(object):
    """Manage a task warrior instance

    A temporary folder is used as data store of task warrior.
    This class can be instanciated multiple times if multiple taskw clients are
    needed.

    This class can be given a Taskd instance for simplified configuration.

    A taskw client should not be used after being destroyed.
    """
    DEFAULT_TASK = task_binary_location()

    def __init__(self, taskd=None, taskw=DEFAULT_TASK):
        """Initialize a Task warrior (client) that can interact with a taskd
        server. The task client runs in a temporary folder.

        :arg taskd: Taskd instance for client-server configuration
        :arg taskw: Task binary to use as client (defaults: task in PATH)
        """
        self.taskw = taskw
        self.taskd = taskd

        # Used to specify what command to launch (and to inject faketime)
        self._command = [self.taskw]

        # Configuration of the isolated environment
        self._original_pwd = os.getcwd()
        self.datadir = tempfile.mkdtemp(prefix="task_")
        self.taskrc = os.path.join(self.datadir, "test.rc")

        # Ensure any instance is properly destroyed at session end
        atexit.register(lambda: self.destroy())

        self.reset_env()

        # Cannot call self.config until confirmation is disabled
        with open(self.taskrc, 'w') as rc:
            rc.write("data.location={0}\n"
                     "confirmation=no\n"
                     "hooks=off\n"
                     "".format(self.datadir))

        # Setup configuration to talk to taskd automatically
        if self.taskd is not None:
            self.bind_taskd_server(self.taskd)

        # Hooks disabled until requested
        self.hooks = None

    def __repr__(self):
        txt = super(Task, self).__repr__()
        return "{0} running from {1}>".format(txt[:-1], self.datadir)

    def __call__(self, *args, **kwargs):
        "aka t = Task() ; t() which is now an alias to t.runSuccess()"
        return self.runSuccess(*args, **kwargs)

    def activate_hooks(self):
        """Enable self.hooks functionality and activate hooks on config
        """
        self.config("hooks", "on")
        self.hooks = Hooks(self.datadir)

    def reset_env(self):
        """Set a new environment derived from the one used to launch the test
        """
        # Copy all env variables to avoid clashing subprocess environments
        self.env = os.environ.copy()

        # Make sure no TASKDDATA is isolated
        self.env["TASKDATA"] = self.datadir
        # As well as TASKRC
        self.env["TASKRC"] = self.taskrc

    def bind_taskd_server(self, taskd):
        """Configure the present task client to talk to given taskd server

        Note that this can be performed automatically by passing taskd when
        creating an instance of the current class.
        """
        self.taskd = taskd

        cert = os.path.join(self.taskd.certpath, "test_client.cert.pem")
        key = os.path.join(self.taskd.certpath, "test_client.key.pem")
        self.config("taskd.certificate", cert)
        self.config("taskd.key", key)
        self.config("taskd.ca", self.taskd.ca_cert)

        address = ":".join((self.taskd.address, str(self.taskd.port)))
        self.config("taskd.server", address)

        # Also configure the default user for given taskd server
        self.set_taskd_user()

    def set_taskd_user(self, taskd_user=None, default=True):
        """Assign a new user user to the present task client

        If default==False, a new user will be assigned instead of reusing the
        default taskd user for the corresponding instance.
        """
        if taskd_user is None:
            if default:
                user, group, org, userkey = self.taskd.default_user
            else:
                user, group, org, userkey = self.taskd.create_user()
        else:
            user, group, org, userkey = taskd_user

        credentials = "/".join((org, user, userkey))
        self.config("taskd.credentials", credentials)

        self.credentials = {
            "user": user,
            "group": group,
            "org": org,
            "userkey": userkey,
        }

    def config(self, var, value):
        """Run setup `var` as `value` in taskd config
        """
        # Add -- to avoid misinterpretation of - in things like UUIDs
        cmd = (self.taskw, "config", "--", var, value)
        return run_cmd_wait(cmd, env=self.env)

    @property
    def taskrc_content(self):
        """
        Returns the contents of the taskrc file.
        """

        with open(self.taskrc, "r") as f:
            return f.readlines()

    def runSuccess(self, args=(), input=None, merge_streams=False,
                   timeout=5):
        """Invoke task with given arguments and fail if exit code != 0

        Use runError if you want exit_code to be tested automatically and
        *not* fail if program finishes abnormally.

        If you wish to pass instructions to task such as confirmations or other
        input via stdin, you can do so by providing a input string.
        Such as input="y\ny".

        If merge_streams=True stdout and stderr will be merged into stdout.

        timeout = number of seconds the test will wait for every task call.
        Defaults to 1 second if not specified. Unit is seconds.

        Returns (exit_code, stdout, stderr) if merge_streams=False
                (exit_code, output) if merge_streams=True
        """
        # Create a copy of the command
        command = self._command[:]
        command.extend(args)

        output = run_cmd_wait_nofail(command, input,
                                     merge_streams=merge_streams,
                                     env=self.env,
                                     timeout=timeout)

        if output[0] != 0:
            raise CommandError(command, *output)

        return output

    def runError(self, args=(), input=None, merge_streams=False, timeout=5):
        """Invoke task with given arguments and fail if exit code == 0

        Use runSuccess if you want exit_code to be tested automatically and
        *fail* if program finishes abnormally.

        If you wish to pass instructions to task such as confirmations or other
        input via stdin, you can do so by providing a input string.
        Such as input="y\ny".

        If merge_streams=True stdout and stderr will be merged into stdout.

        timeout = number of seconds the test will wait for every task call.
        Defaults to 1 second if not specified. Unit is seconds.

        Returns (exit_code, stdout, stderr) if merge_streams=False
                (exit_code, output) if merge_streams=True
        """
        # Create a copy of the command
        command = self._command[:]
        command.extend(args)

        output = run_cmd_wait_nofail(command, input,
                                     merge_streams=merge_streams,
                                     env=self.env,
                                     timeout=timeout)

        # output[0] is the exit code
        if output[0] == 0 or output[0] is None:
            raise CommandError(command, *output)

        return output

    def destroy(self):
        """Cleanup the data folder and release server port for other instances
        """
        try:
            shutil.rmtree(self.datadir)
        except OSError as e:
            if e.errno == 2:
                # Directory no longer exists
                pass
            else:
                raise

        # Prevent future reuse of this instance
        self.runSuccess = self.__destroyed
        self.runError = self.__destroyed

        # self.destroy will get called when the python session closes.
        # If self.destroy was already called, turn the action into a noop
        self.destroy = lambda: None

    def __destroyed(self, *args, **kwargs):
        raise AttributeError("Task instance has been destroyed. "
                             "Create a new instance if you need a new client.")

    def diag(self, merge_streams_with=None):
        """Run task diagnostics.

        This function may fail in which case the exception text is returned as
        stderr or appended to stderr if merge_streams_with is set.

        If set, merge_streams_with should have the format:
        (exitcode, out, err)
        which should be the output of any previous process that failed.
        """
        try:
            output = self.runSuccess(("diag",))
        except CommandError as e:
            # If task diag failed add the error to stderr
            output = (e.code, None, str(e))

        if merge_streams_with is None:
            return output
        else:
            # Merge any given stdout and stderr with that of "task diag"
            code, out, err = merge_streams_with
            dcode, dout, derr = output

            # Merge stdout
            newout = "\n##### Debugging information (task diag): #####\n{0}"
            if dout is None:
                newout = newout.format("Not available, check STDERR")
            else:
                newout = newout.format(dout)

            if out is not None:
                newout = out + newout

            # And merge stderr
            newerr = "\n##### Debugging information (task diag): #####\n{0}"
            if derr is None:
                newerr = newerr.format("Not available, check STDOUT")
            else:
                newerr = newerr.format(derr)

            if err is not None:
                newerr = err + derr

            return code, newout, newerr

    def faketime(self, faketime=None):
        """Set a faketime using libfaketime that will affect the following
        command calls.

        If faketime is None, faketime settings will be disabled.
        """
        cmd = which("faketime")
        if cmd is None:
            raise unittest.SkipTest("libfaketime/faketime is not installed")

        if self._command[0] == cmd:
            self._command = self._command[3:]

        if faketime is not None:
            # Use advanced time format
            self._command = [cmd, "-f", faketime] + self._command

# vim: ai sts=4 et sw=4