File: media_repository_callbacks.py

package info (click to toggle)
matrix-synapse 1.146.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 79,992 kB
  • sloc: python: 261,671; javascript: 7,230; sql: 4,758; sh: 1,302; perl: 626; makefile: 207
file content (157 lines) | stat: -rw-r--r-- 6,000 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#

import logging
from typing import TYPE_CHECKING, Awaitable, Callable

from synapse.config.repository import MediaUploadLimit
from synapse.types import JsonDict
from synapse.util.async_helpers import delay_cancellation
from synapse.util.metrics import Measure

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

GET_MEDIA_CONFIG_FOR_USER_CALLBACK = Callable[[str], Awaitable[JsonDict | None]]

IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK = Callable[[str, int], Awaitable[bool]]

GET_MEDIA_UPLOAD_LIMITS_FOR_USER_CALLBACK = Callable[
    [str], Awaitable[list[MediaUploadLimit] | None]
]

ON_MEDIA_UPLOAD_LIMIT_EXCEEDED_CALLBACK = Callable[
    [str, MediaUploadLimit, int, int], Awaitable[None]
]


class MediaRepositoryModuleApiCallbacks:
    def __init__(self, hs: "HomeServer") -> None:
        self.server_name = hs.hostname
        self.clock = hs.get_clock()
        self._get_media_config_for_user_callbacks: list[
            GET_MEDIA_CONFIG_FOR_USER_CALLBACK
        ] = []
        self._is_user_allowed_to_upload_media_of_size_callbacks: list[
            IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK
        ] = []
        self._get_media_upload_limits_for_user_callbacks: list[
            GET_MEDIA_UPLOAD_LIMITS_FOR_USER_CALLBACK
        ] = []
        self._on_media_upload_limit_exceeded_callbacks: list[
            ON_MEDIA_UPLOAD_LIMIT_EXCEEDED_CALLBACK
        ] = []

    def register_callbacks(
        self,
        get_media_config_for_user: GET_MEDIA_CONFIG_FOR_USER_CALLBACK | None = None,
        is_user_allowed_to_upload_media_of_size: IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK
        | None = None,
        get_media_upload_limits_for_user: GET_MEDIA_UPLOAD_LIMITS_FOR_USER_CALLBACK
        | None = None,
        on_media_upload_limit_exceeded: ON_MEDIA_UPLOAD_LIMIT_EXCEEDED_CALLBACK
        | None = None,
    ) -> None:
        """Register callbacks from module for each hook."""
        if get_media_config_for_user is not None:
            self._get_media_config_for_user_callbacks.append(get_media_config_for_user)

        if is_user_allowed_to_upload_media_of_size is not None:
            self._is_user_allowed_to_upload_media_of_size_callbacks.append(
                is_user_allowed_to_upload_media_of_size
            )

        if get_media_upload_limits_for_user is not None:
            self._get_media_upload_limits_for_user_callbacks.append(
                get_media_upload_limits_for_user
            )

        if on_media_upload_limit_exceeded is not None:
            self._on_media_upload_limit_exceeded_callbacks.append(
                on_media_upload_limit_exceeded
            )

    async def get_media_config_for_user(self, user_id: str) -> JsonDict | None:
        for callback in self._get_media_config_for_user_callbacks:
            with Measure(
                self.clock,
                name=f"{callback.__module__}.{callback.__qualname__}",
                server_name=self.server_name,
            ):
                res: JsonDict | None = await delay_cancellation(callback(user_id))
            if res:
                return res

        return None

    async def is_user_allowed_to_upload_media_of_size(
        self, user_id: str, size: int
    ) -> bool:
        for callback in self._is_user_allowed_to_upload_media_of_size_callbacks:
            with Measure(
                self.clock,
                name=f"{callback.__module__}.{callback.__qualname__}",
                server_name=self.server_name,
            ):
                res: bool = await delay_cancellation(callback(user_id, size))
            if not res:
                return res

        return True

    async def get_media_upload_limits_for_user(
        self, user_id: str
    ) -> list[MediaUploadLimit] | None:
        """
        Get the first non-None list of MediaUploadLimits for the user from the registered callbacks.
        If a list is returned it will be sorted in descending order of duration.
        """
        for callback in self._get_media_upload_limits_for_user_callbacks:
            with Measure(
                self.clock,
                name=f"{callback.__module__}.{callback.__qualname__}",
                server_name=self.server_name,
            ):
                res: list[MediaUploadLimit] | None = await delay_cancellation(
                    callback(user_id)
                )
            if res is not None:  # to allow [] to be returned meaning no limit
                # We sort them in descending order of time period
                res.sort(key=lambda limit: limit.time_period_ms, reverse=True)
                return res

        return None

    async def on_media_upload_limit_exceeded(
        self,
        user_id: str,
        limit: MediaUploadLimit,
        sent_bytes: int,
        attempted_bytes: int,
    ) -> None:
        for callback in self._on_media_upload_limit_exceeded_callbacks:
            with Measure(
                self.clock,
                name=f"{callback.__module__}.{callback.__qualname__}",
                server_name=self.server_name,
            ):
                # Use a copy of the data in case the module modifies it
                limit_copy = MediaUploadLimit(
                    max_bytes=limit.max_bytes, time_period_ms=limit.time_period_ms
                )
                await delay_cancellation(
                    callback(user_id, limit_copy, sent_bytes, attempted_bytes)
                )