1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
|
import argparse
import asyncio
import json
import os
import pickle
import shutil
import time
import traceback
from typing import Any, Dict, List, Optional, Tuple
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.x448 import X448PrivateKey
from doubleratchet import DoubleRatchet as DR, EncryptedMessage, Header
from doubleratchet.recommended import (
aead_aes_hmac,
diffie_hellman_ratchet_curve448 as dhr448,
HashFunction,
kdf_hkdf,
kdf_separate_hmacs
)
class DoubleRatchet(DR):
    """
    The Double Ratchet flavour used by this chat example.
    """

    @staticmethod
    def _build_associated_data(associated_data: bytes, header: Header) -> bytes:
        """
        Combine the application-defined associated data with the contents of a message header.

        Args:
            associated_data: The application-defined associated data.
            header: The message header to append.

        Returns:
            The associated data, followed by the ratchet public key of the header and then the sending chain
            length and the previous sending chain length, each encoded as an 8 byte big-endian integer.
        """

        chain_lengths = (header.sending_chain_length, header.previous_sending_chain_length)
        encoded_chain_lengths = b"".join(length.to_bytes(8, "big") for length in chain_lengths)

        return associated_data + header.ratchet_pub + encoded_chain_lengths
class DiffieHellmanRatchet(dhr448.DiffieHellmanRatchet):
    """
    The Diffie-Hellman ratchet of this example: the recommended X448-based implementation, used without any
    modification.
    """
class AEAD(aead_aes_hmac.AEAD):
    """
    The AEAD of this example: the recommended AES/HMAC-based implementation, parametrized with SHA-512 and an
    info string specific to this chat.
    """

    @staticmethod
    def _get_hash_function() -> HashFunction:
        """
        Returns:
            The hash function to parametrize the AEAD with, SHA-512.
        """

        return HashFunction.SHA_512

    @staticmethod
    def _get_info() -> bytes:
        """
        Returns:
            The ASCII-encoded info string of this AEAD.
        """

        return b"Double Ratchet Chat AEAD"
class RootChainKDF(kdf_hkdf.KDF):
    """
    The root chain KDF of this example: the recommended HKDF-based implementation, parametrized with SHA-512
    and an info string specific to this chat.
    """

    @staticmethod
    def _get_hash_function() -> HashFunction:
        """
        Returns:
            The hash function to parametrize the KDF with, SHA-512.
        """

        return HashFunction.SHA_512

    @staticmethod
    def _get_info() -> bytes:
        """
        Returns:
            The ASCII-encoded info string of this KDF.
        """

        return b"Double Ratchet Chat Root Chain KDF"
class MessageChainKDF(kdf_separate_hmacs.KDF):
    """
    The message chain KDF of this example: the recommended implementation based on separate HMACs,
    parametrized with truncated SHA-512.
    """

    @staticmethod
    def _get_hash_function() -> HashFunction:
        """
        Returns:
            The hash function to parametrize the KDF with, SHA-512/256.
        """

        return HashFunction.SHA_512_256
# Keyword arguments configuring the DoubleRatchet class. They are unpacked into every call that constructs an
# instance in this file (encrypt_initial_message, decrypt_initial_message and from_json).
dr_configuration: Dict[str, Any] = dict(
    diffie_hellman_ratchet_class=DiffieHellmanRatchet,
    root_chain_kdf=RootChainKDF,
    message_chain_kdf=MessageChainKDF,
    message_chain_constant=b"\x01\x02",
    dos_protection_threshold=100,
    max_num_skipped_message_keys=1000,
    aead=AEAD
)

# The associated data is defined by the application; this example uses a fixed ASCII string.
ad = b"Alice + Bob"

