File: async_process.py

package info (click to toggle)
neutron 2:13.0.2-15
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 30,764 kB
  • sloc: python: 188,554; sh: 1,060; makefile: 246
file content (272 lines) | stat: -rw-r--r-- 10,132 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# Copyright 2013 Red Hat, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import signal

import eventlet
import eventlet.event
import eventlet.queue
from neutron_lib.utils import helpers
from oslo_log import log as logging

from neutron._i18n import _
from neutron.agent.common import ip_lib
from neutron.agent.common import utils
from neutron.common import utils as common_utils


# Module-level logger used by the async process helpers below.
LOG = logging.getLogger(__name__)


class AsyncProcessException(Exception):
    """Raised when an AsyncProcess is started or stopped in the wrong state.

    AsyncProcess.start() raises it when the process is already started and
    AsyncProcess.stop() raises it when the process is not running.
    """


class AsyncProcess(object):
    """Manages an asynchronous process.

    This class spawns a new process (through utils.create_process) and
    uses greenthreads to read stderr and stdout asynchronously into
    queues that can be read via repeatedly calling iter_stdout() and
    iter_stderr().

    Any error in communicating with the managed process results in the
    process being killed and the watcher greenthreads being halted. If
    respawn_interval is not None (0 is a valid value), the process is
    then restarted after the specified interval.

    Example usage:

    >>> import time
    >>> proc = AsyncProcess(['ping'])
    >>> proc.start()
    >>> time.sleep(5)
    >>> proc.stop()
    >>> for line in proc.iter_stdout():
    ...     print(line)
    """

    def __init__(self, cmd, run_as_root=False, respawn_interval=None,
                 namespace=None, log_output=False, die_on_error=False):
        """Constructor.

        :param cmd: The list of command arguments to invoke.
        :param run_as_root: The process should run with elevated privileges.
        :param respawn_interval: Optional, the interval in seconds to wait
               to respawn after unexpected process death. Respawn will
               only be attempted if a value of 0 or greater is provided.
        :param namespace: Optional, start the command in the specified
               namespace.
        :param log_output: Optional, also log received output.
        :param die_on_error: Optional, kills the process on stderr output.
        :raises ValueError: if respawn_interval is provided and negative.
        """
        # The un-prefixed command is kept separately because it is what
        # is matched against a pid's command line (see is_active/pid).
        self.cmd_without_namespace = cmd
        self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
        self.run_as_root = run_as_root
        if respawn_interval is not None and respawn_interval < 0:
            raise ValueError(_('respawn_interval must be >= 0 if provided.'))
        self.respawn_interval = respawn_interval
        self._process = None
        self._pid = None
        self._is_running = False
        self._kill_event = None
        self._reset_queues()
        self._watchers = []
        self.log_output = log_output
        self.die_on_error = die_on_error

    @property
    def cmd(self):
        """The full command (namespace prefix included) as one string."""
        return ' '.join(self._cmd)

    def _reset_queues(self):
        """Replace the stdout/stderr line queues with fresh empty ones."""
        self._stdout_lines = eventlet.queue.LightQueue()
        self._stderr_lines = eventlet.queue.LightQueue()

    def is_active(self):
        """Report whether self.pid is running cmd_without_namespace.

        :returns: the result of utils.pid_invoked_with_cmdline() for the
                  current pid and the command without namespace prefix.
        """
        # If using sudo rootwrap as a root_helper, we have to wait until sudo
        # spawns rootwrap and rootwrap spawns the process. self.pid will make
        # sure to get the correct pid.
        return utils.pid_invoked_with_cmdline(
            self.pid, self.cmd_without_namespace)

    def start(self, block=False):
        """Launch a process and monitor it asynchronously.

        :param block: Block until the process has started.
        :raises AsyncProcessException: if the process is already started.
        :raises utils.WaitTimeout if blocking is True and the process
                did not start in time.
        """
        LOG.debug('Launching async process [%s].', self.cmd)
        if self._is_running:
            raise AsyncProcessException(_('Process is already started'))
        else:
            self._spawn()

        if block:
            common_utils.wait_until_true(self.is_active)

    def stop(self, block=False, kill_signal=None):
        """Halt the process and watcher threads.

        :param block: Block until the process has stopped.
        :param kill_signal: Number of signal that will be sent to the process
                            when terminating the process
        :raises AsyncProcessException: if the process is not running.
        :raises utils.WaitTimeout if blocking is True and the process
                did not stop in time.
        """
        # Default to SIGKILL, falling back to SIGTERM when the signal
        # module does not define SIGKILL.
        kill_signal = kill_signal or getattr(signal, 'SIGKILL', signal.SIGTERM)
        if self._is_running:
            LOG.debug('Halting async process [%s].', self.cmd)
            self._kill(kill_signal)
        else:
            raise AsyncProcessException(_('Process is not running.'))

        if block:
            common_utils.wait_until_true(lambda: not self.is_active())

    def _spawn(self):
        """Spawn a process and its watchers.

        If the process cannot be created, the running flag and the kill
        event are reset before the error is re-raised. Otherwise a failed
        spawn would leave the instance marked as running and every later
        start() would fail with 'Process is already started'.
        """
        # Build the event first so that the only step which can fail after
        # the instance is flagged as running is the guarded process
        # creation below.
        kill_event = eventlet.event.Event()
        self._is_running = True
        self._pid = None
        self._kill_event = kill_event
        try:
            self._process, cmd = utils.create_process(
                self._cmd, run_as_root=self.run_as_root)
        except Exception:
            # Nothing was launched: undo the bookkeeping done above.
            self._is_running = False
            self._kill_event = None
            raise
        self._watchers = []
        for reader in (self._read_stdout, self._read_stderr):
            # Pass the stop event directly to the greenthread to
            # ensure that assignment of a new event to the instance
            # attribute does not prevent the greenthread from using
            # the original event.
            watcher = eventlet.spawn(self._watch_process,
                                     reader,
                                     self._kill_event)
            self._watchers.append(watcher)

    @property
    def pid(self):
        """Pid of the managed command, or None if no process was created.

        The value is looked up through utils.get_root_helper_child_pid()
        and cached in self._pid until it is reset by _spawn() or _kill().
        """
        if self._process:
            if not self._pid:
                self._pid = utils.get_root_helper_child_pid(
                    self._process.pid,
                    self.cmd_without_namespace,
                    run_as_root=self.run_as_root)
            return self._pid

    def _kill(self, kill_signal):
        """Kill the process and the associated watcher greenthreads.

        :param kill_signal: Number of the signal sent to the process.
        """
        pid = self.pid
        if pid:
            # Clear the running flag before killing so that the watchers
            # do not treat the resulting end of output as an error.
            self._is_running = False
            self._pid = None
            self._kill_process(pid, kill_signal)

        # Halt the greenthreads if they weren't already.
        if self._kill_event:
            self._kill_event.send()
            self._kill_event = None

    def _kill_process(self, pid, kill_signal):
        """Send kill_signal to pid and wait for the process to exit.

        :param pid: Pid of the process to signal.
        :param kill_signal: Number of the signal to send.
        :returns: False if sending the signal raised (the error is
                  logged), True otherwise.
        """
        try:
            # A process started by a root helper will be running as
            # root and need to be killed via the same helper.
            utils.kill_process(pid, kill_signal, self.run_as_root)
        except Exception:
            LOG.exception('An error occurred while killing [%s].',
                          self.cmd)
            return False

        if self._process:
            self._process.wait()
        return True

    def _handle_process_error(self):
        """Kill the async process and respawn if necessary."""
        # Drain whatever output is queued so it can be included in the log.
        stdout = list(self.iter_stdout())
        stderr = list(self.iter_stderr())
        LOG.debug('Halting async process [%s] in response to an error. stdout:'
                  ' [%s] - stderr: [%s]', self.cmd, stdout, stderr)
        self._kill(getattr(signal, 'SIGKILL', signal.SIGTERM))
        if self.respawn_interval is not None and self.respawn_interval >= 0:
            eventlet.sleep(self.respawn_interval)
            LOG.debug('Respawning async process [%s].', self.cmd)
            try:
                self.start()
            except AsyncProcessException:
                # Process was already respawned by someone else...
                pass

    def _watch_process(self, callback, kill_event):
        """Call callback repeatedly until told to stop or it fails.

        :param callback: Reader invoked on every iteration; a None result
                         or a raised exception ends the loop, while an
                         empty string does not.
        :param kill_event: Event whose ready() state ends the loop.
        """
        while not kill_event.ready():
            try:
                output = callback()
                # An empty string is a valid (blank) line; any other falsy
                # value means there is nothing more to read.
                if not output and output != "":
                    break
            except Exception:
                LOG.exception('An error occurred while communicating '
                              'with async process [%s].', self.cmd)
                break
            # Ensure that watching a process with lots of output does
            # not block execution of other greenthreads.
            eventlet.sleep()
        # self._is_running being True indicates that the loop was
        # broken out of due to an error in the watched process rather
        # than the loop condition being satisfied.
        if self._is_running:
            self._is_running = False
            self._handle_process_error()

    def _read(self, stream, queue):
        """Read one line from stream and put it on queue.

        :param stream: Object providing readline().
        :param queue: Queue receiving the stripped, decoded line.
        :returns: the stripped line passed through
                  helpers.safe_decode_utf8(), or None when readline()
                  returned nothing.
        """
        data = stream.readline()
        if data:
            data = helpers.safe_decode_utf8(data.strip())
            queue.put(data)
            return data

    def _read_stdout(self):
        """Read one stdout line, logging it when log_output is set.

        :returns: the line read, or None when nothing was read.
        """
        data = self._read(self._process.stdout, self._stdout_lines)
        if self.log_output:
            LOG.debug('Output received from [%(cmd)s]: %(data)s',
                      {'cmd': self.cmd,
                       'data': data})
        return data

    def _read_stderr(self):
        """Read one stderr line, logging it when log_output is set.

        :returns: None when die_on_error is set (so that the watcher
                  bails out), otherwise the line read or None when
                  nothing was read.
        """
        data = self._read(self._process.stderr, self._stderr_lines)
        if self.log_output:
            LOG.error('Error received from [%(cmd)s]: %(err)s',
                      {'cmd': self.cmd,
                       'err': data})
        if self.die_on_error:
            LOG.error("Process [%(cmd)s] dies due to the error: %(err)s",
                      {'cmd': self.cmd,
                       'err': data})
            # the callback caller will use None to indicate the need to bail
            # out of the thread
            return None

        return data

    def _iter_queue(self, queue, block):
        """Yield items from queue until it reports being empty.

        :param queue: Queue to drain.
        :param block: Passed through to queue.get().
        """
        while True:
            try:
                yield queue.get(block=block)
            except eventlet.queue.Empty:
                break

    def iter_stdout(self, block=False):
        """Return an iterator over the queued stdout lines.

        :param block: Passed through to the queue's get().
        """
        return self._iter_queue(self._stdout_lines, block)

    def iter_stderr(self, block=False):
        """Return an iterator over the queued stderr lines.

        :param block: Passed through to the queue's get().
        """
        return self._iter_queue(self._stderr_lines, block)