1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
# Microsoft Azure Linux Agent
#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
#
import base64
import errno
import struct
import os.path
import subprocess
from azurelinuxagent.common.future import ustr, bytebuffer
from azurelinuxagent.common.exception import CryptError
import azurelinuxagent.common.logger as logger
import azurelinuxagent.common.utils.shellutil as shellutil
DECRYPT_SECRET_CMD = "{0} cms -decrypt -inform DER -inkey {1} -in /dev/stdin"
class CryptUtil(object):
def __init__(self, openssl_cmd):
self.openssl_cmd = openssl_cmd
def gen_transport_cert(self, prv_file, crt_file):
"""
Create ssl certificate for https communication with endpoint server.
"""
cmd = [self.openssl_cmd, "req", "-x509", "-nodes", "-subj", "/CN=LinuxTransport",
"-days", "730", "-newkey", "rsa:2048", "-keyout", prv_file, "-out", crt_file]
try:
shellutil.run_command(cmd)
except shellutil.CommandError as cmd_err:
msg = "Failed to create {0} and {1} certificates.\n[stdout]\n{2}\n\n[stderr]\n{3}\n"\
.format(prv_file, crt_file, cmd_err.stdout, cmd_err.stderr)
logger.error(msg)
def get_pubkey_from_prv(self, file_name):
if not os.path.exists(file_name):
raise IOError(errno.ENOENT, "File not found", file_name)
# OpenSSL's pkey command may not be available on older versions so try 'rsa' first.
try:
command = [self.openssl_cmd, "rsa", "-in", file_name, "-pubout"]
return shellutil.run_command(command, log_error=False)
except shellutil.CommandError as error:
if not ("Not an RSA key" in error.stderr or "expecting an rsa key" in error.stderr):
logger.error(
"Command: [{0}], return code: [{1}], stdout: [{2}] stderr: [{3}]",
" ".join(command),
error.returncode,
error.stdout,
error.stderr)
raise
return shellutil.run_command([self.openssl_cmd, "pkey", "-in", file_name, "-pubout"], log_error=True)
def get_pubkey_from_crt(self, file_name):
if not os.path.exists(file_name):
raise IOError(errno.ENOENT, "File not found", file_name)
else:
cmd = [self.openssl_cmd, "x509", "-in", file_name, "-pubkey", "-noout"]
pub = shellutil.run_command(cmd, log_error=True)
return pub
def get_thumbprint_from_crt(self, file_name):
if not os.path.exists(file_name):
raise IOError(errno.ENOENT, "File not found", file_name)
else:
cmd = [self.openssl_cmd, "x509", "-in", file_name, "-fingerprint", "-noout"]
thumbprint = shellutil.run_command(cmd)
thumbprint = thumbprint.rstrip().split('=')[1].replace(':', '').upper()
return thumbprint
def decrypt_certificates_p7m(self, p7m_file, trans_prv_file, trans_cert_file, pfx_file):
umask = None
try:
umask = os.umask(0o077)
with open(pfx_file, "wb") as pfx_file_:
shellutil.run_command([self.openssl_cmd, "cms", "-decrypt", "-in", p7m_file, "-inkey", trans_prv_file, "-recip", trans_cert_file], stdout=pfx_file_)
finally:
if umask is not None:
os.umask(umask)
def convert_pfx_to_pem(self, pfx_file, nomacver, pem_file):
command = [self.openssl_cmd, "pkcs12", "-nodes", "-password", "pass:", "-in", pfx_file, "-out", pem_file]
if nomacver:
command.append("-nomacver")
shellutil.run_command(command)
def crt_to_ssh(self, input_file, output_file):
with open(output_file, "ab") as file_out:
cmd = ["ssh-keygen", "-i", "-m", "PKCS8", "-f", input_file]
try:
shellutil.run_command(cmd, stdout=file_out, log_error=True)
except shellutil.CommandError:
pass # nothing to do; the error is already logged
def asn1_to_ssh(self, pubkey):
lines = pubkey.split("\n")
lines = [x for x in lines if not x.startswith("----")]
base64_encoded = "".join(lines)
try:
#TODO remove pyasn1 dependency
from pyasn1.codec.der import decoder as der_decoder
der_encoded = base64.b64decode(base64_encoded)
der_encoded = der_decoder.decode(der_encoded)[0][1] # pylint: disable=unsubscriptable-object
key = der_decoder.decode(self.bits_to_bytes(der_encoded))[0]
n=key[0] # pylint: disable=unsubscriptable-object
e=key[1] # pylint: disable=unsubscriptable-object
keydata = bytearray()
keydata.extend(struct.pack('>I', len("ssh-rsa")))
keydata.extend(b"ssh-rsa")
keydata.extend(struct.pack('>I', len(self.num_to_bytes(e))))
keydata.extend(self.num_to_bytes(e))
keydata.extend(struct.pack('>I', len(self.num_to_bytes(n)) + 1))
keydata.extend(b"\0")
keydata.extend(self.num_to_bytes(n))
keydata_base64 = base64.b64encode(bytebuffer(keydata))
return ustr(b"ssh-rsa " + keydata_base64 + b"\n",
encoding='utf-8')
except ImportError:
raise CryptError("Failed to load pyasn1.codec.der")
def num_to_bytes(self, num):
"""
Pack number into bytes. Retun as string.
"""
result = bytearray()
while num:
result.append(num & 0xFF)
num >>= 8
result.reverse()
return result
def bits_to_bytes(self, bits):
"""
Convert an array contains bits, [0,1] to a byte array
"""
index = 7
byte_array = bytearray()
curr = 0
for bit in bits:
curr = curr | (bit << index)
index = index - 1
if index == -1:
byte_array.append(curr)
curr = 0
index = 7
return bytes(byte_array)
def decrypt_secret(self, encrypted_password, private_key):
try:
decoded = base64.b64decode(encrypted_password)
args = DECRYPT_SECRET_CMD.format(self.openssl_cmd, private_key).split(' ')
output = shellutil.run_command(args, input=decoded, stderr=subprocess.STDOUT, encode_input=False, encode_output=False)
return output.decode('utf-16')
except shellutil.CommandError as command_error:
raise subprocess.CalledProcessError(command_error.returncode, "openssl cms -decrypt", output=command_error.stdout)
except Exception as e:
raise CryptError("Error decoding secret", e)
|