File: hooks.py

package info (click to toggle)
taskd 1.1.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster, stretch
  • size: 1,576 kB
  • ctags: 1,141
  • sloc: cpp: 13,971; python: 1,523; sh: 1,080; perl: 610; ansic: 48; makefile: 21
file content (513 lines) | stat: -rw-r--r-- 17,491 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
# -*- coding: utf-8 -*-

from __future__ import division
import os
from sys import stderr
import shutil
import stat
try:
    import simplejson as json
except ImportError:
    import json

from datetime import datetime
from .utils import DEFAULT_HOOK_PATH
from .exceptions import HookError


class InvalidJSON(object):
    """Object representing the original unparsed JSON string and the JSON error
    """
    def __init__(self, original, error):
        self.original = original
        self.error = error


def json_decoder(string):
    """Attempt to decode a JSON string and in case of error return an
    InvalidJSON object
    """
    decoder = json.JSONDecoder().decode

    try:
        return decoder(string)
    except ValueError as e:
        return InvalidJSON(string, str(e))


class Hooks(object):
    """Abstraction to help interact with hooks (add, remove) during tests and
    keep track of which are active.
    """
    def __init__(self, datadir):
        """Initialize hooks container which keeps track of active hooks and

        :arg datadir: Temporary location where task is running (/tmp/...)
        """
        self.hookdir = os.path.join(datadir, "hooks")
        self._hooks = {}

        # Check if the hooks dir already exists
        if not os.path.isdir(self.hookdir):
            os.mkdir(self.hookdir)

    def __repr__(self):
        enabled = []
        disabled = []

        for hook in self:
            if hook.is_active():
                enabled.append(hook)
            else:
                disabled.append(hook)

        enabled = ", ".join(enabled) or None
        disabled = ", ".join(disabled) or None

        return "<Hooks: enabled: {0} | disabled: {1}>".format(enabled,
                                                              disabled)

    def __getitem__(self, name):
        return self._hooks[name]

    def __setitem__(self, key, value):
        self._hooks[key] = value

    def __delitem__(self, key):
        del self._hooks[key]

    def __iter__(self):
        for item in self._hooks:
            yield item

    def __len__(self):
        return len(self._hooks)

    def add(self, hookname, content, log=False):
        """Register hook with name 'hookname' and given file content.

        :arg hookname: Should be a string starting with one of:
            - on-launch
            - on-add
            - on-exit
            - on-modify

        :arg content: Content of the file as a (multi-line) string
        :arg log: If we require checking input/output of the hook
        """
        if log:
            self[hookname] = LoggedHook(hookname, self.hookdir, content)
        else:
            self[hookname] = Hook(hookname, self.hookdir, content)

        self[hookname].enable()

    def add_default(self, hookname, log=False):
        """Register a pre-built hook that exists in the folder containing hooks
        used for testing.
        If not explicitly passed hooks folder defaults to DEFAULT_HOOK_PATH

        :arg hookname: Name of the default hook
        :arg log: If we require checking input/output of the hook
        """
        if log:
            self[hookname] = LoggedHook(hookname, self.hookdir, default=True)
        else:
            self[hookname] = Hook(hookname, self.hookdir, default=True)

        # Finally enable this hook
        self[hookname].enable()

    def remove(self, hook):
        """Remove the hook matching given hookname"""
        try:
            hookname = hook.hookname
        except AttributeError:
            hookname = hook

        hook = self[hookname]

        try:
            del self[hookname]
        except KeyError:
            raise HookError("Hook {0} is not on record".format(hookname))

        hook._delete()

    def clear(self):
        """Remove all existing hooks and empty the hook registry
        """
        self._hooks = {}

        # Remove any existing hooks
        try:
            shutil.rmtree(self.hookdir)
        except OSError as e:
            # If the hookdir folder doesn't exist, no harm done and keep going
            if e.errno != 2:
                raise

        os.mkdir(self.hookdir)


