File: background_task.py

package info (click to toggle)
mautrix-python 0.20.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,812 kB
  • sloc: python: 19,103; makefile: 16
file content (53 lines) | stat: -rw-r--r-- 1,820 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# Copyright (c) 2023 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations

from typing import Coroutine
import asyncio
import logging
import sys

# Strong references to every task started through create(). A task is added when it is
# created and removed again by its done callback, so it stays referenced until it completes.
_tasks: set[asyncio.Task] = set()
# Module logger; used to report uncaught errors raised inside background tasks.
log = logging.getLogger("mau.background_task")


async def catch(coro: Coroutine, caller: str) -> None:
    """
    Await a coroutine and log, rather than propagate, any ``Exception`` it raises.

    Errors that do not derive from ``Exception`` are not intercepted here.

    Args:
        coro: The coroutine to await.
        caller: A description of where the task was created, included in the log message.
    """
    try:
        await coro
    except Exception:
        message = f"Uncaught error in background task (created in {caller})"
        # log.exception attaches the traceback of the exception currently being handled.
        log.exception(message)


# Resolve the caller of create() by walking the interpreter stack directly.
# Logger.findCaller() is deliberately not used: the frame it starts counting from is an
# implementation detail of the logging module that changed in Python 3.11, after which a
# bare findCaller() call made from here reports this helper instead of create()'s caller.
def _find_caller() -> tuple[str, int, str, None]:
    """
    Locate the code that called ``create()``.

    This helper must be called directly from ``create()``, because the result is found by
    counting a fixed number of stack frames upwards from this function.

    Returns:
        A ``(file name, line number, function name, None)`` tuple, i.e. the same shape as
        the return value of ``Logger.findCaller()``.

    Raises:
        ValueError: If the call stack is not deep enough, or if the interpreter does not
            provide stack frame introspection.
    """
    get_frame = getattr(sys, "_getframe", None)
    if get_frame is None:
        # Same exception type as a too-shallow stack, so callers need a single fallback.
        raise ValueError("stack frame introspection is not supported by this interpreter")
    # Depth 0 is this function, 1 is create(), 2 is whoever called create().
    frame = get_frame(2)
    code = frame.f_code
    return code.co_filename, frame.f_lineno, code.co_name, None


def create(coro: Coroutine, *, name: str | None = None, catch_errors: bool = True) -> asyncio.Task:
    """
    Start a coroutine as a background asyncio task and keep it referenced until it finishes.

    The task is stored in a module-level set and removed again by a done callback. Unless
    disabled, the coroutine is wrapped so that uncaught errors are logged together with a
    description of the code that called this function.

    Args:
        coro: The coroutine to wrap in a task and execute.
        name: An optional name for the created task.
        catch_errors: Whether to wrap the coroutine so that uncaught errors are logged.

    Returns:
        The asyncio Task running the given coroutine (or its error-logging wrapper).
    """
    runnable = coro
    if catch_errors:
        # _find_caller() counts stack frames, so it has to be invoked right here in create().
        try:
            src_file, src_line, src_func, _ = _find_caller()
        except ValueError:
            origin = "unknown function"
        else:
            origin = f"{src_func} at {src_file}:{src_line}"
        runnable = catch(coro, origin)

    new_task = asyncio.create_task(runnable, name=name)
    # Hold a reference for the task's lifetime; the callback drops it once the task is done.
    _tasks.add(new_task)
    new_task.add_done_callback(_tasks.discard)
    return new_task