File: watchers.py

package info (click to toggle)
python-invoke 1.4.1%2Bds-0.1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,704 kB
  • sloc: python: 11,377; makefile: 18; sh: 12
file content (142 lines) | stat: -rw-r--r-- 4,974 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import re
import threading

from .exceptions import ResponseNotAccepted


class StreamWatcher(threading.local):
    """
    Base class for objects that react to data seen on a subprocess stream.

    Subclasses are expected to follow this API (`Responder` is a concrete
    example):

    * ``__init__`` is left entirely to each subclass; subclasses *of*
      subclasses should, as usual, call `super` where appropriate.
    * `submit` receives the entire current contents of the watched stream as
      a Unicode string. It may return an iterable of Unicode strings (or be
      written as a generator yielding them); each such string is written to
      the subprocess' standard input.

    .. note::
        Using classes instead of plain callbacks makes state tracking
        possible, e.g. noticing that a submitted password was rejected and
        then erroring or prompting the user.

    .. note::
        This class derives from `threading.local` so that one instance can
        'watch' both subprocess stdout and stderr from separate threads.

    .. versionadded:: 1.0
    """

    def submit(self, stream):
        """
        React to ``stream`` data, optionally producing responses.

        This base implementation does nothing useful and always raises
        `NotImplementedError`; subclasses must override it.

        :param unicode stream:
            Everything read from this stream since the session began.

        :returns:
            An iterable of Unicode strings (possibly empty).

        .. versionadded:: 1.0
        """
        raise NotImplementedError()


class Responder(StreamWatcher):
    """
    A configurable watcher that answers a given pattern with a fixed response.

    A typical use is automatically answering password prompts from programs
    such as ``sudo``.

    .. versionadded:: 1.0
    """

    def __init__(self, pattern, response):
        r"""
        Store the pattern to look for and the response to send.

        :param pattern:
            A raw string (e.g. ``r"\[sudo\] password for .*:"``) that is used
            as a regular expression when scanning the stream.

        :param response:
            The string to submit to the subprocess' stdin whenever
            ``pattern`` is detected.
        """
        # TODO: precompile the keys into regex objects
        self.pattern = pattern
        self.response = response
        # Offset into the stream up to which matches were already handled.
        self.index = 0

    def pattern_matches(self, stream, pattern, index_attr):
        """
        Search the not-yet-handled part of ``stream`` for ``pattern``.

        The starting offset is read from the attribute named by
        ``index_attr``; keeping the attribute name a parameter lets
        subclasses track several patterns at once, each with its own offset.

        :param unicode stream: The same data passed to ``submit``.
        :param unicode pattern: The pattern to search for.
        :param unicode index_attr: The name of the index attribute to use.
        :returns:
            The result of `re.findall` over the unseen text (an empty list
            when nothing matched).

        .. versionadded:: 1.0
        """
        # Skip everything before the stored offset so that earlier matches
        # are not reported a second time.
        start = getattr(self, index_attr)
        unseen = stream[start:]
        # re.S lets "." span newlines, so a pattern may match across lines.
        found = re.findall(pattern, unseen, re.S)
        if not found:
            # No match: leave the offset alone so this text is rescanned
            # (together with any new data) on the next call.
            return found
        # Matched: move the offset past all of the text just examined.
        setattr(self, index_attr, start + len(unseen))
        return found

    def submit(self, stream):
        # One response is emitted per match, since a single chunk of new
        # data may contain the pattern more than once.
        matches = self.pattern_matches(stream, self.pattern, "index")
        for _match in matches:
            yield self.response


class FailingResponder(Responder):
    """
    Variant of `Responder` which is capable of detecting incorrect responses.

    This class adds a ``sentinel`` parameter to ``__init__``, and its
    ``submit`` will raise `.ResponseNotAccepted` if it detects that sentinel
    value in the stream after a response has been submitted.

    .. versionadded:: 1.0
    """

    def __init__(self, pattern, response, sentinel):
        """
        Set up the responder plus its failure detection state.

        :param pattern:
            Passed through to `Responder.__init__`.

        :param response:
            Passed through to `Responder.__init__`.

        :param sentinel:
            A pattern which, when found in the stream after a response was
            submitted, signals that the response was not accepted.
        """
        super(FailingResponder, self).__init__(pattern, response)
        self.sentinel = sentinel
        # Separate scan offset for the sentinel, independent of ``index``.
        self.failure_index = 0
        # Becomes True once at least one response has actually been produced.
        self.tried = False

    def submit(self, stream):
        """
        Respond like `Responder`, but error out if the sentinel shows up.

        :param unicode stream: The full stream contents seen so far.

        :returns:
            An iterator over the responses to write to the subprocess.

        :raises ResponseNotAccepted:
            If the sentinel is found in new stream data and a response had
            already been produced by an earlier call.
        """
        # Behave like regular Responder initially. The parent's ``submit`` is
        # a generator function, and a generator object is always truthy, so
        # it must be consumed here to learn whether anything was yielded.
        response = list(super(FailingResponder, self).submit(stream))
        # Also check stream for our failure sentinel
        failed = self.pattern_matches(stream, self.sentinel, "failure_index")
        # Error out if we seem to have failed after a previous response.
        if self.tried and failed:
            err = 'Auto-response to r"{}" failed with {!r}!'.format(
                self.pattern, self.sentinel
            )
            raise ResponseNotAccepted(err)
        # Only note an attempt when a response was really generated; a
        # sentinel seen before any response must not count as a failure.
        if response:
            self.tried = True
        # Hand back an iterator so callers that iterate (or call ``next``) on
        # the result keep working as they did with the generator.
        return iter(response)