class Hook(object):
    """Represents a hook script and provides methods to enable/disable hooks
    """
    def __init__(self, hookname, hookdir, content=None, default=False,
                 default_hookpath=None):
        """Initialize and create the hook

        This class supports creating hooks in two ways:
          * by specifying default=True in which case hookname will be
            searched on the hookpath and linked to the destination
          * by specifying content="some text" in which case the hook will be
            created with given content

        :arg hookname: Name of the hook e.g.: on-add.foobar
        :arg hookdir: Hooks directory under temporary task/ folder
        :arg content: What should be written to the hookfile
        :arg default: If True hookname is looked up on default_hookpath
        :arg default_hookpath: Default location where to look for preset hooks
        """
        self.hookname = hookname
        self.hookdir = hookdir
        self.hookfile = os.path.join(self.hookdir, self.hookname)

        if default_hookpath is None:
            self.default_hookpath = DEFAULT_HOOK_PATH
        else:
            self.default_hookpath = default_hookpath

        self._check_hook_type()
        self._check_hook_not_exists(self.hookfile)

        if not default and content is None:
            raise HookError("Cannot create hookfile {0} without content. "
                            "If using a builtin hook pass default=True"
                            .format(self.hookname))

        if os.path.isfile(self.hookfile):
            raise HookError("Hook with name {0} already exists. "
                            "Did you forget to remove() it before recreating?"
                            .format(self.hookname))

        if default:
            self.default_hookfile = os.path.join(self.default_hookpath,
                                                 self.hookname)
            self._check_hook_exists(self.default_hookfile)
            # Symlinks change permission of source file, cannot use one here
            shutil.copy(self.default_hookfile, self.hookfile)
        else:
            self.default_hookfile = None
            with open(self.hookfile, 'w') as fh:
                fh.write(content)

    def __eq__(self, other):
        try:
            if self.hookname == other.hookname:
                return True
        except AttributeError:
            pass

        return False

    def __ne__(self, other):
        try:
            if self.hookname != other.hookname:
                return True
        except AttributeError:
            pass

        return False

    def __hash__(self):
        return self.hookname.__hash__()

    def __repr__(self):
        return "<Hook '{0}'>".format(self.hookname)

    def __str__(self):
        return self.hookname

    def _check_hook_exists(self, hookfile):
        """Checks if the file pointed to by the current hook exists"""
        if not os.path.isfile(hookfile) and not os.path.islink(hookfile):
            raise HookError("Hook {0} doesn't exist.".format(hookfile))

    def _check_hook_not_exists(self, hookfile):
        """Checks if the file pointed to by the current hook doesn't exist"""
        try:
            self._check_hook_exists(hookfile)
        except HookError:
            return
        else:
            raise HookError("Hook {0} already exists.".format(hookfile))

    def _check_hook_type(self):
        """Check if the hookname is valid and if another hook with the same
        name was already created.
        """
        for hooktype in ("on-launch", "on-add", "on-exit", "on-modify"):
            if self.hookname.startswith(hooktype):
                break
        else:
            stderr.write("WARNING: {0} is not a valid hook type. "
                         "It will not be triggered\n".format(self.hookname))

    def _remove_file(self, file):
        try:
            os.remove(file)
        except OSError as e:
            if e.errno == 2:
                raise HookError("Hook with name {0} was not found on "
                                "hooks/ folder".format(file))
            else:
                raise

    def _delete(self):
        """Remove the hook from disk

        Don't call this method directly. Use Hooks.remove(hook) instead
        """
        self._remove_hookfile(self.hookfile)

    def enable(self):
        """Make hookfile executable to allow triggering
        """
        os.chmod(self.hookfile, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)

    def disable(self):
        """Remove hookfile executable bit to deny triggering
        """
        os.chmod(self.hookfile, stat.S_IREAD | stat.S_IWRITE)

    def is_active(self):
        """Check if hook is active by verifying the execute bit
        """
        return os.access(self.hookfile, os.X_OK)


