1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
|
# Copyright 2019, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
As Mitogen separates asynchronous IO out to a broker thread, communication
necessarily involves context switching and waking that thread. When application
threads and the broker share a CPU, this can be almost invisibly fast - around
25 microseconds for a full A->B->A round-trip.
However when threads are scheduled on different CPUs, round-trip delays
regularly vary wildly, and easily into milliseconds. Many contributing factors
exist, not least scenarios like:
1. A is preempted immediately after waking B, but before releasing the GIL.
2. B wakes from IO wait only to immediately enter futex wait.
3. A may wait 10ms or more for another timeslice, as the scheduler on its CPU
runs threads unrelated to its transaction (i.e. not B), wake only to release
its GIL, before entering IO sleep waiting for a reply from B, which cannot
exist yet.
4. B wakes, acquires GIL, performs work, and sends reply to A, causing it to
wake. B is preempted before releasing GIL.
5. A wakes from IO wait only to immediately enter futex wait.
6. B may wait 10ms or more for another timeslice, wake only to release its GIL,
before sleeping again.
7. A wakes, acquires GIL, finally receives reply.
Per above if we are unlucky, on an even moderately busy machine it is possible
to lose milliseconds just in scheduling delay, and the effect is compounded
when pairs of threads in process A are communicating with pairs of threads in
process B using the same scheme, such as when Ansible WorkerProcess is
communicating with ContextService in the connection multiplexer. In the worst
case it could involve 4 threads working in lockstep spread across 4 busy CPUs.
Since multithreading in Python is essentially useless except for waiting on IO
due to the presence of the GIL, at least in Ansible there is no good reason for
threads in the same process to run on distinct CPUs - they always operate in
lockstep due to the GIL, and are thus vulnerable to issues like above.
Linux lacks any natural API to describe what we want, it only permits
individual threads to be constrained to run on specific CPUs, and for that
constraint to be inherited by new threads and forks of the constrained thread.
This module therefore implements a CPU pinning policy for Ansible processes,
providing methods that should be called early in any new process, either to
rebalance which CPU it is pinned to, or in the case of subprocesses, to remove
the pinning entirely. It is likely to require ongoing tweaking, since pinning
necessarily involves preventing the scheduler from making load balancing
decisions.
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import ctypes
import logging
import mmap
import multiprocessing
import os
import struct
import mitogen.parent
LOG = logging.getLogger(__name__)
try:
_libc = ctypes.CDLL(None, use_errno=True)
_strerror = _libc.strerror
_strerror.restype = ctypes.c_char_p
_sem_init = _libc.sem_init
_sem_wait = _libc.sem_wait
_sem_post = _libc.sem_post
_sched_setaffinity = _libc.sched_setaffinity
except (OSError, AttributeError):
_libc = None
_strerror = None
_sem_init = None
_sem_wait = None
_sem_post = None
_sched_setaffinity = None
class sem_t(ctypes.Structure):
"""
Wrap sem_t to allow storing a lock in shared memory.
"""
_fields_ = [
('data', ctypes.c_uint8 * 128),
]
def init(self):
if _sem_init(self.data, 1, 1):
raise Exception(_strerror(ctypes.get_errno()))
def acquire(self):
if _sem_wait(self.data):
raise Exception(_strerror(ctypes.get_errno()))
def release(self):
if _sem_post(self.data):
raise Exception(_strerror(ctypes.get_errno()))
class State(ctypes.Structure):
"""
Contents of shared memory segment. This allows :meth:`Manager.assign` to be
called from any child, since affinity assignment must happen from within
the context of the new child process.
"""
_fields_ = [
('lock', sem_t),
('counter', ctypes.c_uint8),
]
class Policy(object):
"""
Process affinity policy.
"""
def assign_controller(self):
"""
Assign the Ansible top-level policy to this process.
"""
def assign_muxprocess(self, index):
"""
Assign the MuxProcess policy to this process.
"""
def assign_worker(self):
"""
Assign the WorkerProcess policy to this process.
"""
def assign_subprocess(self):
"""
Assign the helper subprocess policy to this process.
"""
class FixedPolicy(Policy):
"""
:class:`Policy` for machines where the only control method available is
fixed CPU placement. The scheme here was tested on an otherwise idle 16
thread machine.
- The connection multiplexer is pinned to CPU 0.
- The Ansible top-level (strategy) is pinned to CPU 1.
- WorkerProcesses are pinned sequentually to 2..N, wrapping around when no
more CPUs exist.
- Children such as SSH may be scheduled on any CPU except 0/1.
If the machine has less than 4 cores available, the top-level and workers
are pinned between CPU 2..N, i.e. no CPU is reserved for the top-level
process.
This could at least be improved by having workers pinned to independent
cores, before reusing the second hyperthread of an existing core.
A hook is installed that causes :meth:`reset` to run in the child of any
process created with :func:`mitogen.parent.popen`, ensuring CPU-intensive
children like SSH are not forced to share the same core as the (otherwise
potentially very busy) parent.
"""
def __init__(self, cpu_count=None):
#: For tests.
self.cpu_count = cpu_count or multiprocessing.cpu_count()
self.mem = mmap.mmap(-1, 4096)
self.state = State.from_buffer(self.mem)
self.state.lock.init()
if self.cpu_count < 2:
# uniprocessor
self._reserve_mux = False
self._reserve_controller = False
self._reserve_mask = 0
self._reserve_shift = 0
elif self.cpu_count < 4:
# small SMP
self._reserve_mux = True
self._reserve_controller = False
self._reserve_mask = 1
self._reserve_shift = 1
else:
# big SMP
self._reserve_mux = True
self._reserve_controller = True
self._reserve_mask = 3
self._reserve_shift = 2
def _set_affinity(self, descr, mask):
if descr:
LOG.debug('CPU mask for %s: %#08x', descr, mask)
mitogen.parent._preexec_hook = self._clear
self._set_cpu_mask(mask)
def _balance(self, descr):
self.state.lock.acquire()
try:
n = self.state.counter
self.state.counter += 1
finally:
self.state.lock.release()
self._set_cpu(descr, self._reserve_shift + (
(n % (self.cpu_count - self._reserve_shift))
))
def _set_cpu(self, descr, cpu):
self._set_affinity(descr, 1 << (cpu % self.cpu_count))
def _clear(self):
all_cpus = (1 << self.cpu_count) - 1
self._set_affinity(None, all_cpus & ~self._reserve_mask)
def assign_controller(self):
if self._reserve_controller:
self._set_cpu('Ansible top-level process', 1)
else:
self._balance('Ansible top-level process')
def assign_muxprocess(self, index):
self._set_cpu('MuxProcess %d' % (index,), index)
def assign_worker(self):
self._balance('WorkerProcess')
def assign_subprocess(self):
self._clear()
class LinuxPolicy(FixedPolicy):
def _mask_to_bytes(self, mask):
"""
Convert the (type long) mask to a cpu_set_t.
"""
chunks = []
shiftmask = (2 ** 64) - 1
for x in range(16):
chunks.append(struct.pack('<Q', mask & shiftmask))
mask >>= 64
return b''.join(chunks)
def _get_thread_ids(self):
try:
ents = os.listdir('/proc/self/task')
except OSError:
LOG.debug('cannot fetch thread IDs for current process')
return [os.getpid()]
return [int(s) for s in ents if s.isdigit()]
def _set_cpu_mask(self, mask):
s = self._mask_to_bytes(mask)
for tid in self._get_thread_ids():
_sched_setaffinity(tid, len(s), s)
if _sched_setaffinity is not None:
policy = LinuxPolicy()
else:
policy = Policy()
|