# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.
# Use of this source code is governed by MIT license that can be
# found in the LICENSE file.
"""Process utils."""
import os
import random
import sys
import time
from binascii import hexlify
try:
    import multiprocessing
except ImportError:
    multiprocessing = None

from .log import logger

_task_id = None


def cpu_count():
    """Returns the number of processors on this machine."""
    if multiprocessing is None:
        return 1
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        pass
    try:
        return os.sysconf("SC_NPROCESSORS_CONF")
    except (AttributeError, ValueError):
        pass
    return 1


def _reseed_random():
    # If os.urandom is available, this method does the same thing as
    # random.seed. If os.urandom is not available, we mix in the pid in
    # addition to a timestamp.
    try:
        seed = int(hexlify(os.urandom(16)), 16)
    except NotImplementedError:
        seed = int(time.time() * 1000) ^ os.getpid()
    random.seed(seed)


def fork_processes(number, max_restarts=100):
    """Starts multiple worker processes.

    If *number* is None or <= 0, we detect the number of cores available
    on this machine and fork that number of child processes.
    If *number* is given and > 0, we fork that specific number of
    sub-processes.

    Since we use processes and not threads, there is no shared memory
    between the workers.

    In each child process, *fork_processes* returns its *task id*, a
    number between 0 and *number* - 1. Processes that exit abnormally
    (due to a signal or non-zero exit status) are restarted with the
    same id (up to *max_restarts* times). In the parent process,
    *fork_processes* calls sys.exit(0) once all child processes have
    exited normally; otherwise it only returns control by raising an
    exception (e.g. when *max_restarts* is exceeded).
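
    A minimal usage sketch (*serve_forever* is a hypothetical
    per-worker entry point, not something provided by this module)::

        task_id = fork_processes(4)
        # Only child processes reach this point; the parent either
        # exits or raises.  task_id is 0, 1, 2 or 3.
        serve_forever(task_id)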
"""
    assert _task_id is None
    if number is None or number <= 0:
        number = cpu_count()
    logger.info("starting %d pre-fork processes", number)
    children = {}

    def start_child(i):
        pid = os.fork()
        if pid == 0:
            # child process
            _reseed_random()
            global _task_id
            _task_id = i
            return i
        else:
            # parent process: remember which task id this child serves
            children[pid] = i
            return None

    for i in range(number):
        id = start_child(i)
        if id is not None:
            # we are in a freshly forked child; return its task id
            return id
    num_restarts = 0
    while children:
        try:
            pid, status = os.wait()
        except InterruptedError:
            # os.wait() was interrupted by a signal; retry
            continue
        if pid not in children:
            continue
        id = children.pop(pid)
        if os.WIFSIGNALED(status):
            logger.warning(
                "child %d (pid %d) killed by signal %d, restarting",
                id,
                pid,
                os.WTERMSIG(status),
            )
        elif os.WEXITSTATUS(status) != 0:
            logger.warning(
                "child %d (pid %d) exited with status %d, restarting",
                id,
                pid,
                os.WEXITSTATUS(status),
            )
        else:
            logger.info("child %d (pid %d) exited normally", id, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        new_id = start_child(id)
        if new_id is not None:
            # the restarted worker runs in this newly forked child
            return new_id
    # All child processes exited cleanly, so exit the master process
    # instead of just returning to right after the call to
    # fork_processes (which would probably just start up another server
    # loop unless the caller checks the return value).
    sys.exit(0)