1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
# -*- coding: utf-8 -*-
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# Copyright (c), Michael DeHaan <michael.dehaan@gmail.com>, 2012-2013
#
# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause)
# SPDX-License-Identifier: BSD-2-Clause
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import hmac
import re
from ansible.module_utils.six.moves.urllib.parse import urlparse
try:
from hashlib import sha1
except ImportError:
import sha as sha1
# Prefix identifying a hashed host entry in a known_hosts file; the text after
# it is split on "|" into the salt and the hashed host name.
HASHED_KEY_MAGIC = "|1|"
def is_ssh_url(url):
    """ Tell whether url refers to an ssh transport.

    A url counts as ssh when it is in the scp-like ``user@host...`` form
    (contains "@" but no "://"), or when it begins with one of the explicit
    ssh schemes.  Returns a bool.
    """
    scp_like = "@" in url and "://" not in url
    return scp_like or url.startswith(("ssh://", "git+ssh://", "ssh+git://"))
def get_fqdn_and_port(repo_url):
    """ Pull the host name and (optional) port out of a repository url.

    Two shapes are understood:

    * scp-like ``user@host:path`` or ``user@host/path`` (an "@" but no
      "://").  Here a colon introduces the path, so no port is ever reported.
    * ``scheme://[user@]host[:port]/...``, split with urlparse.

    Returns a ``(fqdn, port)`` tuple.  Either element is None when it could
    not be determined; the port is returned as the string found in the url.
    A bracketed IPv6 literal keeps its brackets in fqdn.
    """
    bracketed_host = re.compile(r'(\[[^]]*\])(?::([0-9]+))?')
    has_scheme = "://" in repo_url

    if "@" in repo_url and not has_scheme:
        # scp-like form: everything after the first "@" is host + path.
        remainder = repo_url.split("@", 1)[1]
        found = bracketed_host.match(remainder)
        if found:
            # Whatever follows the bracketed host is a path, never a port.
            return found.group(1), None
        # ":" takes precedence over "/" as the host/path separator.
        for separator in (":", "/"):
            if separator in remainder:
                return remainder.split(separator)[0], None
        return None, None

    if not has_scheme:
        return None, None

    # Index 1 of the urlparse result is the network location.  It can be
    # empty (e.g. python2.4 on ssh:// or git:// urls); nothing to report then.
    netloc = urlparse(repo_url)[1]
    if netloc == '':
        return None, None

    if "@" in netloc:
        netloc = netloc.split("@", 1)[1]

    found = bracketed_host.match(netloc)
    if found:
        return found.groups()
    if ":" in netloc:
        host, port = netloc.split(":")[0:2]
        return host, port
    return netloc, None
def check_hostkey(module, fqdn):
    """ Return True when not_in_host_file() does not report fqdn as missing. """
    missing = not_in_host_file(module, fqdn)
    return not missing
