1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
# -*- coding: utf-8 -*-
import unittest
from ..state.sessionrecord import SessionRecord
from ..ecc.curve import Curve
from ..identitykeypair import IdentityKeyPair, IdentityKey
from ..ratchet.aliceaxolotlparameters import AliceAxolotlParameters
from ..ratchet.bobaxolotlparamaters import BobAxolotlParameters
from ..ratchet.ratchetingsession import RatchetingSession
from ..tests.inmemoryaxolotlstore import InMemoryAxolotlStore
from ..sessioncipher import SessionCipher
from ..protocol.whispermessage import WhisperMessage
class SessionCipherTest(unittest.TestCase):
def test_basicSessionV3(self):
aliceSessionRecord = SessionRecord()
bobSessionRecord = SessionRecord()
self.initializeSessionsV3(aliceSessionRecord.getSessionState(), bobSessionRecord.getSessionState())
self.runInteraction(aliceSessionRecord, bobSessionRecord)
def runInteraction(self, aliceSessionRecord, bobSessionRecord):
aliceStore = InMemoryAxolotlStore()
bobStore = InMemoryAxolotlStore()
aliceStore.storeSession(2, 1, aliceSessionRecord)
bobStore.storeSession(3, 1, bobSessionRecord)
aliceCipher = SessionCipher(aliceStore, aliceStore, aliceStore, aliceStore, 2, 1)
bobCipher = SessionCipher(bobStore, bobStore, bobStore, bobStore, 3, 1)
alicePlaintext = b"This is a plaintext message."
message = aliceCipher.encrypt(alicePlaintext)
bobPlaintext = bobCipher.decryptMsg(WhisperMessage(serialized=message.serialize()))
self.assertEqual(alicePlaintext, bobPlaintext)
bobReply = b"This is a message from Bob."
reply = bobCipher.encrypt(bobReply)
receivedReply = aliceCipher.decryptMsg(WhisperMessage(serialized=reply.serialize()))
self.assertEqual(bobReply, receivedReply)
alicePlaintext = b"ABCDEFGHIJKLMNOP" # ensure padding/unpadding properly applies on message of blocksize length
message = aliceCipher.encrypt(alicePlaintext)
bobPlaintext = bobCipher.decryptMsg(WhisperMessage(serialized=message.serialize()))
self.assertEqual(alicePlaintext, bobPlaintext)
aliceCiphertextMessages = []
alicePlaintextMessages = []
for i in range(0, 50):
alicePlaintextMessages.append(b"aaaaaa %d" % i)
aliceCiphertextMessages.append(aliceCipher.encrypt(b"aaaaaa %d" % i))
# shuffle(aliceCiphertextMessages)
# shuffle(alicePlaintextMessages)
for i in range(0, int(len(aliceCiphertextMessages)/2)):
receivedPlaintext = bobCipher.decryptMsg(WhisperMessage(serialized=aliceCiphertextMessages[i].serialize()))
self.assertEqual(receivedPlaintext, alicePlaintextMessages[i])
def initializeSessionsV3(self, aliceSessionState, bobSessionState):
aliceIdentityKeyPair = Curve.generateKeyPair()
aliceIdentityKey = IdentityKeyPair(IdentityKey(aliceIdentityKeyPair.getPublicKey()),
aliceIdentityKeyPair.getPrivateKey())
aliceBaseKey = Curve.generateKeyPair()
# aliceEphemeralKey = Curve.generateKeyPair()
# alicePreKey = aliceBaseKey
bobIdentityKeyPair = Curve.generateKeyPair()
bobIdentityKey = IdentityKeyPair(IdentityKey(bobIdentityKeyPair.getPublicKey()),
bobIdentityKeyPair.getPrivateKey())
bobBaseKey = Curve.generateKeyPair()
bobEphemeralKey = bobBaseKey
# bobPreKey = Curve.generateKeyPair()
aliceParameters = AliceAxolotlParameters.newBuilder()\
.setOurBaseKey(aliceBaseKey)\
.setOurIdentityKey(aliceIdentityKey)\
.setTheirOneTimePreKey(None)\
.setTheirRatchetKey(bobEphemeralKey.getPublicKey())\
.setTheirSignedPreKey(bobBaseKey.getPublicKey())\
.setTheirIdentityKey(bobIdentityKey.getPublicKey())\
.create()
bobParameters = BobAxolotlParameters.newBuilder()\
.setOurRatchetKey(bobEphemeralKey)\
.setOurSignedPreKey(bobBaseKey)\
.setOurOneTimePreKey(None)\
.setOurIdentityKey(bobIdentityKey)\
.setTheirIdentityKey(aliceIdentityKey.getPublicKey())\
.setTheirBaseKey(aliceBaseKey.getPublicKey())\
.create()
RatchetingSession.initializeSessionAsAlice(aliceSessionState, aliceParameters)
RatchetingSession.initializeSessionAsBob(bobSessionState, bobParameters)
|