# Hard-coded stand-in for the secret that Alice and Bob share (demo only).
shared_secret = b"**32 bytes of very secret data**"
async def create_double_ratchets(message: bytes) -> Tuple[DoubleRatchet, DoubleRatchet]:
    """
    Set up one Double Ratchet per party by passing an initial message from Alice to Bob.

    Args:
        message: The plaintext of the initial message.

    Returns:
        The Double Ratchet of Alice, followed by the Double Ratchet of Bob.
    """

    # A real application would most likely take the ratchet key pair from the key exchange that also produces
    # the shared secret. This example just generates a fresh key pair for Bob.
    bob_private_key = X448PrivateKey.generate()
    bob_public_key_bytes = bob_private_key.public_key().public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw
    )

    # Alice's side: encrypting the initial message for Bob yields her Double Ratchet.
    alice_dr, ciphertext = await DoubleRatchet.encrypt_initial_message(
        shared_secret=shared_secret,
        recipient_ratchet_pub=bob_public_key_bytes,
        message=message,
        associated_data=ad,
        **dr_configuration
    )
    print(f"Alice> {message.decode('UTF-8')}")

    # Bob's side: decrypting the initial message from Alice yields his Double Ratchet.
    bob_private_key_bytes = bob_private_key.private_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption()
    )
    bob_dr, plaintext = await DoubleRatchet.decrypt_initial_message(
        shared_secret=shared_secret,
        own_ratchet_priv=bob_private_key_bytes,
        message=ciphertext,
        associated_data=ad,
        **dr_configuration
    )
    print(f"Bob< {plaintext.decode('UTF-8')}")

    # What Bob decrypted has to be exactly what Alice sent.
    assert message == plaintext

    return alice_dr, bob_dr
# Storage for deferred messages: maps the name of a sender ("Alice" or "Bob") to the encrypted messages that
# sender has saved for later delivery.
Deferred = Dict[str, List[EncryptedMessage]]
def _ask_message_index(num_saved_messages: int) -> int:
    """
    Ask the user which saved message to send, repeating the question until the answer is a valid index.

    Args:
        num_saved_messages: The number of saved messages to choose from, must be greater than zero.

    Returns:
        An index in the range [0, num_saved_messages).
    """

    while True:
        answer = input(f"{num_saved_messages} messages saved. Index of the message to send: ")

        try:
            message_index = int(answer)
        except ValueError:
            # Not a number at all; ask again, just like for numbers that are out of range.
            continue

        if 0 <= message_index < num_saved_messages:
            return message_index


async def loop(alice_dr: DoubleRatchet, bob_dr: DoubleRatchet, deferred: Deferred) -> bool:
    """
    The loop logic of this chat example: print the menu, read one action and perform it.

    Args:
        alice_dr: The Double Ratchet of Alice.
        bob_dr: The Double Ratchet of Bob.
        deferred: The dictionary to hold deferred messages. Modified in place when a message is saved for
            later or when a saved message is delivered.

    Returns:
        Whether to keep the chat running, i.e. `False` only if the user chose to quit.
    """

    print("a: Send a message from Alice to Bob")
    print("b: Send a message from Bob to Alice")
    print("da: Send a deferred message from Alice to Bob")
    print("db: Send a deferred message from Bob to Alice")
    print("q: Quit")

    action = input("Action: ")

    # The first character after the optional "d" prefix decides about the direction of the message.
    if action in ("a", "da"):
        sender, receiver = "Alice", "Bob"
    elif action in ("b", "db"):
        sender, receiver = "Bob", "Alice"
    else:
        # Either "q" or an unknown action; unknown actions are ignored and the menu is shown again.
        return action != "q"

    ratchets = {"Alice": alice_dr, "Bob": bob_dr}
    receiver_dr = ratchets[receiver]

    if action in ("a", "b"):
        # Ask for the message to send
        message = input(f"{sender}> ")

        # Encrypt the message for the receiver
        message_encrypted = await ratchets[sender].encrypt_message(message.encode("UTF-8"), ad)

        send_or_defer = ""
        while send_or_defer not in ("s", "d"):
            send_or_defer = input("Send the message or save it for later? (s or d): ")

        if send_or_defer == "d":
            deferred[sender].append(message_encrypted)
            print("(message saved)")
            return True
    else:
        saved_messages = deferred[sender]
        if not saved_messages:
            print(f"No messages saved from {sender} to {receiver}.")
            return True

        # The message is removed from the saved messages before the receiver attempts to decrypt it.
        message_encrypted = saved_messages.pop(_ask_message_index(len(saved_messages)))

    # Now the receiver can decrypt the message
    message_decrypted = await receiver_dr.decrypt_message(message_encrypted, ad)
    print(f"{receiver}< {message_decrypted.decode('UTF-8')}")

    return True
