1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
|
import base64
import datetime
from importlib import metadata
from cryptography import x509
from cryptography.hazmat.primitives import constant_time, serialization
from cryptography.hazmat.primitives.serialization import Encoding, pkcs12
from odoo import _, api, fields, models
from .key import STR_TO_HASH, _get_formatted_value
from odoo.exceptions import UserError
from odoo.osv import expression
from odoo.tools import parse_version
class Certificate(models.Model):
    """X.509 certificate uploaded for a company.

    The uploaded ``content`` may be DER, PEM or PKCS12. On every change of the
    content or of the PKCS12 password the file is parsed and the derived data
    (PEM copy, subject common name, serial number, validity dates, detected
    format, loading error) is stored on the record. For PKCS12 files the
    embedded private key, when there is one, is linked as a ``certificate.key``
    record.
    """
    _name = 'certificate.certificate'
    _description = 'Certificate'
    _order = 'date_end DESC'
    _check_company_auto = True

    name = fields.Char(string='Name')
    # Raw uploaded file, base64 encoded.
    content = fields.Binary(string='Certificate', readonly=False, required=True)
    # Only used when the uploaded file is a PKCS12 archive.
    pkcs12_password = fields.Char(string='Certificate Password', help='Password to decrypt the PKS file.')
    # Computed from a PKCS12 upload, but editable so a key can be linked by hand.
    private_key_id = fields.Many2one(
        string='Private Key',
        comodel_name='certificate.key',
        check_company=True,
        domain=[('public', '=', False)],
        compute='_compute_private_key',
        store=True,
        readonly=False,
    )
    # Optional override of the public key embedded in the certificate.
    public_key_id = fields.Many2one(
        string='Public Key',
        comodel_name='certificate.key',
        check_company=True,
        domain=[('public', '=', True)],
        help=(
            "Used to set a public key in case the one self-contained in the certificate is erroneus.\n"
            "When a public key is set this way, it will be used instead of the one in the certificate.\n"
        ),
    )
    scope = fields.Selection(
        string="Certificate scope",
        selection=[
            ('general', 'General'),
        ],
    )

    # ----- Data derived from `content` by `_compute_pem_certificate` -----
    content_format = fields.Selection(
        selection=[
            ('der', 'DER'),
            ('pem', 'PEM'),
            ('pkcs12', 'PKCS12'),
        ],
        string='Original certificate format',
        compute='_compute_pem_certificate',
        store=True,
    )
    pem_certificate = fields.Binary(
        string='Certificate in PEM format',
        compute='_compute_pem_certificate',
        store=True,
    )
    subject_common_name = fields.Char(
        string='Subject Name',
        compute='_compute_pem_certificate',
        store=True,
    )
    serial_number = fields.Char(
        string='Serial number',
        help='The serial number to add to electronic documents',
        compute='_compute_pem_certificate',
        store=True,
    )
    date_start = fields.Datetime(
        string='Available date',
        help='The date on which the certificate starts to be valid (UTC)',
        compute='_compute_pem_certificate',
        store=True,
    )
    date_end = fields.Datetime(
        string='Expiration date',
        help='The date on which the certificate expires (UTC)',
        compute='_compute_pem_certificate',
        store=True,
    )
    loading_error = fields.Text(string='Loading error', compute='_compute_pem_certificate', store=True)

    # Not stored: depends on the current time, hence the dedicated search method.
    is_valid = fields.Boolean(string='Valid', compute='_compute_is_valid', search='_search_is_valid')
    active = fields.Boolean(string='Active', help='Set active to false to archive the certificate', default=True)
    company_id = fields.Many2one(
        comodel_name='res.company',
        string='Company',
        required=True,
        default=lambda self: self.env.company,
        ondelete='cascade',
    )
    country_code = fields.Char(related='company_id.country_code', depends=['company_id'])

    # -------------------------------------------------------
    #                   Compute / Search                    #
    # -------------------------------------------------------

    @api.depends('pem_certificate')
    def _compute_private_key(self):
        """Link the private key embedded in a PKCS12 upload.

        Certificates without a PEM copy get their private key cleared.
        Certificates that already have a private key, or whose original format
        is not PKCS12, are left untouched. Otherwise the key is extracted from
        the archive and either matched against an existing ``certificate.key``
        with identical content in the same company, or created.
        """
        # Existing key contents, looked up per company. The domain filters on
        # the certificates' companies because the lookup below is keyed by
        # (content, company); key attachments are unrelated to certificate ids.
        attachments = self.env['ir.attachment'].search([
            ('res_model', '=', 'certificate.key'),
            ('res_field', '=', 'content'),
            ('company_id', 'in', self.company_id.ids),
        ])
        content_to_key_id = {(att.datas, att.company_id.id): att.res_id for att in attachments}

        for certificate in self:
            if not certificate.pem_certificate:
                certificate.private_key_id = None
                continue
            if certificate.private_key_id or certificate.content_format != 'pkcs12':
                continue

            content = certificate.with_context(bin_size=False).content
            pkcs12_password = certificate.pkcs12_password.encode('utf-8') if certificate.pkcs12_password else None
            key, _cert, _additional_certs = pkcs12.load_key_and_certificates(base64.b64decode(content), pkcs12_password)
            if not key:
                continue

            pem_key = base64.b64encode(key.private_bytes(
                encoding=Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            ))
            lookup = (pem_key, certificate.company_id.id)
            key_id = content_to_key_id.get(lookup)
            if not key_id:
                key_id = self.env['certificate.key'].create({
                    'name': (certificate.subject_common_name or certificate.name or "") + ".key",
                    'content': pem_key,
                    'company_id': certificate.company_id.id,
                }).id
                # Remember it so other certificates of this batch reuse the key.
                content_to_key_id[lookup] = key_id
            certificate.private_key_id = key_id

    def _reset_parsed_data(self, loading_error=""):
        """Blank every field derived from ``content`` and store ``loading_error``."""
        for certificate in self:
            certificate.pem_certificate = None
            certificate.subject_common_name = None
            certificate.content_format = None
            certificate.date_start = None
            certificate.date_end = None
            certificate.serial_number = None
            certificate.loading_error = loading_error

    @api.model
    def _parse_certificate(self, content, password):
        """Parse raw certificate bytes, trying DER, then PKCS12, then PEM.

        :param bytes content: decoded (non-base64) file content
        :param password: PKCS12 password as bytes, or None
        :return: ``(certificate, format)`` where format is 'der', 'pkcs12' or
                 'pem', or ``(None, None)`` when no loader accepted the content
        """
        try:
            return x509.load_der_x509_certificate(content), 'der'
        except ValueError:
            pass
        try:
            _key, cert, _additional_certs = pkcs12.load_key_and_certificates(content, password)
        except ValueError:
            cert = None
        # A PKCS12 archive may load without holding a certificate; fall through to PEM then.
        if cert is not None:
            return cert, 'pkcs12'
        try:
            return x509.load_pem_x509_certificate(content), 'pem'
        except ValueError:
            return None, None

    @api.depends('content', 'pkcs12_password')
    def _compute_pem_certificate(self):
        """Parse ``content`` and fill every field derived from it.

        Without content all derived fields are emptied with no error. When the
        content cannot be parsed in any supported format, the derived fields
        are emptied and ``loading_error`` explains the failure.
        """
        # cryptography 42.0.0 introduced the timezone-aware `*_utc` accessors;
        # the check does not depend on the record, so do it once.
        use_naive_dates = parse_version(metadata.version('cryptography')) < parse_version('42.0.0')

        for certificate in self:
            content = certificate.with_context(bin_size=False).content
            if not content:
                certificate._reset_parsed_data()
                continue

            password = certificate.pkcs12_password.encode('utf-8') if certificate.pkcs12_password else None
            cert, content_format = self._parse_certificate(base64.b64decode(content), password)
            if cert is None:
                certificate._reset_parsed_data(
                    _("This certificate could not be loaded. Either the content or the password is erroneous.")
                )
                continue

            try:
                common_name = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
                certificate.subject_common_name = common_name[0].value if common_name else ""
            except ValueError:
                certificate.subject_common_name = None

            # Extract certificate data
            certificate.content_format = content_format
            certificate.loading_error = ""
            certificate.pem_certificate = base64.b64encode(cert.public_bytes(Encoding.PEM))
            certificate.serial_number = cert.serial_number
            if use_naive_dates:
                certificate.date_start = cert.not_valid_before
                certificate.date_end = cert.not_valid_after
            else:
                # Datetime fields hold naive values: drop the UTC tzinfo.
                certificate.date_start = cert.not_valid_before_utc.replace(tzinfo=None)
                certificate.date_end = cert.not_valid_after_utc.replace(tzinfo=None)

    @api.depends('date_start', 'date_end', 'loading_error')
    def _compute_is_valid(self):
        """Flag certificates that loaded without error and whose validity period includes now."""
        # Certificate dates are UTC timezoned
        # https://cryptography.io/en/latest/x509/reference/#cryptography.x509.Certificate.not_valid_after
        utc_now = datetime.datetime.now(datetime.timezone.utc)
        for certificate in self:
            if not certificate.date_start or not certificate.date_end or certificate.loading_error:
                certificate.is_valid = False
                continue
            date_start = certificate.date_start.replace(tzinfo=datetime.timezone.utc)
            date_end = certificate.date_end.replace(tzinfo=datetime.timezone.utc)
            certificate.is_valid = date_start <= utc_now <= date_end

    def _search_is_valid(self, operator, value):
        """Translate a search on ``is_valid`` into a domain on stored fields.

        :param str operator: '=' or '!='
        :param bool value: the boolean compared against
        :raises NotImplementedError: for any other operator or a non-boolean value
        """
        if operator not in ['=', '!='] or not isinstance(value, bool):
            raise NotImplementedError("Operation not supported, only '=' and '!=' are allowed.")

        utc_now = datetime.datetime.now(datetime.timezone.utc)
        searching_valid = (operator == '=') == value
        if searching_valid:
            return [
                ('pem_certificate', '!=', False),
                ('date_start', '<=', utc_now),
                ('date_end', '>=', utc_now),
                ('loading_error', '=', ''),
            ]
        return expression.OR([
            [('pem_certificate', '=', False)],
            [('date_start', '=', False)],
            [('date_end', '=', False)],
            [('date_start', '>', utc_now)],
            [('date_end', '<', utc_now)],
            [('loading_error', '!=', '')],
        ])

    @api.constrains('pem_certificate', 'private_key_id', 'public_key_id')
    def _constrains_certificate_key_compatibility(self):
        """Ensure every linked key matches the public key embedded in the certificate.

        :raises UserError: when a linked key has a loading error, or when its
                           public part differs from the certificate's public key
        """
        for certificate in self:
            pem_certificate = certificate.with_context(bin_size=False).pem_certificate
            if not pem_certificate:
                continue

            cert = x509.load_pem_x509_certificate(base64.b64decode(pem_certificate))
            cert_public_key_bytes = cert.public_key().public_bytes(
                encoding=Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            )
            # Private key first, then public key: same check, different message.
            linked_keys = (
                (certificate.private_key_id, _("The certificate and private key are not compatible.")),
                (certificate.public_key_id, _("The certificate and public key are not compatible.")),
            )
            for key, mismatch_message in linked_keys:
                if not key:
                    continue
                if key.loading_error:
                    raise UserError(key.loading_error)
                key_public_bytes = base64.b64decode(key._get_public_key_bytes(encoding='pem'))
                if not constant_time.bytes_eq(key_public_bytes, cert_public_key_bytes):
                    raise UserError(mismatch_message)

    # -------------------------------------------------------
    #                   Business Methods                    #
    # -------------------------------------------------------

    def _load_x509_certificate(self):
        """Return the parsed x509 object built from the stored PEM copy.

        :raises UserError: when the record holds no PEM certificate
        """
        self.ensure_one()
        pem_certificate = self.with_context(bin_size=False).pem_certificate
        if not pem_certificate:
            # Without this guard, b64decode(False) fails with a bare TypeError.
            raise UserError(
                self.loading_error
                or _("This certificate could not be loaded. Either the content or the password is erroneous.")
            )
        return x509.load_pem_x509_certificate(base64.b64decode(pem_certificate))

    def _get_der_certificate_bytes(self, formatting='encodebytes'):
        """Return the DER encoding of the certificate.

        :param formatting: forwarded to ``_get_formatted_value`` (imported from ``.key``)
        :raises UserError: when the record holds no PEM certificate
        """
        self.ensure_one()
        cert = self._load_x509_certificate()
        return _get_formatted_value(cert.public_bytes(serialization.Encoding.DER), formatting=formatting)

    def _get_fingerprint_bytes(self, hashing_algorithm='sha256', formatting='encodebytes'):
        """Return the fingerprint of the certificate.

        :param str hashing_algorithm: a key of ``STR_TO_HASH``
        :param formatting: forwarded to ``_get_formatted_value``
        :raises UserError: for an unknown algorithm or a missing PEM certificate
        """
        self.ensure_one()
        # Reject a bad algorithm before parsing the certificate.
        if hashing_algorithm not in STR_TO_HASH:
            raise UserError(f"Unsupported hashing algorithm '{hashing_algorithm}'. Currently supported: sha1 and sha256.")
        cert = self._load_x509_certificate()
        return _get_formatted_value(cert.fingerprint(STR_TO_HASH[hashing_algorithm]), formatting=formatting)

    def _get_signature_bytes(self, formatting='encodebytes'):
        """Return the signature bytes carried by the certificate.

        :param formatting: forwarded to ``_get_formatted_value``
        :raises UserError: when the record holds no PEM certificate
        """
        self.ensure_one()
        cert = self._load_x509_certificate()
        return _get_formatted_value(cert.signature, formatting=formatting)

    def _get_public_key_numbers_bytes(self, formatting='encodebytes'):
        """Return the public key numbers, preferring a linked key over the embedded one.

        The linked public key takes precedence over the linked private key.

        :param formatting: forwarded to the key helper
        """
        self.ensure_one()
        linked_key = self.public_key_id or self.private_key_id
        if linked_key:
            return linked_key._get_public_key_numbers_bytes(formatting=formatting)

        # When no keys are set to the certificate, use the self-contained public key from the content
        return self.env['certificate.key']._get_public_key_numbers_bytes_with_key(
            self._get_public_key_bytes(encoding='pem'),
            formatting=formatting,
        )

    def _get_public_key_bytes(self, encoding='der', formatting='encodebytes'):
        """Return the public key, preferring a linked key over the embedded one.

        :param str encoding: 'der' for DER output; any other value yields PEM
        :param formatting: forwarded to ``_get_formatted_value`` or the key helper
        :raises UserError: when the certificate's own public key cannot be loaded
        """
        self.ensure_one()
        linked_key = self.public_key_id or self.private_key_id
        if linked_key:
            return linked_key._get_public_key_bytes(encoding=encoding, formatting=formatting)

        # When no keys are set to the certificate, use the self-contained public key from the content
        try:
            public_key = self._load_x509_certificate().public_key()
        except ValueError as err:
            raise UserError(_("The public key from the certificate could not be loaded.")) from err

        encoding = serialization.Encoding.DER if encoding == 'der' else serialization.Encoding.PEM
        return _get_formatted_value(
            public_key.public_bytes(
                encoding=encoding,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            ),
            formatting=formatting,
        )

    def _sign(self, message, hashing_algorithm='sha256', formatting='encodebytes'):
        """Return the signature of ``message`` made with the linked private key.

        The signing itself, and the output formatting, are delegated to the
        ``_sign`` method of the linked ``certificate.key`` record.

        :raises UserError: when the certificate is not valid or has no private key
        """
        self.ensure_one()

        if not self.is_valid:
            raise UserError(self.loading_error or _("This certificate is not valid, its validity has expired."))
        if not self.private_key_id:
            raise UserError(_("No private key linked to the certificate, it is required to sign documents."))

        return self.private_key_id._sign(message, hashing_algorithm=hashing_algorithm, formatting=formatting)
|