File: events.py

package info (click to toggle)
matrix-synapse 1.146.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 79,992 kB
  • sloc: python: 261,671; javascript: 7,230; sql: 4,758; sh: 1,302; perl: 626; makefile: 207
file content (69 lines) | stat: -rw-r--r-- 2,119 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from http import HTTPStatus
from typing import TYPE_CHECKING

from synapse.api.errors import NotFoundError
from synapse.events.utils import (
    SerializeEventConfig,
    format_event_raw,
    serialize_event,
)
from synapse.http.servlet import RestServlet
from synapse.http.site import SynapseRequest
from synapse.rest.admin import admin_patterns
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import JsonDict

if TYPE_CHECKING:
    from synapse.server import HomeServer


class EventRestServlet(RestServlet):
    """
    Admin endpoint that hands back a single event held by this homeserver.

    GET /_synapse/admin/v1/fetch_event/<event_id>

    The caller must be a server administrator; the check is made before the
    event store is consulted.

    Path parameters:
        event_id: the ID of the event being requested.

    Responses:
        200 OK with a JSON body of the form `{"event": <serialized event>}`
        when the store returns the event. A NotFoundError is raised when the
        store returns nothing for that ID.
    """

    PATTERNS = admin_patterns("/fetch_event/(?P<event_id>[^/]*)$")

    def __init__(self, hs: "HomeServer"):
        """Keep references to the auth handler, main datastore and clock."""
        self._auth = hs.get_auth()
        self._store = hs.get_datastores().main
        self._clock = hs.get_clock()

    async def on_GET(
        self, request: SynapseRequest, event_id: str
    ) -> tuple[int, JsonDict]:
        """
        Look up `event_id` and return it serialized in the raw event format.

        Args:
            request: the incoming HTTP request, used to identify the caller.
            event_id: the event ID captured from the URL path.

        Returns:
            A `(HTTPStatus.OK, {"event": ...})` pair.

        Raises:
            NotFoundError: if the store has no event for `event_id`.
        """
        # Identify the caller, then refuse anyone who is not an admin before
        # doing any lookup.
        caller = await self._auth.get_user_by_req(request)
        await assert_user_is_admin(self._auth, caller)

        # `allow_none=True` lets us turn a missing event into our own
        # NotFoundError below; the event is requested with the `as_is`
        # redaction behaviour.
        fetched = await self._store.get_event(
            event_id, EventRedactBehaviour.as_is, allow_none=True
        )
        if fetched is None:
            raise NotFoundError("Event not found")

        # Raw (non-client) format, no field filtering, with the stripped room
        # state and admin metadata flags switched on.
        serialize_config = SerializeEventConfig(
            as_client_event=False,
            event_format=format_event_raw,
            requester=caller,
            only_event_fields=None,
            include_stripped_room_state=True,
            include_admin_metadata=True,
        )
        now_ms = self._clock.time_msec()
        body: JsonDict = {
            "event": serialize_event(fetched, now_ms, config=serialize_config)
        }

        return HTTPStatus.OK, body