1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
|
import base64
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from cryptography.hazmat.primitives.serialization import Encoding
from odoo import _, api, fields, models
from odoo.exceptions import UserError
STR_TO_HASH = {
'sha1': hashes.SHA1(),
'sha256': hashes.SHA256(),
}
STR_TO_CURVE = {
'SECP256R1': ec.SECP256R1(),
}
def _get_formatted_value(data, formatting='encodebytes'):
if formatting == 'encodebytes':
return base64.encodebytes(data)
elif formatting == 'base64':
return base64.b64encode(data)
else:
return data
def _int_to_bytes(value, byteorder='big'):
return value.to_bytes((value.bit_length() + 7) // 8, byteorder=byteorder)
class Key(models.Model):
_name = 'certificate.key'
_description = 'Cryptographic Keys'
name = fields.Char(string='Name', default="New key")
content = fields.Binary(string='Key file', required=True)
password = fields.Char(string='Private key password')
pem_key = fields.Binary(
string='Key bytes in PEM format',
compute='_compute_pem_key',
store=True,
)
public = fields.Boolean(
string='Public/Private key',
compute='_compute_pem_key',
store=True,
)
loading_error = fields.Text(
string='Loading error',
compute='_compute_pem_key',
store=True,
)
active = fields.Boolean(name='Active', help='Set active to false to archive the key.', default=True)
company_id = fields.Many2one(
comodel_name='res.company',
string='Company',
required=True,
default=lambda self: self.env.company,
ondelete='cascade',
)
@api.depends('content', 'password')
def _compute_pem_key(self):
for key in self:
content = key.with_context(bin_size=False).content
if not content:
key.pem_key = None
key.public = None
key.loading_error = ""
else:
pkey_content = base64.b64decode(content)
pkey_password = key.password.encode('utf-8') if key.password else None
# Try to load the key in different format starting with DER then PEM for private then public keys.
# If none succeeded, we report an error.
pkey = None
try:
pkey = serialization.load_der_private_key(pkey_content, pkey_password)
key.public = False
except (ValueError, TypeError):
pass
if not pkey:
try:
pkey = serialization.load_pem_private_key(pkey_content, pkey_password)
key.public = False
except (ValueError, TypeError):
pass
if not pkey:
try:
pkey = serialization.load_der_public_key(pkey_content)
key.public = True
except (ValueError, TypeError):
pass
if not pkey:
try:
pkey = serialization.load_pem_public_key(pkey_content)
key.public = True
except (ValueError, TypeError):
pass
if not pkey:
key.pem_key = None
key.public = None
key.loading_error = _("This key could not be loaded. Either its content or its password is erroneous.")
continue
if key.public:
key.pem_key = base64.b64encode(pkey.public_bytes(
encoding=Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
))
else:
key.pem_key = base64.b64encode(pkey.private_bytes(
encoding=Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
key.loading_error = ""
# -------------------------------------------------------
# Business Methods #
# -------------------------------------------------------
def _sign(self, message, hashing_algorithm='sha256', formatting='encodebytes'):
""" Return the base64 encoded signature of message. """
self.ensure_one()
if self.public:
raise UserError(_("Make sure to use a private key to sign documents."))
return self._sign_with_key(
message,
self.with_context(bin_size=False).pem_key,
pwd=None,
hashing_algorithm=hashing_algorithm,
formatting=formatting
)
def _get_public_key_numbers_bytes(self, formatting='encodebytes'):
self.ensure_one()
return self._numbers_public_key_bytes_with_key(
self._get_public_key_bytes(encoding='PEM'),
formatting=formatting,
)
def _get_public_key_bytes(self, encoding='der', formatting='encodebytes'):
self.ensure_one()
if self.public:
public_key = serialization.load_pem_public_key(base64.b64decode(self.with_context(bin_size=False).pem_key))
else:
public_key = serialization.load_pem_private_key(base64.b64decode(self.with_context(bin_size=False).pem_key), None).public_key()
encoding = serialization.Encoding.DER if encoding == 'der' else serialization.Encoding.PEM
return _get_formatted_value(
public_key.public_bytes(
encoding=encoding,
format=serialization.PublicFormat.SubjectPublicKeyInfo
),
formatting=formatting,
)
def _decrypt(self, message, hashing_algorithm='sha256'):
self.ensure_one()
if not isinstance(message, bytes):
message = message.encode('utf-8')
if self.public:
raise UserError(_("A private key is required to decrypt data."))
if hashing_algorithm not in STR_TO_HASH:
raise UserError(f"Unsupported hashing algorithm '{hashing_algorithm}'. Currently supported: sha1 and sha256.")
private_key = serialization.load_pem_private_key(base64.b64decode(self.pem_key), None)
if not isinstance(private_key, rsa.RSAPrivateKey):
raise UserError(_("Unsupported asymmetric cryptography algorithm '%s'. Currently supported for decryption: RSA.", type(private_key)))
return private_key.decrypt(
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=STR_TO_HASH[hashing_algorithm]),
algorithm=STR_TO_HASH[hashing_algorithm],
label=None
)
)
@api.model
def _sign_with_key(self, message, pem_key, pwd=None, hashing_algorithm='sha256', formatting='encodebytes'):
""" Return the base64 encoded signature of message. """
if not isinstance(message, bytes):
message = message.encode('utf-8')
if hashing_algorithm not in STR_TO_HASH:
raise UserError(f"Unsupported hashing algorithm '{hashing_algorithm}'. Currently supported: sha1 and sha256.")
try:
private_key = serialization.load_pem_private_key(base64.b64decode(pem_key), pwd)
except ValueError:
raise UserError(_("The private key could not be loaded."))
if isinstance(private_key, ec.EllipticCurvePrivateKey):
signature = private_key.sign(
message,
ec.ECDSA(STR_TO_HASH[hashing_algorithm])
)
elif isinstance(private_key, rsa.RSAPrivateKey):
signature = private_key.sign(
message,
padding.PKCS1v15(),
STR_TO_HASH[hashing_algorithm]
)
else:
raise UserError(_("Unsupported asymmetric cryptography algorithm '%s'. Currently supported for signature: EC and RSA.", type(private_key)))
return _get_formatted_value(signature, formatting=formatting)
@api.model
def _numbers_public_key_bytes_with_key(self, pem_key, formatting='encodebytes'):
try:
public_key = serialization.load_pem_public_key(base64.b64decode(pem_key))
except ValueError:
raise UserError(_("The public key could not be loaded."))
if isinstance(public_key, ec.EllipticCurvePublicKey):
e = public_key.public_numbers().x
n = public_key.public_numbers().y
elif isinstance(public_key, rsa.RSAPublicKey):
e = public_key.public_numbers().e
n = public_key.public_numbers().n
else:
raise UserError(_("Unsupported asymmetric cryptography algorithm '%s'. Currently supported: EC, RSA.", type(public_key)))
return (
_get_formatted_value(_int_to_bytes(e), formatting=formatting),
_get_formatted_value(_int_to_bytes(n), formatting=formatting)
)
@api.model
def _generate_ec_private_key(self, company, name='id_ec', curve='SECP256R1'):
if curve not in STR_TO_CURVE:
raise UserError(f"Unsupported curve algorithm '{curve}'. Currently supported: SECP256R1.")
private_key = ec.generate_private_key(STR_TO_CURVE[curve])
return self.env['certificate.key'].create({
'name': name,
'content': base64.b64encode(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())),
'company_id': company.id,
})
@api.model
def _generate_rsa_private_key(self, company, name='id_rsa', public_exponent=65537, key_size=2048):
private_key = rsa.generate_private_key(
public_exponent=public_exponent,
key_size=key_size
)
return self.env['certificate.key'].create({
'name': name,
'content': base64.b64encode(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())),
'company_id': company.id,
})
|