File: _ssh_askpass.py

package info (click to toggle)
ansible-core 2.19.0~beta6-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 32,628 kB
  • sloc: python: 180,313; cs: 4,929; sh: 4,601; xml: 34; makefile: 21
file content (47 lines) | stat: -rw-r--r-- 1,365 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# Copyright: Contributors to the Ansible project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations

import json
import os
import re
import sys
import typing as t
from multiprocessing.shared_memory import SharedMemory

HOST_KEY_RE = re.compile(
    r'(The authenticity of host |differs from the key for the IP address)',
)


def main() -> t.Never:
    try:
        if HOST_KEY_RE.search(sys.argv[1]):
            sys.stdout.buffer.write(b'no')
            sys.stdout.flush()
            sys.exit(0)
    except IndexError:
        pass

    kwargs: dict[str, bool] = {}
    if sys.version_info[:2] >= (3, 13):
        # deprecated: description='unneeded due to track argument for SharedMemory' python_version='3.12'
        kwargs['track'] = False
    try:
        shm = SharedMemory(name=os.environ['_ANSIBLE_SSH_ASKPASS_SHM'], **kwargs)
    except FileNotFoundError:
        # We must be running after the ansible fork is shutting down
        sys.exit(1)
    cfg = json.loads(shm.buf.tobytes().rstrip(b'\x00'))

    try:
        if cfg['prompt'] not in sys.argv[1]:
            sys.exit(1)
    except IndexError:
        sys.exit(1)

    sys.stdout.buffer.write(cfg['password'].encode('utf-8'))
    sys.stdout.flush()
    shm.buf[:] = b'\x00' * shm.size
    shm.close()
    sys.exit(0)