1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
|
# Copyright (c) 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from multiprocessing import managers
from multiprocessing import util as mp_util
import threading
import time
import weakref
import oslo_rootwrap
from oslo_rootwrap import daemon
from oslo_rootwrap import jsonrpc
from oslo_rootwrap import subprocess
if oslo_rootwrap._patched_socket:
# We have to use slow version of recvall with eventlet because of a bug in
# GreenSocket.recv_into:
# https://bitbucket.org/eventlet/eventlet/pull-request/41
# This check happens here instead of jsonrpc to avoid importing eventlet
# from daemon code that is run with root privileges.
jsonrpc.JsonConnection.recvall = jsonrpc.JsonConnection._recvall_slow
try:
finalize = weakref.finalize
except AttributeError:
def finalize(obj, func, *args, **kwargs):
return mp_util.Finalize(obj, func, args=args, kwargs=kwargs,
exitpriority=0)
ClientManager = daemon.get_manager_class()
LOG = logging.getLogger(__name__)
SHUTDOWN_RETRIES = 3
class Client:
def __init__(self, rootwrap_daemon_cmd):
self._start_command = rootwrap_daemon_cmd
self._initialized = False
self._need_restart = False
self._mutex = threading.Lock()
self._manager = None
self._proxy = None
self._process = None
self._finalize = None
# This is for eventlet compatibility. multiprocessing stores
# daemon connection in ForkAwareLocal, so this won't be
# needed with the threading module.
self._exec_sem = threading.Lock()
def _initialize(self):
if self._process is not None and self._process.poll() is not None:
LOG.warning("Leaving behind already spawned process with pid %d, "
"root should kill it if it's still there (I can't)",
self._process.pid)
process_obj = subprocess.Popen(self._start_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True)
LOG.debug("Popen for %s command has been instantiated",
self._start_command)
self._process = process_obj
socket_path = process_obj.stdout.readline()[:-1]
# For Python 3 we need to convert bytes to str here
if not isinstance(socket_path, str):
socket_path = socket_path.decode('utf-8')
authkey = process_obj.stdout.read(32)
if process_obj.poll() is not None:
stderr = process_obj.stderr.read()
# NOTE(yorik-sar): don't expose stdout here
raise Exception("Failed to spawn rootwrap process.\nstderr:\n%s" %
(stderr,))
LOG.info("Spawned new rootwrap daemon process with pid=%d",
process_obj.pid)
def wait_process():
return_code = process_obj.wait()
LOG.info("Rootwrap daemon process exit with status: %d",
return_code)
reap_process = threading.Thread(target=wait_process)
reap_process.daemon = True
reap_process.start()
self._manager = ClientManager(socket_path, authkey)
self._manager.connect()
self._proxy = self._manager.rootwrap()
self._finalize = finalize(self, self._shutdown, self._process,
self._manager)
self._initialized = True
@staticmethod
def _shutdown(process, manager, JsonClient=jsonrpc.JsonClient):
# Storing JsonClient in arguments because globals are set to None
# before executing atexit routines in Python 2.x
if process.poll() is None:
LOG.info('Stopping rootwrap daemon process with pid=%s',
process.pid)
for _ in range(SHUTDOWN_RETRIES):
try:
manager.rootwrap().shutdown()
break
except (EOFError, OSError):
break # assume it is dead already
except RuntimeError:
time.sleep(0.2)
# We might want to wait for process to exit or kill it, but we
# can't provide sane timeout on 2.x and we most likely don't have
# permisions to do so
# Invalidate manager's state so that proxy won't try to do decref
manager._state.value = managers.State.SHUTDOWN
def _ensure_initialized(self):
with self._mutex:
if not self._initialized:
self._initialize()
def _restart(self, proxy):
with self._mutex:
if not self._initialized:
raise AssertionError("Client should be initialized.")
# Verify if someone has already restarted this.
if self._proxy is proxy:
self._finalize()
self._manager = None
self._proxy = None
self._initialized = False
self._initialize()
self._need_restart = False
return self._proxy
def _run_one_command(self, proxy, cmd, stdin):
"""Wrap proxy.run_one_command, setting _need_restart on an exception.
Usually it should be enough to drain stale data on socket
rather than to restart, but we cannot do draining easily.
"""
try:
_need_restart = True
res = proxy.run_one_command(cmd, stdin)
_need_restart = False
return res
finally:
if _need_restart:
self._need_restart = True
def execute(self, cmd, stdin=None):
with self._exec_sem:
self._ensure_initialized()
proxy = self._proxy
retry = False
if self._need_restart:
proxy = self._restart(proxy)
try:
res = self._run_one_command(proxy, cmd, stdin)
except (EOFError, OSError):
retry = True
# res can be None if we received final None sent by dying
# server thread instead of response to our
# request. Process is most likely to be dead at this
# point.
if retry or res is None:
proxy = self._restart(proxy)
res = self._run_one_command(proxy, cmd, stdin)
return res
|