async def main_loop(alice_dr: DoubleRatchet, bob_dr: DoubleRatchet, deferred: Deferred) -> None:
    """
    The main loop of this chat example. Runs the loop logic until the user quits or the input ends, reporting
    errors raised while processing an action instead of terminating the chat.

    Args:
        alice_dr: The Double Ratchet of Alice.
        bob_dr: The Double Ratchet of Bob.
        deferred: The dictionary to hold deferred messages.
    """

    while True:
        try:
            if not await loop(alice_dr, bob_dr, deferred):
                break
        except EOFError:
            # The input has ended, thus no further action can ever be read. Quit instead of reporting the same
            # error over and over again.
            break
        except Exception:  # pylint: disable=broad-except
            # Only Exception is caught here: asyncio.CancelledError, KeyboardInterrupt and SystemExit derive
            # from BaseException directly and have to propagate for cancellation and shutdown to work.
            print("Exception raised while processing:")
            traceback.print_exc()

            # Pause briefly before showing the menu again, without blocking the event loop.
            await asyncio.sleep(0.5)

        print("")
        print("")
async def main() -> None:
    """
    The entry point for this chat example. Parses command line args, loads cached data, runs the mainloop and
    caches data before quitting.

    The cache lives in the directory "dr_chat_storage" next to this file. With `--clear-cache`, the cache
    directory is removed (if it exists) and the chat is not started. With `--ignore-cache`, the cache is
    neither read nor written.
    """

    # https://github.com/PyCQA/pylint/issues/3942
    # pylint: disable=no-member

    parser = argparse.ArgumentParser(description="Double Ratchet Chat")
    parser.add_argument("-i", "--ignore-cache", dest="ignore_cache", action="store_true",
                        help="ignore the cache completely, neither loading data from the cache nor storing"
                             " data into the cache")
    parser.add_argument("-c", "--clear-cache", dest="clear_cache", action="store_true",
                        help="clear the cache and quit")
    args = parser.parse_args()

    storage_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "dr_chat_storage")
    alice_dr_path = os.path.join(storage_dir, "alice_dr.json")
    bob_dr_path = os.path.join(storage_dir, "bob_dr.json")
    deferred_path = os.path.join(storage_dir, "deferred.pickle")

    if args.clear_cache:
        try:
            shutil.rmtree(storage_dir)
        except FileNotFoundError:
            # There is no cache yet, thus nothing to clear.
            pass
        return

    use_cache = not args.ignore_cache

    if use_cache:
        os.makedirs(storage_dir, exist_ok=True)

    alice_dr: Optional[DoubleRatchet] = None
    bob_dr: Optional[DoubleRatchet] = None
    deferred: Optional[Deferred] = None

    if use_cache:
        try:
            with open(alice_dr_path, "r", encoding="utf-8") as alice_dr_json:
                alice_dr = DoubleRatchet.from_json(json.load(alice_dr_json), **dr_configuration)

            with open(bob_dr_path, "r", encoding="utf-8") as bob_dr_json:
                bob_dr = DoubleRatchet.from_json(json.load(bob_dr_json), **dr_configuration)

            # NOTE: unpickling executes arbitrary code if the file was tampered with. This is only acceptable
            # because the file is a local cache written by this very script; never load untrusted pickles.
            with open(deferred_path, "rb") as deferred_bin:
                deferred = pickle.load(deferred_bin)
        except OSError:
            # A missing or unreadable cache file just means that a fresh session is created below.
            pass

    # The cache is only usable as a whole; start over if any part of it could not be loaded.
    if alice_dr is None or bob_dr is None or deferred is None:
        alice_dr, bob_dr = await create_double_ratchets("(initial message)".encode("UTF-8"))
        deferred = { "Alice": [], "Bob": [] }

    await main_loop(alice_dr, bob_dr, deferred)

    if use_cache:
        with open(alice_dr_path, "w", encoding="utf-8") as alice_dr_json:
            json.dump(alice_dr.json, alice_dr_json)

        with open(bob_dr_path, "w", encoding="utf-8") as bob_dr_json:
            json.dump(bob_dr.json, bob_dr_json)

        with open(deferred_path, "wb") as deferred_bin:
            pickle.dump(deferred, deferred_bin)
if __name__ == "__main__":
    # Start the chat only when this file is executed as a script; asyncio.run drives the async entry point.
    asyncio.run(main())
|