1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
# -*- coding: utf-8 -*-
from ..ecc.curve import Curve
from .bobaxolotlparamaters import BobAxolotlParameters
from .aliceaxolotlparameters import AliceAxolotlParameters
from ..kdf.hkdfv3 import HKDFv3
from ..util.byteutil import ByteUtil
from .rootkey import RootKey
from .chainkey import ChainKey
from ..protocol.ciphertextmessage import CiphertextMessage
class RatchetingSession:
@staticmethod
def initializeSession(sessionState, parameters):
"""
:type sessionState: SessionState
:type parameters: SymmetricAxolotlParameters
"""
if RatchetingSession.isAlice(parameters.getOurBaseKey().getPublicKey(), parameters.getTheirBaseKey()):
aliceParameters = AliceAxolotlParameters.newBuilder()
aliceParameters.setOurBaseKey(parameters.getOurBaseKey()) \
.setOurIdentityKey(parameters.getOurIdentityKey()) \
.setTheirRatchetKey(parameters.getTheirRatchetKey()) \
.setTheirIdentityKey(parameters.getTheirIdentityKey()) \
.setTheirSignedPreKey(parameters.getTheirBaseKey()) \
.setTheirOneTimePreKey(None)
RatchetingSession.initializeSessionAsAlice(sessionState, aliceParameters.create())
else:
bobParameters = BobAxolotlParameters.newBuilder()
bobParameters.setOurIdentityKey(parameters.getOurIdentityKey()) \
.setOurRatchetKey(parameters.getOurRatchetKey()) \
.setOurSignedPreKey(parameters.getOurBaseKey()) \
.setOurOneTimePreKey(None) \
.setTheirBaseKey(parameters.getTheirBaseKey()) \
.setTheirIdentityKey(parameters.getTheirIdentityKey())
RatchetingSession.initializeSessionAsBob(sessionState, bobParameters.create())
@staticmethod
def initializeSessionAsAlice(sessionState, parameters):
"""
:type sessionState: SessionState
:type parameters: AliceAxolotlParameters
"""
sessionState.setSessionVersion(CiphertextMessage.CURRENT_VERSION)
sessionState.setRemoteIdentityKey(parameters.getTheirIdentityKey())
sessionState.setLocalIdentityKey(parameters.getOurIdentityKey().getPublicKey())
sendingRatchetKey = Curve.generateKeyPair()
secrets = bytearray()
secrets.extend(RatchetingSession.getDiscontinuityBytes())
secrets.extend(Curve.calculateAgreement(parameters.getTheirSignedPreKey(),
parameters.getOurIdentityKey().getPrivateKey()))
secrets.extend(Curve.calculateAgreement(parameters.getTheirIdentityKey().getPublicKey(),
parameters.getOurBaseKey().getPrivateKey()))
secrets.extend(Curve.calculateAgreement(parameters.getTheirSignedPreKey(),
parameters.getOurBaseKey().getPrivateKey()))
if parameters.getTheirOneTimePreKey() is not None:
secrets.extend(Curve.calculateAgreement(parameters.getTheirOneTimePreKey(),
parameters.getOurBaseKey().getPrivateKey()))
derivedKeys = RatchetingSession.calculateDerivedKeys(secrets)
sendingChain = derivedKeys.getRootKey().createChain(parameters.getTheirRatchetKey(), sendingRatchetKey)
sessionState.addReceiverChain(parameters.getTheirRatchetKey(), derivedKeys.getChainKey())
sessionState.setSenderChain(sendingRatchetKey, sendingChain[1])
sessionState.setRootKey(sendingChain[0])
@staticmethod
def initializeSessionAsBob(sessionState, parameters):
"""
:type sessionState: SessionState
:type parameters: BobAxolotlParameters
"""
sessionState.setSessionVersion(CiphertextMessage.CURRENT_VERSION)
sessionState.setRemoteIdentityKey(parameters.getTheirIdentityKey())
sessionState.setLocalIdentityKey(parameters.getOurIdentityKey().getPublicKey())
secrets = bytearray()
secrets.extend(RatchetingSession.getDiscontinuityBytes())
secrets.extend(Curve.calculateAgreement(parameters.getTheirIdentityKey().getPublicKey(),
parameters.getOurSignedPreKey().getPrivateKey()))
secrets.extend(Curve.calculateAgreement(parameters.getTheirBaseKey(),
parameters.getOurIdentityKey().getPrivateKey()))
secrets.extend(Curve.calculateAgreement(parameters.getTheirBaseKey(),
parameters.getOurSignedPreKey().getPrivateKey()))
if parameters.getOurOneTimePreKey() is not None:
secrets.extend(Curve.calculateAgreement(parameters.getTheirBaseKey(),
parameters.getOurOneTimePreKey().getPrivateKey()))
derivedKeys = RatchetingSession.calculateDerivedKeys(secrets)
sessionState.setSenderChain(parameters.getOurRatchetKey(), derivedKeys.getChainKey())
sessionState.setRootKey(derivedKeys.getRootKey())
@staticmethod
def getDiscontinuityBytes():
return bytearray([0xFF] * 32)
@staticmethod
def calculateDerivedKeys(masterSecret):
kdf = HKDFv3()
derivedSecretBytes = kdf.deriveSecrets(masterSecret, bytearray("WhisperText".encode()), 64)
derivedSecrets = ByteUtil.split(derivedSecretBytes, 32, 32)
return RatchetingSession.DerivedKeys(RootKey(kdf, derivedSecrets[0]),
ChainKey(kdf, derivedSecrets[1], 0))
@staticmethod
def isAlice(ourKey, theirKey):
"""
:type ourKey: ECPublicKey
:type theirKey: ECPublicKey
"""
return ourKey < theirKey
class DerivedKeys:
def __init__(self, rootKey, chainKey):
"""
:type rootKey: RootKey
:type chainKey: ChainKey
"""
self.rootKey = rootKey
self.chainKey = chainKey
def getRootKey(self):
return self.rootKey
def getChainKey(self):
return self.chainKey
|