File: units.py

package info (click to toggle)
matrix-synapse 1.146.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 79,996 kB
  • sloc: python: 261,671; javascript: 7,230; sql: 4,758; sh: 1,302; perl: 626; makefile: 207
file content (133 lines) | stat: -rw-r--r-- 3,827 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2014-2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#

"""Defines the JSON structure of the protocol units used by the server to
server protocol.
"""

import logging
from typing import Sequence

import attr

from synapse.api.constants import CANONICALJSON_MAX_INT, CANONICALJSON_MIN_INT
from synapse.events import EventBase
from synapse.types import JsonDict

logger = logging.getLogger(__name__)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class Edu:
    """An Edu represents a piece of data sent from one homeserver to another.

    In comparison to Pdus, Edus are not persisted for a long time on disk, are
    not meaningful beyond a given pair of homeservers, and don't have an
    internal ID or previous references graph.
    """

    edu_type: str
    content: dict
    origin: str
    destination: str

    def get_dict(self) -> JsonDict:
        return {
            "edu_type": self.edu_type,
            "content": self.content,
        }

    def get_internal_dict(self) -> JsonDict:
        return {
            "edu_type": self.edu_type,
            "content": self.content,
            "origin": self.origin,
            "destination": self.destination,
        }

    def get_context(self) -> str:
        return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")

    def strip_context(self) -> None:
        getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}"


def _none_to_list(edus: list[JsonDict] | None) -> list[JsonDict]:
    if edus is None:
        return []
    return edus


@attr.s(slots=True, frozen=True, auto_attribs=True)
class Transaction:
    """A transaction is a list of Pdus and Edus to be sent to a remote home
    server with some extra metadata.

    Example transaction::

        {
            "origin": "foo",
            "prev_ids": ["abc", "def"],
            "pdus": [
                ...
            ],
        }

    """

    # Required keys.
    transaction_id: str
    origin: str
    destination: str
    origin_server_ts: int
    pdus: list[JsonDict] = attr.ib(factory=list, converter=_none_to_list)
    edus: list[JsonDict] = attr.ib(factory=list, converter=_none_to_list)

    def get_dict(self) -> JsonDict:
        """A JSON-ready dictionary of valid keys which aren't internal."""
        result = {
            "origin": self.origin,
            "origin_server_ts": self.origin_server_ts,
            "pdus": filter_pdus_for_valid_depth(self.pdus),
        }
        if self.edus:
            result["edus"] = self.edus
        return result


def filter_pdus_for_valid_depth(pdus: Sequence[JsonDict]) -> list[JsonDict]:
    filtered_pdus = []
    for pdu in pdus:
        # Drop PDUs that have a depth that is outside of the range allowed
        # by canonical json.
        if (
            "depth" in pdu
            and CANONICALJSON_MIN_INT <= pdu["depth"] <= CANONICALJSON_MAX_INT
        ):
            filtered_pdus.append(pdu)

    return filtered_pdus


def serialize_and_filter_pdus(
    pdus: Sequence[EventBase], time_now: int | None = None
) -> list[JsonDict]:
    return filter_pdus_for_valid_depth([pdu.get_pdu_json(time_now) for pdu in pdus])