File: utils.py

package info (click to toggle)
prometheus-fastapi-instrumentator 7.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 492 kB
  • sloc: python: 2,586; makefile: 5
file content (50 lines) | stat: -rw-r--r-- 1,540 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import os
import shutil

from prometheus_client import REGISTRY


def reset_collectors() -> None:
    """Brings the default Prometheus registry back to its initial collectors.

    Mutates the global `REGISTRY`: every collector currently registered is
    removed and the process, platform and GC collectors are created again.
    Meant to be invoked at the start of each test function; otherwise the
    registry is shared between tests, which leads to duplicate metric errors
    or metric values leaking from one test into the next.
    """

    # Snapshot the registered collectors first, because unregistering mutates
    # the registry's internal mapping.
    registered = list(REGISTRY._collector_to_names.keys())
    print(f"before unregister collectors={registered}")
    for stale in registered:
        REGISTRY.unregister(stale)

    # Imported locally, right before the default collectors are re-created.
    from prometheus_client import gc_collector, platform_collector, process_collector

    # Instantiate the default collectors again, in the original order, to
    # re-register them.
    default_collector_classes = (
        process_collector.ProcessCollector,
        platform_collector.PlatformCollector,
        gc_collector.GCCollector,
    )
    for collector_cls in default_collector_classes:
        collector_cls()

    print(f"after re-register collectors={list(REGISTRY._collector_to_names.keys())}")


def is_prometheus_multiproc_valid() -> bool:
    """Checks if PROMETHEUS_MULTIPROC_DIR is set and a directory.

    Returns:
        True only if the environment variable `PROMETHEUS_MULTIPROC_DIR` is
        set and its value is the path of an existing directory. False in
        every other case, including when the variable is set but points to a
        missing path or to a regular file.
    """

    pmd = os.environ.get("PROMETHEUS_MULTIPROC_DIR")
    # Always produce a real bool: previously a set-but-invalid value fell
    # through both branches and the function implicitly returned None.
    return pmd is not None and os.path.isdir(pmd)


def delete_dir_content(dirpath) -> None:
    """Deletes every entry inside `dirpath` while keeping `dirpath` itself.

    Real sub-directories are removed recursively. Regular files and symbolic
    links (including links that point at directories) are unlinked, so the
    target of a link is never touched.

    Args:
        dirpath: Path of the directory whose content should be deleted.

    Raises:
        OSError: If `dirpath` cannot be listed or an entry cannot be removed.
    """

    for filename in os.listdir(dirpath):
        filepath = os.path.join(dirpath, filename)
        # Pick the removal call up front instead of trying rmtree on every
        # entry and falling back on OSError: that fallback replaced a genuine
        # rmtree failure (e.g. permission denied inside a sub-directory) with
        # an unrelated error raised by os.remove on a directory.
        if os.path.isdir(filepath) and not os.path.islink(filepath):
            shutil.rmtree(filepath)
        else:
            os.remove(filepath)