File: local.py

package info (click to toggle)
ansible-core 2.19.0~beta6-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 32,628 kB
  • sloc: python: 180,313; cs: 4,929; sh: 4,601; xml: 34; makefile: 21
file content (290 lines) | stat: -rw-r--r-- 12,125 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
# (c) 2015, 2017 Toshio Kuratomi <tkuratomi@ansible.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import annotations

# Plugin documentation and option schema, written as YAML inside a string.
# The two options declared here (`become_success_timeout` and `become_strip_preamble`)
# are the ones this file reads back through `self.get_option(...)`; `pipelining` is read
# as well and presumably comes from the `connection_pipelining` fragment -- TODO confirm.
DOCUMENTATION = """
    name: local
    short_description: execute on controller
    description:
        - This connection plugin allows ansible to execute tasks on the Ansible 'controller' instead of on a remote host.
    author: ansible (@core)
    version_added: historical
    options:
        become_success_timeout:
            version_added: '2.19'
            type: int
            default: 10
            description:
                - Number of seconds to wait for become to succeed when enabled.
                - The default will be used if the configured value is less than 1.
            vars:
                - name: ansible_local_become_success_timeout
        become_strip_preamble:
            version_added: '2.19'
            type: bool
            default: true
            description:
                - Strip internal become output preceding command execution. Disable for additional diagnostics.
            vars:
                - name: ansible_local_become_strip_preamble
    extends_documentation_fragment:
        - connection_pipelining
    notes:
        - The remote user is ignored, the user with which the ansible CLI was executed is used instead.
"""

import functools
import getpass
import os
import pty
import selectors
import shutil
import subprocess
import time
import typing as t

import ansible.constants as C
from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleConnectionFailure
from ansible.module_utils.six import text_type, binary_type
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.plugins.connection import ConnectionBase
from ansible.utils.display import Display
from ansible.utils.path import unfrackpath

# Module-level Display instance used for the vv/vvv/debug messages emitted by this plugin.
display = Display()


