File: diagnostics.py

package info (click to toggle)
rtsp-to-webrtc 0.6.1-2
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 240 kB
  • sloc: python: 1,126; makefile: 7; sh: 5
file content (46 lines) | stat: -rw-r--r-- 1,233 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
"""Diagnostics for debugging."""

from __future__ import annotations
from collections import Counter
from collections.abc import Mapping
from typing import Any


class Diagnostics:
    """Event counters for the RTSP to Web client libraries.

    Every key passed to ``increment`` has its own running count. The counts
    can be exported with ``as_dict`` and discarded with ``reset``.
    """

    def __init__(self) -> None:
        """Initialize Diagnostics with no events recorded."""
        self._counter: Counter = Counter()

    def increment(self, key: str) -> None:
        """Add one to the count stored for the specified key/event.

        A key that has not been seen before starts from zero.
        """
        # Counter supplies 0 for missing keys, so no temporary Counter or
        # membership check is needed.
        self._counter[key] += 1

    def as_dict(self) -> Mapping[str, Any]:
        """Return the current counts as a new plain ``dict``.

        The result is a copy: later increments do not change it.
        """
        return dict(self._counter)

    def reset(self) -> None:
        """Clear all diagnostics, for testing."""
        # Rebind to a fresh Counter rather than clearing in place, so the
        # previous Counter object is left untouched.
        self._counter = Counter()

# Registry of the module-level diagnostics objects, keyed by component name.
MAP = {
    "discovery": Diagnostics(),
    "web": Diagnostics(),
    "webrtc": Diagnostics(),
}

# Named handles to the very same instances that are stored in MAP.
DISCOVERY_DIAGNOSTICS = MAP["discovery"]
WEB_DIAGNOSTICS = MAP["web"]
WEBRTC_DIAGNOSTICS = MAP["webrtc"]


def reset() -> None:
    """Reset every diagnostics object registered in MAP, for testing."""
    for name in MAP:
        MAP[name].reset()


def get_diagnostics() -> dict[str, Any]:
    """Return the ``as_dict()`` output of every MAP entry, keyed by its name."""
    report: dict[str, Any] = {}
    for name, diagnostics in MAP.items():
        report[name] = diagnostics.as_dict()
    return report