File: util.py

package info (click to toggle)
python-certbot 0.10.2-1~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 1,548 kB
  • sloc: python: 12,428; makefile: 175; sh: 44
file content (163 lines) | stat: -rw-r--r-- 5,598 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""Plugin utilities."""
import logging
import os
import socket

import zope.component

from acme import errors as acme_errors
from acme import util as acme_util

from certbot import interfaces
from certbot import util

# Requirement specifier handed to acme_util.activate() below.
PSUTIL_REQUIREMENT = "psutil>=2.2.1"

# psutil is an optional dependency.  USE_PSUTIL records whether it could be
# imported; already_listening() consults it to choose between the psutil
# based check and the plain socket based check.
# NOTE(review): acme_util.activate() presumably verifies that the requirement
# is satisfiable and raises DependencyError otherwise -- confirm in acme.util.
try:
    acme_util.activate(PSUTIL_REQUIREMENT)
    import psutil  # pragma: no cover
    USE_PSUTIL = True
except acme_errors.DependencyError:  # pragma: no cover
    USE_PSUTIL = False

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Extra hint appended to the "port already in use" notifications when the
# caller passes renewer=True.  It starts with a space so that it can be
# concatenated directly after the end of the preceding sentence.
RENEWER_EXTRA_MSG = (
    " For automated renewal, you may want to use a script that stops"
    " and starts your webserver. You can find an example at"
    " https://certbot.eff.org/docs/using.html#renewal ."
    " Alternatively you can use the webroot plugin to renew without"
    " needing to stop and start your webserver.")


def path_surgery(cmd):
    """Attempt to perform PATH surgery to find cmd

    Mitigates https://github.com/certbot/certbot/issues/1833

    Any of ``/usr/sbin``, ``/usr/local/bin`` and ``/usr/local/sbin`` that
    is not already an entry of the ``PATH`` environment variable is
    appended to it (the change is written back to ``os.environ``), and
    then ``util.exe_exists`` is asked whether cmd can be found.

    :param str cmd: the command that is being searched for in the PATH

    :returns: True if the operation succeeded, False otherwise
    :rtype: bool
    """
    dirs = ("/usr/sbin", "/usr/local/bin", "/usr/local/sbin")
    # An unset PATH is treated like an empty one instead of raising KeyError.
    path = os.environ.get("PATH", "")
    # Compare against whole PATH entries.  A substring test on the raw
    # string would treat e.g. "/opt/usr/sbin" as if "/usr/sbin" were present.
    entries = path.split(os.pathsep) if path else []
    added = [d for d in dirs if d not in entries]

    if added:
        # Joining the entry list (rather than appending to the raw string)
        # avoids a leading empty entry when PATH was empty or unset.
        path = os.pathsep.join(entries + added)
        logger.debug("Can't find %s, attempting PATH mitigation by adding %s",
                     cmd, os.pathsep.join(added))
        os.environ["PATH"] = path

    if util.exe_exists(cmd):
        return True

    expanded = " expanded" if added else ""
    logger.warning("Failed to find %s in%s PATH: %s", cmd,
                   expanded, path)
    return False


def already_listening(port, renewer=False):
    """Report whether some process is already listening on the port.

    The actual work is delegated to the psutil based check when psutil
    could be imported, and to the plain socket based check otherwise.
    When a listener is found the user is also told via a display
    notification.

    .. warning::
        On some operating systems, this function can only usefully be
        run as root.

    :param int port: The TCP port in question.
    :param bool renewer: Passed through unchanged to the selected check.
    :returns: True or False.

    """
    if not USE_PSUTIL:
        logger.debug("Psutil not found, using simple socket check.")
        return already_listening_socket(port, renewer=renewer)

    return already_listening_psutil(port, renewer=renewer)


def already_listening_socket(port, renewer=False):
    """Simple socket based check to find out if port is already in use

    Tries to bind a throw-away IPv4 TCP socket to the port on all
    interfaces.  If the bind fails, the user is told via a display
    notification and True is returned.  If the check itself cannot be
    carried out (the socket cannot be created or configured), the problem
    is logged at debug level and False is returned.

    :param int port: The TCP port in question.
    :param bool renewer: If True, RENEWER_EXTRA_MSG is appended to the
        notification shown to the user.
    :returns: True or False
    """
    port_in_use = False
    try:
        testsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        # Everything after creation sits inside try/finally so the socket
        # is closed even when setsockopt() fails, not only after bind().
        try:
            testsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                testsocket.bind(("", port))
            except socket.error:
                port_in_use = True
        finally:
            testsocket.close()
    except socket.error as error:
        # This is only a best-effort pre-check, so failing to perform it
        # is not fatal; leave a trace instead of swallowing it silently.
        logger.debug("Unable to complete socket check for port %s: %s",
                     port, error)
        return False

    if not port_in_use:
        return False

    display = zope.component.getUtility(interfaces.IDisplay)
    extra = RENEWER_EXTRA_MSG if renewer else ""
    # RENEWER_EXTRA_MSG already starts with a space, so no separator is
    # placed before {1}; this also avoids a trailing space when extra is
    # empty.
    display.notification(
        "Port {0} is already in use by another process. This will "
        "prevent us from binding to that port. Please stop the "
        "process that is populating the port in question and try "
        "again.{1}".format(port, extra), force_interactive=True)
    return True


def already_listening_psutil(port, renewer=False):
    """Psutil variant of the open port check

    Looks through the connections reported by psutil for TCP sockets in
    the LISTEN state whose local port equals port.  If the owning process
    of one of them can be identified and named, the user is told via a
    display notification and True is returned.

    :param int port: The TCP port in question.
    :param bool renewer: If True, RENEWER_EXTRA_MSG is appended to the
        notification shown to the user.
    :returns: True or False.

    """
    try:
        net_connections = psutil.net_connections()
    except psutil.AccessDenied as error:
        logger.info("Access denied when trying to list network "
                    "connections: %s. Are you root?", error)
        # this function is just a pre-check that often causes false
        # positives and problems in testing (c.f. #680 on Mac, #255
        # generally); we will fail later in bind() anyway
        return False

    listeners = [conn.pid for conn in net_connections
                 if conn.status == 'LISTEN' and
                 conn.type == socket.SOCK_STREAM and
                 conn.laddr[1] == port]

    # conn.pid may be None if the current process doesn't have
    # permission to identify the listening process!  Additionally,
    # listeners may have more than one element if separate
    # sockets have bound the same port on separate interfaces.
    # We currently only have UI to notify the user about one
    # of them at a time, so report the first one that is identifiable
    # rather than giving up when the first entry happens to have no pid.
    pid = next((candidate for candidate in listeners
                if candidate is not None), None)
    if pid is None:
        return False

    try:
        name = psutil.Process(pid).name()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Perhaps the result of a race where the process could have
        # exited or relinquished the port (NoSuchProcess), or the result
        # of an OS policy where we're not allowed to look up the process
        # name (AccessDenied).
        return False

    display = zope.component.getUtility(interfaces.IDisplay)
    extra = RENEWER_EXTRA_MSG if renewer else ""
    display.notification(
        "The program {0} (process ID {1}) is already listening "
        "on TCP port {2}. This will prevent us from binding to "
        "that port. Please stop the {0} program temporarily "
        "and then try again.{3}".format(name, pid, port, extra),
        force_interactive=True)
    return True