File: utils.py

package info (click to toggle)
python-django-tasks 0.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 256 kB
  • sloc: python: 1,703; sh: 5; makefile: 4
file content (55 lines) | stat: -rw-r--r-- 1,642 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import inspect
from collections.abc import Callable, Mapping, Sequence
from traceback import format_exception
from typing import Any, TypeVar

from django.utils.crypto import get_random_string
from typing_extensions import ParamSpec

T = TypeVar("T")
P = ParamSpec("P")


def is_module_level_function(func: Callable) -> bool:
    if not inspect.isfunction(func) or inspect.isbuiltin(func):
        return False

    if "<locals>" in func.__qualname__:
        return False

    return True


def normalize_json(obj: Any) -> Any:
    """Recursively normalize an object into JSON-compatible types."""
    match obj:
        case Mapping():
            return {normalize_json(k): normalize_json(v) for k, v in obj.items()}
        case bytes():
            try:
                return obj.decode("utf-8")
            except UnicodeDecodeError as e:
                raise ValueError(f"Unsupported value: {type(obj)}") from e
        case str() | int() | float() | bool() | None:
            return obj
        case Sequence():  # str and bytes were already handled.
            return [normalize_json(v) for v in obj]
        case _:  # Other types can't be serialized to JSON
            raise TypeError(f"Unsupported type: {type(obj)}")


def get_module_path(val: Any) -> str:
    return f"{val.__module__}.{val.__qualname__}"


def get_exception_traceback(exc: BaseException) -> str:
    return "".join(format_exception(exc))


def get_random_id() -> str:
    """
    Return a random string for use as a Task or worker id.

    Whilst 64 characters is the max, just use 32 as a sensible middle-ground.
    """
    return get_random_string(32)