1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Dict, NewType, Optional, Union
from enum import IntEnum
import warnings
from attr import dataclass
from ..primitive import JSON, DeviceID, IdentityKey, SessionID
from ..util import ExtensibleEnum, Obj, Serializable, SerializableAttrs, deserializer, field
from .base import BaseRoomEvent, BaseUnsigned
from .message import RelatesTo
class EncryptionAlgorithm(ExtensibleEnum):
    """Identifiers of the encryption algorithms used in the ``algorithm`` field of encrypted events."""

    # Algorithm used as the default by EncryptedOlmEventContent.
    OLM_V1: "EncryptionAlgorithm" = "m.olm.v1.curve25519-aes-sha2"
    # Algorithm used as the default by EncryptedMegolmEventContent.
    MEGOLM_V1: "EncryptionAlgorithm" = "m.megolm.v1.aes-sha2"
class EncryptionKeyAlgorithm(ExtensibleEnum):
    """Key algorithm names; these form the part before the colon in a :class:`KeyID`."""

    CURVE25519: "EncryptionKeyAlgorithm" = "curve25519"
    ED25519: "EncryptionKeyAlgorithm" = "ed25519"
    SIGNED_CURVE25519: "EncryptionKeyAlgorithm" = "signed_curve25519"
@dataclass(frozen=True)
class KeyID(Serializable):
    """
    An immutable key identifier made of a key algorithm and an opaque ID.

    The serialized form is the string ``<algorithm>:<key_id>``.
    """

    algorithm: EncryptionKeyAlgorithm
    key_id: str

    def __str__(self) -> str:
        """Render the identifier in its ``<algorithm>:<key_id>`` wire format."""
        return f"{self.algorithm.value}:{self.key_id}"

    def serialize(self) -> JSON:
        """Return the string form of this key ID (same as ``str(self)``)."""
        return str(self)

    @classmethod
    def deserialize(cls, raw: JSON) -> "KeyID":
        """
        Parse a ``<algorithm>:<key_id>`` string into a :class:`KeyID`.

        Only the first colon separates the two parts, so the key ID itself may
        contain colons. A string without any colon fails the two-value
        unpacking with a :class:`ValueError`.
        """
        # NOTE(review): this check is an assert, so it is skipped when Python
        # runs with -O; non-string input then fails on the split call instead.
        assert isinstance(raw, str), "key IDs must be strings"
        algorithm_name, identifier = raw.split(sep=":", maxsplit=1)
        return cls(EncryptionKeyAlgorithm(algorithm_name), identifier)
class OlmMsgType(Serializable, IntEnum):
    """Integer type tag of an Olm ciphertext (see :attr:`OlmCiphertext.type`)."""

    PREKEY = 0
    MESSAGE = 1

    def serialize(self) -> JSON:
        """Return the member's integer value."""
        return self.value

    @classmethod
    def deserialize(cls, raw: JSON) -> "OlmMsgType":
        """Look up the member for ``raw``; the enum lookup raises ``ValueError`` for unknown values."""
        return cls(raw)
@dataclass
class OlmCiphertext(SerializableAttrs):
    """One Olm ciphertext entry: the encrypted body plus its message type."""

    # Encrypted payload as a string (presumably base64 -- confirm against the Matrix spec).
    body: str
    # Whether the payload is a pre-key message or a normal message.
    type: OlmMsgType
@dataclass
class EncryptedOlmEventContent(SerializableAttrs):
    """Content of an encrypted event whose algorithm is :attr:`EncryptionAlgorithm.OLM_V1`."""

    # Ciphertexts by string key (presumably the recipient device's Curve25519
    # identity key, per the Matrix spec -- TODO confirm).
    ciphertext: Dict[str, OlmCiphertext]
    # Identity key of the sender.
    sender_key: IdentityKey
    # Defaults to the Olm algorithm identifier.
    algorithm: EncryptionAlgorithm = EncryptionAlgorithm.OLM_V1
