File: kdf_chain.py

package info (click to toggle)
python-doubleratchet 1.2.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 476 kB
  • sloc: python: 2,152; makefile: 16
file content (136 lines) | stat: -rw-r--r-- 3,923 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from __future__ import annotations

import json
from typing import Type, TypeVar, cast

from .kdf import KDF
from .migrations import parse_kdf_chain_model
from .models import KDFChainModel
from .types import JSONObject


__all__ = [
    "KDFChain"
]


KDFChainTypeT = TypeVar("KDFChainTypeT", bound="KDFChain")


class KDFChain:
    """
    The term KDF chain is used when some of the output from a KDF is used as an output key and some is used to
    replace the KDF key, which can then be used with another input.

    https://signal.org/docs/specifications/doubleratchet/#kdf-chains
    """

    def __init__(self) -> None:
        # Just the type definitions here
        self.__kdf: Type[KDF]
        self.__key: bytes
        self.__length: int

    @classmethod
    def create(cls: Type[KDFChainTypeT], kdf: Type[KDF], key: bytes) -> KDFChainTypeT:
        """
        Args:
            kdf: The KDF to use for the derivation step.
            key: The initial chain key.

        Returns:
            A configured instance of :class:`KDFChain`.
        """

        self = cls()
        self.__kdf = kdf
        self.__key = key
        self.__length = 0

        return self

    @property
    def model(self) -> KDFChainModel:
        """
        Returns:
            The internal state of this :class:`KDFChain` as a pydantic model.
        """

        return KDFChainModel(length=self.__length, key=self.__key)

    @property
    def json(self) -> JSONObject:
        """
        Returns:
            The internal state of this :class:`KDFChain` as a JSON-serializable Python object.
        """

        return cast(JSONObject, json.loads(self.model.model_dump_json()))

    @classmethod
    def from_model(cls: Type[KDFChainTypeT], model: KDFChainModel, kdf: Type[KDF]) -> KDFChainTypeT:
        """
        Args:
            model: The pydantic model holding the internal state of a :class:`KDFChain`, as produced by
                :attr:`model`.
            kdf: The KDF to use for the derivation step.

        Returns:
            A configured instance of :class:`KDFChain`, with internal state restored from the model.

        Warning:
            Migrations are not provided via the :attr:`model`/:meth:`from_model` API. Use
            :attr:`json`/:meth:`from_json` instead. Refer to :ref:`serialization_and_migration` in the
            documentation for details.
        """

        self = cls()
        self.__kdf = kdf
        self.__key = model.key
        self.__length = model.length

        return self

    @classmethod
    def from_json(cls: Type[KDFChainTypeT], serialized: JSONObject, kdf: Type[KDF]) -> KDFChainTypeT:
        """
        Args:
            serialized: A JSON-serializable Python object holding the internal state of a :class:`KDFChain`,
                as produced by :attr:`json`.
            kdf: The KDF to use for the derivation step.

        Returns:
            A configured instance of :class:`KDFChain`, with internal state restored from the serialized data.
        """

        return cls.from_model(parse_kdf_chain_model(serialized), kdf)

    async def step(self, data: bytes, length: int) -> bytes:
        """
        Perform a ratchet step of this KDF chain.

        Args:
            data: The input data.
            length: The desired size of the output data, in bytes.

        Returns:
            ``length`` bytes of output data, derived from the internal KDF key and the input data.
        """

        key_length = len(self.__key)

        output_data = await self.__kdf.derive(self.__key, data, key_length + length)

        self.__length += 1
        self.__key = output_data[:key_length]

        return output_data[key_length:]

    @property
    def length(self) -> int:
        """
        Returns:
            The length of this KDF chain, i.e. the number of steps that have been performed.
        """

        return self.__length