File: background_queue.py

package info (click to toggle)
matrix-synapse 1.143.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 79,852 kB
  • sloc: python: 258,912; javascript: 7,330; sql: 4,733; sh: 1,281; perl: 626; makefile: 207
file content (142 lines) | stat: -rw-r--r-- 5,130 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 Element Creations Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
#
#

import collections
import logging
from typing import (
    TYPE_CHECKING,
    Awaitable,
    Callable,
    Generic,
    TypeVar,
)

from twisted.internet import defer

from synapse.util.async_helpers import DeferredEvent
from synapse.util.constants import MILLISECONDS_PER_SECOND

if TYPE_CHECKING:
    from synapse.server import HomeServer

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable for the items stored in (and passed to the callback of) a
# `BackgroundQueue`.
T = TypeVar("T")


class BackgroundQueue(Generic[T]):
    """A single-producer single-consumer async queue processing items in the
    background.

    This is optimised for the case where we receive many items, but processing
    each one takes a short amount of time. In this case we don't want to pay the
    overhead of a new background process each time. Instead, we spawn a
    background process that will wait for new items to arrive.

    If the background process has been idle for a while, it will exit, and a new
    background process will be spawned when new items arrive.

    Args:
        hs: The homeserver.
        name: The name of the background process.
        callback: The async callback to process each item.
        timeout_ms: The time in milliseconds to wait for new items before
            exiting the background process.
    """

    def __init__(
        self,
        hs: "HomeServer",
        name: str,
        callback: Callable[[T], Awaitable[None]],
        timeout_ms: int = 1000,
    ) -> None:
        self._hs = hs
        self._name = name
        self._callback = callback
        self._timeout_ms = timeout_ms

        # The queue of items to process, in FIFO order.
        self._queue: collections.deque[T] = collections.deque()

        # Indicates if a background process is running, and if so whether there
        # is new data in the queue. Used to signal to an existing background
        # process that there is new data added to the queue. `None` means no
        # background process is currently running.
        self._wakeup_event: DeferredEvent | None = None

    def add(self, item: T) -> None:
        """Add an item into the queue.

        If no background process is currently running, one is started via
        `run_as_background_process`. Otherwise the running process is woken up
        so that it picks up the new item.
        """

        self._queue.append(item)
        if self._wakeup_event is None:
            self._hs.run_as_background_process(self._name, self._process_queue)
        else:
            self._wakeup_event.set()

    async def _process_queue(self) -> None:
        """Process items in the queue until it is empty and no new items have
        arrived within `timeout_ms`.

        Exceptions raised by the callback are logged and the offending item is
        dropped. The exception is `defer.CancelledError`, which is propagated.
        If the process exits that way, any items still queued remain in
        `self._queue`. Because `_wakeup_event` is reset to `None` on exit, the
        next call to `add` spawns a new background process that drains them.
        """

        # Make sure we're the only background process.
        if self._wakeup_event is not None:
            # If there is already a background process then we signal it to wake
            # up and exit. We do not want multiple background processes running
            # at a time.
            self._wakeup_event.set()
            return

        # Keep a local reference. This is the same object as
        # `self._wakeup_event` for the lifetime of this process, since only this
        # process (in its `finally`) ever reassigns the attribute while running.
        wakeup_event = DeferredEvent(self._hs.get_clock())
        self._wakeup_event = wakeup_event

        try:
            while True:
                # Clear the event before checking the queue. If we cleared after
                # we run the risk of the wakeup signal racing with us checking
                # the queue. (This can't really happen in Python due to the
                # single threaded nature, but let's be a bit defensive anyway.)
                wakeup_event.clear()

                while self._queue:
                    item = self._queue.popleft()
                    try:
                        await self._callback(item)
                    except defer.CancelledError:
                        raise
                    except Exception:
                        logger.exception("Error processing background queue item")

                # Wait for new data to arrive, timing out after a while to avoid
                # keeping the background process alive forever.
                #
                # New data may have arrived and been processed while we were
                # pulling from the queue, so this may return that there is new
                # data immediately even though there isn't. That's fine, we'll
                # just loop round, clear the event, recheck the queue, and then
                # wait here again.
                new_data = await wakeup_event.wait(
                    timeout_seconds=self._timeout_ms / MILLISECONDS_PER_SECOND
                )
                if not new_data:
                    # Timed out waiting for new data, so exit the loop
                    break
        finally:
            # This background process is exiting, so clear the wakeup event to
            # indicate that a new one should be started when new data arrives.
            self._wakeup_event = None

        # Only reached on the normal (timed-out) exit path, where the queue must
        # be empty. This check deliberately lives outside the `finally`.
        # Asserting there would turn an in-flight `CancelledError` (raised while
        # items were still queued) into a misleading `AssertionError`.
        assert not self._queue

    def __len__(self) -> int:
        """Return the number of items waiting in the queue.

        An item that is currently being processed has already been popped from
        the queue, so it is not counted.
        """
        return len(self._queue)