@dataclass
class EncryptedMegolmEventContent(SerializableAttrs):
    """
    The content of an m.room.encrypted event that uses the Megolm algorithm.

    The ``sender_key`` and ``device_id`` JSON fields are stored in private
    attributes and exposed through read-only properties that emit a
    :class:`DeprecationWarning` on access. ``m.relates_to`` is exposed through
    the :attr:`relates_to` property.
    """

    ciphertext: str
    session_id: SessionID
    algorithm: EncryptionAlgorithm = EncryptionAlgorithm.MEGOLM_V1

    # Stored under the JSON keys named in ``json=``; accessed via the properties below.
    _sender_key: Optional[IdentityKey] = field(default=None, json="sender_key")
    _device_id: Optional[DeviceID] = field(default=None, json="device_id")
    _relates_to: Optional[RelatesTo] = field(default=None, json="m.relates_to")

    @property
    def sender_key(self) -> Optional[IdentityKey]:
        """
        The ``sender_key`` field of the event content, if present.

        .. deprecated:: 0.17.0
            Matrix v1.3 deprecated the device_id and sender_key fields in megolm events.
        """
        warnings.warn(
            "The sender_key field in Megolm events was deprecated in Matrix 1.3",
            DeprecationWarning,
            # Attribute the warning to the code reading the property, not to this module.
            stacklevel=2,
        )
        return self._sender_key

    @property
    def device_id(self) -> Optional[DeviceID]:
        """
        The ``device_id`` field of the event content, if present.

        .. deprecated:: 0.17.0
            Matrix v1.3 deprecated the device_id and sender_key fields in megolm events.
        """
        warnings.warn(
            # Name the field that was actually accessed (previously said sender_key).
            "The device_id field in Megolm events was deprecated in Matrix 1.3",
            DeprecationWarning,
            # Attribute the warning to the code reading the property, not to this module.
            stacklevel=2,
        )
        return self._device_id

    @property
    def relates_to(self) -> RelatesTo:
        """
        The ``m.relates_to`` data of the event.

        If none is set, an empty :class:`RelatesTo` is created, stored on the
        instance and returned, so this getter never returns ``None``.
        """
        if self._relates_to is None:
            self._relates_to = RelatesTo()
        return self._relates_to

    @relates_to.setter
    def relates_to(self, relates_to: RelatesTo) -> None:
        """Replace the stored ``m.relates_to`` data."""
        self._relates_to = relates_to
# Distinct type name covering both Olm and Megolm encrypted content.
# deserialize_encrypted is registered as the deserializer for this type.
EncryptedEventContent = NewType(
    "EncryptedEventContent", Union[EncryptedOlmEventContent, EncryptedMegolmEventContent]
)
@deserializer(EncryptedEventContent)
def deserialize_encrypted(data: JSON) -> Union[EncryptedEventContent, Obj]:
    """
    Deserialize encrypted event content according to its ``algorithm`` field.

    Args:
        data: The raw content; it is read with ``.get`` and unpacked with
            ``**``, so it is expected to be a mapping.

    Returns:
        An :class:`EncryptedMegolmEventContent` or
        :class:`EncryptedOlmEventContent` when the algorithm matches one of
        the known identifiers, otherwise a generic :class:`Obj` built from
        the raw fields.
    """
    algorithm = data.get("algorithm")
    # Checked in order: Megolm first, then Olm.
    known_content_types = (
        (EncryptionAlgorithm.MEGOLM_V1, EncryptedMegolmEventContent),
        (EncryptionAlgorithm.OLM_V1, EncryptedOlmEventContent),
    )
    for member, content_type in known_content_types:
        if algorithm == member.value:
            return content_type.deserialize(data)
    # Unknown or missing algorithm: keep the data without a typed class.
    return Obj(**data)
# Attach the dispatcher as ``EncryptedEventContent.deserialize`` (presumably so
# the NewType can be called like the content classes' own ``deserialize``).
setattr(EncryptedEventContent, "deserialize", deserialize_encrypted)
@dataclass
class EncryptedEvent(BaseRoomEvent, SerializableAttrs):
    """A m.room.encrypted event"""

    content: EncryptedEventContent
    # Stored under the JSON key ``unsigned``; accessed via the property below.
    _unsigned: Optional[BaseUnsigned] = field(default=None, json="unsigned")

    @property
    def unsigned(self) -> BaseUnsigned:
        """
        The unsigned data of the event.

        When the stored value is falsy (e.g. ``None``), a new empty
        :class:`BaseUnsigned` is created, stored on the event and returned.
        """
        existing = self._unsigned
        if existing:
            return existing
        self._unsigned = BaseUnsigned()
        return self._unsigned

    @unsigned.setter
    def unsigned(self, value: BaseUnsigned) -> None:
        """Replace the stored unsigned data."""
        self._unsigned = value
|