# this is a variant of code found in connection_plugins/paramiko.py and we should modify
# the paramiko code to import and use this.
def not_in_host_file(self, host):
    """ Report whether host is absent from every known_hosts file.

    The current user's ``~/.ssh/known_hosts`` and a fixed list of system-wide
    files are scanned.  Files that do not exist or cannot be opened are
    skipped.

    :param self: unused; kept so existing callers (which pass the module
        object here) keep working.
    :param host: host name to look for, as text (or bytes).
    :returns: False as soon as a matching entry is found, True otherwise.
    """
    # Imported locally so this function does not rely on a file-level import.
    import base64

    if 'USER' in os.environ:
        user_host_file = os.path.expandvars("~${USER}/.ssh/known_hosts")
    else:
        user_host_file = "~/.ssh/known_hosts"
    user_host_file = os.path.expanduser(user_host_file)

    host_file_list = [
        user_host_file,
        "/etc/ssh/ssh_known_hosts",
        "/etc/ssh/ssh_known_hosts2",
        "/etc/openssh/ssh_known_hosts",
    ]

    # hmac works on bytes; encode once instead of per hashed entry.
    host_bytes = host if isinstance(host, bytes) else host.encode('utf-8')

    for hf in host_file_list:
        if not os.path.exists(hf):
            continue

        try:
            with open(hf) as host_fh:
                data = host_fh.read()
        except IOError:
            # unreadable file: treat it like a missing one
            continue

        for line in data.split("\n"):
            if " " not in line:
                continue
            tokens = line.split()
            if not tokens:
                # whitespace-only line: nothing to compare against
                continue

            if tokens[0].startswith(HASHED_KEY_MAGIC):
                # this is a hashed known host entry: "|1|<salt>|<hash>", with
                # salt and hash base64 encoded and hash = HMAC-SHA1(salt, host)
                try:
                    (kn_salt, kn_host) = tokens[0][len(HASHED_KEY_MAGIC):].split("|", 2)
                    # base64.b64decode works on both Python 2 and 3, unlike
                    # str.decode('base64') which only exists on Python 2.
                    salt = base64.b64decode(kn_salt)
                    expected = base64.b64decode(kn_host)
                except (TypeError, ValueError):
                    # invalid hashed host key, skip it
                    continue
                digest = hmac.new(salt, host_bytes, digestmod=sha1).digest()
                if digest == expected:
                    return False
            else:
                # standard host file entry
                # NOTE(review): this is a substring test, so "[host]:port"
                # entries match, but so can a longer name containing host.
                if host in tokens[0]:
                    return False

    return True
def add_host_key(module, fqdn, port=22, key_type="rsa", create_dir=False):
    """ Fetch fqdn's host key with ssh-keyscan and append it to known_hosts.

    :param module: object providing get_bin_path, run_command, fail_json and
        append_to_file.
    :param fqdn: host to scan.
    :param port: port handed to ``ssh-keyscan -p``; a falsy value omits -p.
    :param key_type: key type handed to ``ssh-keyscan -t``.
    :param create_dir: create the user's ssh directory (mode 0700) when it is
        missing instead of failing.
    :returns: the ``(rc, out, err)`` tuple from running ssh-keyscan.

    Every failure is reported through module.fail_json.
    """
    scanner = module.get_bin_path('ssh-keyscan', True)

    if 'USER' not in os.environ:
        ssh_dir = "~/.ssh/"
        known_hosts = "~/.ssh/known_hosts"
    else:
        ssh_dir = os.path.expandvars("~${USER}/.ssh/")
        known_hosts = os.path.expandvars("~${USER}/.ssh/known_hosts")
    # Only the directory is expanded here; the known_hosts path is handed to
    # module.append_to_file as-is.
    ssh_dir = os.path.expanduser(ssh_dir)

    if os.path.exists(ssh_dir):
        if not os.path.isdir(ssh_dir):
            module.fail_json(msg="%s is not a directory" % ssh_dir)
    elif not create_dir:
        module.fail_json(msg="%s does not exist" % ssh_dir)
    else:
        try:
            os.makedirs(ssh_dir, 0o700)
        except Exception:
            module.fail_json(msg="failed to create host key directory: %s" % ssh_dir)

    if port:
        template = "%s -t %s -p %s %s"
        values = (scanner, key_type, port, fqdn)
    else:
        template = "%s -t %s %s"
        values = (scanner, key_type, fqdn)
    scan_cmd = template % values

    rc, out, err = module.run_command(scan_cmd)

    # ssh-keyscan gives a 0 exit code and prints nothing on timeout, so empty
    # output is a failure as well.
    if rc != 0 or not out:
        if out:
            detail = ' using command "%s". [stdout]: %s' % (scan_cmd, out)
        else:
            detail = '. "%s" returned no matches.' % scan_cmd
        msg = 'failed to retrieve hostkey' + detail
        if err:
            msg += ' [stderr]: %s' % err
        module.fail_json(msg=msg)

    module.append_to_file(known_hosts, out)
    return rc, out, err
|