1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
"""Plugin utilities."""
import logging
import os
import socket
import zope.component
from acme import errors as acme_errors
from acme import util as acme_util
from certbot import interfaces
from certbot import util
PSUTIL_REQUIREMENT = "psutil>=2.2.1"
try:
acme_util.activate(PSUTIL_REQUIREMENT)
import psutil # pragma: no cover
USE_PSUTIL = True
except acme_errors.DependencyError: # pragma: no cover
USE_PSUTIL = False
logger = logging.getLogger(__name__)
RENEWER_EXTRA_MSG = (
" For automated renewal, you may want to use a script that stops"
" and starts your webserver. You can find an example at"
" https://certbot.eff.org/docs/using.html#renewal ."
" Alternatively you can use the webroot plugin to renew without"
" needing to stop and start your webserver.")
def path_surgery(cmd):
"""Attempt to perform PATH surgery to find cmd
Mitigates https://github.com/certbot/certbot/issues/1833
:param str cmd: the command that is being searched for in the PATH
:returns: True if the operation succeeded, False otherwise
"""
dirs = ("/usr/sbin", "/usr/local/bin", "/usr/local/sbin")
path = os.environ["PATH"]
added = []
for d in dirs:
if d not in path:
path += os.pathsep + d
added.append(d)
if any(added):
logger.debug("Can't find %s, attempting PATH mitigation by adding %s",
cmd, os.pathsep.join(added))
os.environ["PATH"] = path
if util.exe_exists(cmd):
return True
else:
expanded = " expanded" if any(added) else ""
logger.warning("Failed to find %s in%s PATH: %s", cmd,
expanded, path)
return False
def already_listening(port, renewer=False):
"""Check if a process is already listening on the port.
If so, also tell the user via a display notification.
.. warning::
On some operating systems, this function can only usefully be
run as root.
:param int port: The TCP port in question.
:returns: True or False.
"""
if USE_PSUTIL:
return already_listening_psutil(port, renewer=renewer)
else:
logger.debug("Psutil not found, using simple socket check.")
return already_listening_socket(port, renewer=renewer)
def already_listening_socket(port, renewer=False):
"""Simple socket based check to find out if port is already in use
:param int port: The TCP port in question.
:returns: True or False
"""
try:
testsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
testsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
testsocket.bind(("", port))
except socket.error:
display = zope.component.getUtility(interfaces.IDisplay)
extra = ""
if renewer:
extra = RENEWER_EXTRA_MSG
display.notification(
"Port {0} is already in use by another process. This will "
"prevent us from binding to that port. Please stop the "
"process that is populating the port in question and try "
"again. {1}".format(port, extra), force_interactive=True)
return True
finally:
testsocket.close()
except socket.error:
pass
return False
def already_listening_psutil(port, renewer=False):
"""Psutil variant of the open port check
:param int port: The TCP port in question.
:returns: True or False.
"""
try:
net_connections = psutil.net_connections()
except psutil.AccessDenied as error:
logger.info("Access denied when trying to list network "
"connections: %s. Are you root?", error)
# this function is just a pre-check that often causes false
# positives and problems in testing (c.f. #680 on Mac, #255
# generally); we will fail later in bind() anyway
return False
listeners = [conn.pid for conn in net_connections
if conn.status == 'LISTEN' and
conn.type == socket.SOCK_STREAM and
conn.laddr[1] == port]
try:
if listeners and listeners[0] is not None:
# conn.pid may be None if the current process doesn't have
# permission to identify the listening process! Additionally,
# listeners may have more than one element if separate
# sockets have bound the same port on separate interfaces.
# We currently only have UI to notify the user about one
# of them at a time.
pid = listeners[0]
name = psutil.Process(pid).name()
display = zope.component.getUtility(interfaces.IDisplay)
extra = ""
if renewer:
extra = RENEWER_EXTRA_MSG
display.notification(
"The program {0} (process ID {1}) is already listening "
"on TCP port {2}. This will prevent us from binding to "
"that port. Please stop the {0} program temporarily "
"and then try again.{3}".format(name, pid, port, extra),
force_interactive=True)
return True
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Perhaps the result of a race where the process could have
# exited or relinquished the port (NoSuchProcess), or the result
# of an OS policy where we're not allowed to look up the process
# name (AccessDenied).
pass
return False
|