class LoggedHook(Hook):
    """A variant of a Hook that allows checking that the hook was called, what
    was received via STDIN and what was answered to STDOUT
    """
    def __init__(self, *args, **kwargs):
        super(LoggedHook, self).__init__(*args, **kwargs)

        # The wrapper will replace the hookfile
        # The original file will be 'wrappedfile'

        # NOTE If the prefix "original_" is changed here, update wrapper.sh
        self.wrappedname = "original_" + self.hookname
        self.wrappedfile = os.path.join(self.hookdir, self.wrappedname)

        self.original_wrapper = os.path.join(self.default_hookpath,
                                             "wrapper.sh")

        self.hooklog_in = self.wrappedfile + ".log.in"
        self.hooklog_out = self.wrappedfile + ".log.out"

        # Cache is used to avoid parsing the logfiles everytime it's needed
        self._cache = {}

        # Setup wrapper pointing to the correct hook name
        self._setup_wrapper()

    def __repr__(self):
        return "<LoggedHook '{0}'>".format(self.hookname)

    def _delete(self):
        """Remove the hook from disk

        Don't call this method directly. Use Task.hooks.remove(hook) instead
        """
        super(LoggedHook, self)._delete()
        self._remove_file(self.wrappedfile)
        self._remove_file(self.hooklog_in)
        self._remove_file(self.hooklog_out)

    def _setup_wrapper(self):
        """Setup wrapper shell script to allow capturing input/output of hook
        """
        # Create empty hooklog to allow checking that hook executed
        open(self.hooklog_in, 'w').close()
        open(self.hooklog_out, 'w').close()

        # Rename the original hook to the name that will be used by wrapper
        self._check_hook_not_exists(self.wrappedfile)
        os.rename(self.hookfile, self.wrappedfile)

        # Symlinks change permission of source file, cannot use one here
        shutil.copy(self.original_wrapper, self.hookfile)

    def _get_log_stat(self):
        """Return the most recent change timestamp and size of both logfiles
        """
        stdin = os.stat(self.hooklog_in)
        stdout = os.stat(self.hooklog_out)

        last_change = max((stdin.st_mtime, stdout.st_mtime))
        return last_change, stdin.st_size, stdout.st_size

    def _use_cache(self):
        """Check if log files were changed since last check
        """
        try:
            last_change = self._cache["last_change"]
        except KeyError:
            # No cache available
            return False
        else:
            change = self._get_log_stat()

            if last_change != change:
                # Cache is outdated
                return False
            else:
                # Cache is up to date
                return True

    def enable(self):
        """Make hookfile executable to allow triggering
        """
        super(LoggedHook, self).enable()
        os.chmod(self.wrappedfile, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)

    def disable(self):
        """Remove hookfile executable bit to deny triggering
        """
        super(LoggedHook, self).disable()
        os.chmod(self.wrappedfile, stat.S_IREAD | stat.S_IWRITE)

    def is_active(self):
        """Check if hook is active by verifying the execute bit
        """
        parent_is_active = super(LoggedHook, self).disable()
        return parent_is_active and os.access(self.wrappedfile, os.X_OK)

    def get_logs(self):
        """Parse the logs generated by the hook and return a dictionary
        containing the logs collected with the wrapper in a python friendly
        format:
        * JSON is parsed as python dictionaries
        * timestamps are parsed as datetime objects

        It should look something like this:

        ## STDIN file
        % Called at 1414874711 with 'arg1 arg2 ...'
        {... JSON received by the hook ... }
        {... more JSON ...}

        ## STDOUT file
        {... JSON emitted by the hook ... }
        Logged messages
        {... more JSON ...}
        ! Exit code: 1
        """
        if self._use_cache():
            return self._cache["log"]

        log = {"calls": [],
               "input": {
                   "json": [],
                   },
               "output": {
                   "json": [],
                   "msgs": [],
                   },
               "exitcode": None,
               }

        with open(self.hooklog_in) as fh:
            for i, line in enumerate(fh):
                line = line.rstrip("\n")
                if line.startswith("%"):
                    tstamp, args = line.split(" with ")
                    # Timestamp includes nanosecond resolution
                    timestamp = tstamp.split(" ")[-1]
                    # convert timestamp to python datetime object
                    log["calls"].append({
                        "timestamp": datetime.fromtimestamp(float(timestamp)),
                        "args": args,
                    })
                elif line.startswith("{"):
                    # Decode json input (to hook)
                    log["input"]["json"].append(json_decoder(line))
                else:
                    raise IOError("Unexpected content on STDIN line {0}: {1}"
                                  .format(i, line))

        with open(self.hooklog_out) as fh:
            for line in fh:
                line = line.rstrip("\n")
                if line.startswith("!"):
                    exitcode = int(line.split(" ")[-1])
                    log["exitcode"] = exitcode
                elif line.startswith("{"):
                    # Decode json output (from hook)
                    log["output"]["json"].append(json_decoder(line))
                else:
                    log["output"]["msgs"].append(line)

        # NOTE convert all lists to tuples to prevent tampering?

        self._cache["log"] = log

        # Update last modification timestamp in cache
        self._cache["last_change"] = self._get_log_stat()

        return self._cache["log"]

    def assertTriggeredCount(self, count):
        """Check if current hook file was triggered/used by taskwarrior and
        how many times.
        """
        log = self.get_logs()

        assert len(log["calls"]) == count, ("{0} calls expected for {1} but "
                                            "found {2}".format(
                                                count,
                                                self.hookname,
                                                log["calls"]
                                            ))

    def assertExitcode(self, exitcode):
        """Check if current hook finished with the expected exit code
        """
        log = self.get_logs()

        assert log["exitcode"] == exitcode, ("Expected exit code {0} for {1} "
                                             "but found {2}".format(
                                                 exitcode,
                                                 self.hookname,
                                                 log["exitcode"]
                                             ))

    def assertValidJSONOutput(self):
        """Check if current hook output is valid JSON in all expected replies
        """
        log = self.get_logs()

        for i, out in enumerate(log["output"]["json"]):
            assert not isinstance(out, InvalidJSON), ("Invalid JSON found at "
                                                      "reply number {0} with "
                                                      "content {1}".format(
                                                          i + 1,
                                                          out.original
                                                      ))

    def assertInvalidJSONOutput(self):
        """Check if current hook output is invalid JSON in any expected reply
        """
        log = self.get_logs()

        for i, out in enumerate(log["output"]["json"]):
            assert isinstance(out, InvalidJSON), ("Valid JSON found at reply "
                                                  "number {0} with content "
                                                  "{1}".format(
                                                      i + 1,
                                                      out.original
                                                  ))

# vim: ai sts=4 et sw=4