1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
import UserDict
import os
import tempfile
from M2Crypto import BIO, Rand, SMIME, X509
from email import Message
class smimeplus(object):
def __init__(self, cert, privkey, passphrase, cacert, randfile=None):
self.cipher = "aes_256_cbc" # XXX make it configable??
self.setsender(cert, privkey, passphrase)
self.setcacert(cacert)
if randfile is None and os.path.exists("/dev/urandom"):
# Default to /dev/urandom on POSIX-like systems if no file is specified
self.randfile = "/dev/urandom"
else:
self.randfile = randfile
self.__loadrand()
def __passcallback(self, v):
"""private key passphrase callback function"""
return self.passphrase
def __loadrand(self):
"""Load random number file"""
if self.randfile:
# On POSIX-like systems, only load a small amount from /dev/urandom
# to seed the PRNG, unless it's a dedicated file for state saving.
if self.randfile == "/dev/urandom":
# Read -1 bytes (all available), M2Crypto Rand.load_file() reads up to 1024 bytes
# from /dev/urandom by default if max_bytes is -1 (or similar logic).
# To be explicit, we can load a small seed.
Rand.load_file(self.randfile, 1024)
else:
# Load the full user-specified random file
Rand.load_file(self.randfile, -1)
def __saverand(self):
"""Save random number file"""
if self.randfile and self.randfile != "/dev/urandom":
Rand.save_file(self.randfile)
def __gettext(self, msg):
"""Return a string representation of 'msg'"""
_data = ""
if isinstance(msg, Message.Message):
for _p in msg.walk():
_data = _data + _p.as_string()
else:
_data = str(msg)
return _data
def __pack(self, msg):
"""Convert 'msg' to string and put it into an memory buffer for
openssl operation"""
return BIO.MemoryBuffer(self.__gettext(msg))
def setsender(self, cert=None, privkey=None, passphrase=None):
if cert:
self.cert = cert
if privkey:
self.key = privkey
if passphrase:
self.passphrase = passphrase
def setcacert(self, cacert):
self.cacert = cacert
def sign(self, msg):
"""Sign a message"""
_sender = SMIME.SMIME()
_sender.load_key_bio(
self.__pack(self.key),
self.__pack(self.cert),
callback=self.__passcallback,
)
_signed = _sender.sign(self.__pack(msg), SMIME.PKCS7_DETACHED)
_out = self.__pack(None)
_sender.write(_out, _signed, self.__pack(msg))
return _out.read()
def verify(self, smsg, scert):
"""Verify to see if 'smsg' was signed by 'scert', and scert was
issued by cacert of this object. Return message signed if success,
None otherwise"""
# Load signer's cert.
_x509 = X509.load_cert_bio(self.__pack(scert))
_stack = X509.X509_Stack()
_stack.push(_x509)
# Load CA cert directly from the data into a BIO and then the X509_Store.
_ca_bio = self.__pack(self.cacert)
_store = X509.X509_Store()
# load_info_bio is the typical replacement for file-based loading with a BIO.
_store.load_info_bio(_ca_bio)
# prepare SMIME object
_sender = SMIME.SMIME()
_sender.set_x509_stack(_stack)
_sender.set_x509_store(_store)
# Load signed message, verify it, and return result
_p7, _data = SMIME.smime_load_pkcs7_bio(self.__pack(smsg))
try:
# Removed flags=SMIME.PKCS7_SIGNED which erroneously sets PKCS7_NOSIGS
# https://todo.sr.ht/~mcepl/m2crypto/329
return _sender.verify(
_p7, _data, flags=0 # Use flags=0 for standard verification
)
except SMIME.SMIME_Error:
return None
def encrypt(self, rcert, msg):
# Instantiate an SMIME object.
_sender = SMIME.SMIME()
# Load target cert to encrypt to.
_x509 = X509.load_cert_bio(self.__pack(rcert))
_stack = X509.X509_Stack()
_stack.push(_x509)
_sender.set_x509_stack(_stack)
_sender.set_cipher(SMIME.Cipher(self.cipher))
# Encrypt the buffer.
_buf = self.__pack(self.__gettext(msg))
_p7 = _sender.encrypt(_buf)
# Output p7 in mail-friendly format.
_out = self.__pack("")
_sender.write(_out, _p7)
# Save the PRNG's state.
self.__saverand()
return _out.read()
def decrypt(self, emsg):
"""decrypt 'msg'. Return decrypt message if success, None
otherwise"""
# Load private key and cert.
_sender = SMIME.SMIME()
_sender.load_key_bio(
self.__pack(self.key),
self.__pack(self.cert),
callback=self.__passcallback,
)
# Load the encrypted data.
_p7, _data = SMIME.smime_load_pkcs7_bio(self.__pack(emsg))
# Decrypt p7.
try:
return _sender.decrypt(_p7)
except SMIME.SMIME_Error:
return None
def addHeader(self, rcert, content, subject=""):
"""Add To, From, Subject Header to 'content'"""
_scert = X509.load_cert_bio(self.__pack(self.cert))
# Use get_components() to get CN and emailAddress
_scertsubj_data = _scert.get_subject().get_components()
_rcert = X509.load_cert_bio(self.__pack(rcert))
# Use get_components() to get CN and emailAddress directly
_rcertsubj_data = _rcert.get_subject().get_components()
# The data comes from get_components, which returns bytes, so decode them.
_sender_cn = _scertsubj_data.get("CN", b"").decode()
_sender_email = _scertsubj_data.get("emailAddress", b"").decode()
_recipient_cn = _rcertsubj_data.get("CN", b"").decode()
_recipient_email = _rcertsubj_data.get("emailAddress", b"").decode()
_out = f'From: "{_sender_cn}" <{_sender_email}>\n'
_out += f'To: "{_recipient_cn}" <{_recipient_email}>\n'
_out += f"Subject: {subject}\n"
_out += content
return _out
|