class Connection(ConnectionBase):
    """Connection plugin that runs commands and copies files on the local machine (the controller)."""

    transport = 'local'
    has_pipelining = True

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        """Initialise the connection and record the name of the user running this process.

        All arguments are forwarded unchanged to the base class.
        """
        super().__init__(*args, **kwargs)

        # used as the working directory for spawned commands (exec_command) and as the
        # base directory when resolving paths (put_file); None means "not set"
        self.cwd = None

        try:
            self.default_user = getpass.getuser()
        except (KeyError, OSError):
            # getpass.getuser() raises KeyError on Python < 3.13 and OSError on Python >= 3.13
            # when the current uid has no resolvable user name; both mean "unknown user".
            display.vv(f"Current user (uid={os.getuid()}) does not seem to exist on this system, leaving user empty.")
            self.default_user = ""

    def _connect(self) -> Connection:
        """Mark the connection as established and return ``self``; there is no real connection to make."""

        # Because we haven't made any remote connection we're running as
        # the local user, rather than as whatever is configured in remote_user.
        self._play_context.remote_user = self.default_user

        if not self._connected:
            display.vvv(f"ESTABLISH LOCAL CONNECTION FOR USER: {self._play_context.remote_user}", host=self._play_context.remote_addr)
            self._connected = True
        return self

    def exec_command(self, cmd: str, in_data: bytes | None = None, sudoable: bool = True) -> tuple[int, bytes, bytes]:
        """Run a command on the local host.

        `cmd` is run through the shell when it is a text or byte string; any other iterable is
        passed to Popen as an argument list. `in_data` is handed to `communicate()` as the
        process's standard input after become handling has completed. `sudoable` controls
        whether become handling (pty allocation, password prompt, success detection) applies.

        Returns a tuple of `(returncode, stdout, stderr)`; output captured while waiting for
        become success is prepended to the respective stream.

        Raises `AnsibleError` if the configured executable does not exist. Errors raised by
        `_ensure_become_success` propagate to the caller; any pty opened here is closed first.
        """
        super().exec_command(cmd, in_data=in_data, sudoable=sudoable)

        display.debug("in local.exec_command()")

        # NOTE(review): when DEFAULT_EXECUTABLE is empty, `executable` is None and is still
        # passed through the existence check below -- confirm that is the intended behaviour.
        executable = C.DEFAULT_EXECUTABLE.split()[0] if C.DEFAULT_EXECUTABLE else None

        if not os.path.exists(to_bytes(executable, errors='surrogate_or_strict')):
            raise AnsibleError("failed to find the executable specified %s."
                               " Please verify if the executable exists and re-try." % executable)

        display.vvv(f"EXEC {to_text(cmd)}", host=self._play_context.remote_addr)
        display.debug("opening command with Popen()")

        # a single string goes through the shell; anything else is treated as an argv sequence
        use_shell = isinstance(cmd, (text_type, binary_type))
        popen_args = to_text(cmd) if use_shell else [to_text(arg) for arg in cmd]

        pty_primary = None
        pty_secondary = None
        stdin = subprocess.PIPE
        if sudoable and self.become and self.become.expect_prompt() and not self.get_option('pipelining'):
            # Create a pty if sudoable for privilege escalation that needs it.
            # Falls back to using a standard pipe if this fails, which may
            # cause the command to fail in certain situations where we are escalating
            # privileges or the command otherwise needs a pty.
            try:
                pty_primary, pty_secondary = pty.openpty()
            except OSError as ex:
                display.debug(f"Unable to open pty: {ex}")
            else:
                stdin = pty_secondary

        try:
            try:
                p = subprocess.Popen(
                    popen_args,
                    shell=use_shell,
                    executable=executable,
                    cwd=self.cwd,
                    stdin=stdin,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
            finally:
                # if we created a pty, our copy of the other half can be closed as soon as Popen
                # has returned (or failed); without a pty the child's stdin is a regular pipe
                if pty_secondary is not None:
                    os.close(pty_secondary)

            display.debug("done running command with Popen()")

            become_stdout_bytes, become_stderr_bytes = self._ensure_become_success(p, pty_primary, sudoable)

            display.debug("getting output with communicate()")
            stdout, stderr = p.communicate(in_data)
            display.debug("done communicating")
        finally:
            # close our half of the pty, if it was created, on success and on every error path;
            # compare against None explicitly since 0 is a valid file descriptor
            if pty_primary is not None:
                os.close(pty_primary)

        # preserve output from privilege escalation stage as `bytes`; it may contain actual output (eg `raw`) or error messages
        stdout = become_stdout_bytes + stdout
        stderr = become_stderr_bytes + stderr

        display.debug("done with local.exec_command()")
        return p.returncode, stdout, stderr

    def _ensure_become_success(self, p: subprocess.Popen, pty_primary: int | None, sudoable: bool) -> tuple[bytes, bytes]:
        """
        Ensure that become succeeds, returning a tuple containing stdout captured after success and all stderr.
        Returns immediately if `self.become` or `sudoable` are False, or `build_become_command` was not called, without performing any additional checks.

        `p` is the running process whose stdout/stderr are polled. `pty_primary` is the controller side of the pty
        used to send the become password, or None when the password must be written to `p.stdin` instead.

        Raises `AnsibleError` when both streams reach EOF before success is detected or when a second password
        prompt appears after the password was sent, and `AnsibleConnectionFailure` when the become success timeout elapses.
        """
        if not self.become or not sudoable or not self.become._id:  # _id is set by build_become_command, if it was not called, assume no become
            return b'', b''

        start_seconds = time.monotonic()
        become_stdout = bytearray()
        become_stderr = bytearray()
        # offsets into the capture buffers marking what has already been matched as a prompt,
        # so the same prompt text is not detected twice
        last_stdout_prompt_offset = 0
        last_stderr_prompt_offset = 0

        # map the buffers to their associated stream for the selector reads
        become_capture = {
            p.stdout: become_stdout,
            p.stderr: become_stderr,
        }

        expect_password_prompt = self.become.expect_prompt()
        sent_password = False

        def become_error_msg(reason: str) -> str:
            """Build an error message from `reason`, appending whatever output has been captured so far."""
            error_message = f'{reason} waiting for become success'

            if expect_password_prompt and not sent_password:
                error_message += ' or become password prompt'

            error_message += '.'

            if become_stdout:
                error_message += f'\n>>> Standard Output\n{to_text(bytes(become_stdout))}'

            if become_stderr:
                error_message += f'\n>>> Standard Error\n{to_text(bytes(become_stderr))}'

            return error_message

        # reads below must never block; readiness and the timeout are handled by the selector
        os.set_blocking(p.stdout.fileno(), False)
        os.set_blocking(p.stderr.fileno(), False)

        with selectors.DefaultSelector() as selector:
            selector.register(p.stdout, selectors.EVENT_READ, 'stdout')
            selector.register(p.stderr, selectors.EVENT_READ, 'stderr')

            while not self.become.check_success(become_stdout):
                if not selector.get_map():  # we only reach end of stream after all descriptors are EOF
                    raise AnsibleError(become_error_msg('Premature end of stream'))

                if expect_password_prompt and (
                    self.become.check_password_prompt(become_stdout[last_stdout_prompt_offset:]) or
                    self.become.check_password_prompt(become_stderr[last_stderr_prompt_offset:])
                ):
                    if sent_password:
                        raise AnsibleError(become_error_msg('Duplicate become password prompt encountered'))

                    last_stdout_prompt_offset = len(become_stdout)
                    last_stderr_prompt_offset = len(become_stderr)

                    password_to_send = to_bytes(self.become.get_option('become_pass') or '') + b'\n'

                    # without a pty the child's stdin is a pipe owned by the Popen object
                    if pty_primary is None:
                        p.stdin.write(password_to_send)
                        p.stdin.flush()
                    else:
                        os.write(pty_primary, password_to_send)

                    sent_password = True

                # the timeout covers the whole wait, not each individual select call
                remaining_timeout_seconds = self._become_success_timeout - (time.monotonic() - start_seconds)
                events = selector.select(remaining_timeout_seconds) if remaining_timeout_seconds > 0 else []

                if not events:
                    # ignoring remaining output after timeout to prevent hanging
                    raise AnsibleConnectionFailure(become_error_msg('Timed out'))

                # read all content (non-blocking) from streams that signaled available input and append to the associated buffer
                for key, event in events:
                    obj = t.cast(t.BinaryIO, key.fileobj)

                    if chunk := obj.read():
                        become_capture[obj] += chunk
                    else:
                        selector.unregister(obj)  # EOF on this obj, stop polling it

        # restore blocking mode before the caller continues reading with communicate()
        os.set_blocking(p.stdout.fileno(), True)
        os.set_blocking(p.stderr.fileno(), True)

        become_stdout_bytes = bytes(become_stdout)
        become_stderr_bytes = bytes(become_stderr)

        if self.get_option('become_strip_preamble'):
            become_stdout_bytes = self.become.strip_become_success(self.become.strip_become_prompt(become_stdout_bytes))
            become_stderr_bytes = self.become.strip_become_prompt(become_stderr_bytes)

        return become_stdout_bytes, become_stderr_bytes

    @functools.cached_property
    def _become_success_timeout(self) -> int:
        """Timeout value for become success in seconds.

        A configured value below 1 is replaced by the option's configured default.
        The result is cached on the instance after the first access.
        """
        if (timeout := self.get_option('become_success_timeout')) < 1:
            timeout = C.config.get_config_default('become_success_timeout', plugin_type='connection', plugin_name='local')

        return timeout

    def put_file(self, in_path: str, out_path: str) -> None:
        """Copy a file from `in_path` to `out_path` on the local host.

        Both paths are normalised with `unfrackpath` using `self.cwd` as the base directory.

        Raises `AnsibleFileNotFound` if `in_path` does not exist, and `AnsibleError` if the copy fails.
        """
        super().put_file(in_path, out_path)

        in_path = unfrackpath(in_path, basedir=self.cwd)
        out_path = unfrackpath(out_path, basedir=self.cwd)

        display.vvv(f"PUT {in_path} TO {out_path}", host=self._play_context.remote_addr)
        if not os.path.exists(to_bytes(in_path, errors='surrogate_or_strict')):
            raise AnsibleFileNotFound(f"file or module does not exist: {to_native(in_path)}")
        try:
            shutil.copyfile(to_bytes(in_path, errors='surrogate_or_strict'), to_bytes(out_path, errors='surrogate_or_strict'))
        except shutil.Error:
            # must precede the OSError handler: shutil.Error is itself an OSError subclass
            raise AnsibleError(f"failed to copy: {to_native(in_path)} and {to_native(out_path)} are the same")
        except OSError as ex:
            raise AnsibleError(f"Failed to transfer file to {out_path!r}.") from ex

    def fetch_file(self, in_path: str, out_path: str) -> None:
        """Fetch a file from local to local -- for compatibility; delegates the copy to `put_file`."""
        super().fetch_file(in_path, out_path)

        display.vvv(f"FETCH {in_path} TO {out_path}", host=self._play_context.remote_addr)
        self.put_file(in_path, out_path)

    def reset(self) -> None:
        """Reset the connection; intentionally a no-op in this plugin."""
        pass

    def close(self) -> None:
        """Terminate the connection; only the connected flag needs clearing."""
        self._connected = False