File: bases.py

package info (click to toggle)
python-aiomeasures 0.5.14-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster
  • size: 556 kB
  • sloc: python: 5,181; makefile: 159
file content (101 lines) | stat: -rw-r--r-- 3,076 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import asyncio
import inspect
from abc import ABCMeta, abstractmethod
from functools import wraps
from time import perf_counter

from aiomeasures.checks import Check
from aiomeasures.events import Event
from aiomeasures.metrics import CountingMetric, GaugeMetric
from aiomeasures.metrics import HistogramMetric, SetMetric, TimingMetric


class Client(metaclass=ABCMeta):
    """Abstract base class for metric clients.

    Each helper below builds a metric, event or check object and passes it
    to :meth:`register`, returning whatever :meth:`register` returns.
    Subclasses must implement :meth:`format`, :meth:`send`,
    :meth:`register` and :meth:`close`.
    """

    @staticmethod
    def _magnitude(value):
        """Return ``abs(value)``, or ``1`` when *value* is ``None``.

        ``None`` is tested explicitly (rather than ``value or 1``) so that
        an explicit ``0`` stays ``0`` instead of silently becoming ``1``.
        """
        return 1 if value is None else abs(value)

    def incr(self, name, value=None, rate=None, tags=None):
        """Register a ``CountingMetric`` with a non-negative value.

        Parameters:
            name: passed to the metric as its name
            value: step; its sign is discarded, ``None`` means 1
            rate: forwarded unchanged to the metric
            tags: forwarded unchanged to the metric
        """
        step = self._magnitude(value)
        metric = CountingMetric(name, step, rate=rate, tags=tags)
        return self.register(metric)

    def decr(self, name, value=None, rate=None, tags=None):
        """Register a ``CountingMetric`` with a non-positive value.

        Parameters:
            name: passed to the metric as its name
            value: step; its sign is discarded and the result negated,
                ``None`` means 1
            rate: forwarded unchanged to the metric
            tags: forwarded unchanged to the metric
        """
        step = -self._magnitude(value)
        metric = CountingMetric(name, step, rate=rate, tags=tags)
        return self.register(metric)

    def counter(self, name, value, rate=None, tags=None):
        """Register a ``CountingMetric`` carrying *value* as given."""
        metric = CountingMetric(name, value, rate=rate, tags=tags)
        return self.register(metric)

    def timing(self, name, value=None, rate=None, tags=None):
        """Register a ``TimingMetric`` carrying *value* as given."""
        metric = TimingMetric(name, value, rate=rate, tags=tags)
        return self.register(metric)

    def timer(self, name, rate=None, tags=None):
        """Return a ``Timer`` bound to this client and to *name*.

        Nothing is registered by this call itself.
        """
        return Timer(client=self, name=name, rate=rate, tags=tags)

    def gauge(self, name, value, rate=None, delta=False):
        """Register a ``GaugeMetric``; *rate* and *delta* are forwarded."""
        metric = GaugeMetric(name, value, rate=rate, delta=delta)
        return self.register(metric)

    def histogram(self, name, value, rate=None, delta=False):
        """Register a ``HistogramMetric``; *rate* and *delta* are forwarded."""
        metric = HistogramMetric(name, value, rate=rate, delta=delta)
        return self.register(metric)

    def set(self, name, value, rate=None, tags=None):
        """Register a ``SetMetric``; *rate* and *tags* are forwarded."""
        metric = SetMetric(name, value, rate=rate, tags=tags)
        return self.register(metric)

    def event(self, title, text, **kwargs):
        """Register an ``Event`` built from *title*, *text* and *kwargs*."""
        event = Event(title, text, **kwargs)
        return self.register(event)

    def check(self, name, status, **kwargs):
        """Register a ``Check`` built from *name*, *status* and *kwargs*."""
        check = Check(name, status, **kwargs)
        return self.register(check)

    @abstractmethod
    def format(self, metric, prefix=None):
        """Abstract; must be implemented by subclasses."""
        raise NotImplementedError()

    # Declared with ``async def`` rather than ``@asyncio.coroutine``: that
    # decorator no longer exists on Python 3.11+, where merely defining this
    # class would otherwise raise AttributeError.
    @abstractmethod
    async def send(self):
        """Sends key/value pairs via UDP or TCP.

        Abstract coroutine; must be implemented by subclasses.
        """
        raise NotImplementedError()

    @abstractmethod
    def register(self, metric):
        """Abstract; receives every object built by the helper methods."""
        raise NotImplementedError()

    @abstractmethod
    def close(self):
        """Abstract; must be implemented by subclasses."""
        raise NotImplementedError()


class Timer:
    """Measure elapsed time and report it through a client.

    A timer can be driven with explicit :meth:`start` / :meth:`stop` calls,
    used as a context manager, or applied as a decorator. Each measurement
    is reported as ``client.timing(name, value, rate=rate, tags=tags)``
    where ``value`` is the elapsed ``perf_counter`` time in milliseconds,
    truncated to an ``int``.
    """

    def __init__(self, client, name, rate=None, tags=None):
        """Store the reporting target and the arguments sent with each value.

        Parameters:
            client: object whose ``timing`` method receives measurements
            name: first argument passed to ``client.timing``
            rate: passed to ``client.timing`` as ``rate``
            tags: passed to ``client.timing`` as ``tags``
        """
        self.client = client
        self.name = name
        self.rate = rate
        self.tags = tags

    def __call__(self, func):
        """Decorate *func* so that every call is timed and reported.

        The start time is kept in a local of each call instead of on the
        timer instance, so recursive or overlapping calls cannot overwrite
        one another's start time. A native coroutine function gets an
        ``async`` wrapper that awaits it, so the measurement covers its
        execution and not just the creation of the coroutine object. The
        measurement is reported even when *func* raises.
        """
        if inspect.iscoroutinefunction(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                started = perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    self._report(started)
        else:
            @wraps(func)
            def wrapper(*args, **kwargs):
                started = perf_counter()
                try:
                    return func(*args, **kwargs)
                finally:
                    self._report(started)
        return wrapper

    def __enter__(self):
        """Start timing and return the timer so ``with ... as t`` binds it."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Report the elapsed time; exceptions are not suppressed."""
        self.stop()

    def start(self):
        """Record the current ``perf_counter`` reading as the start time."""
        self._started = perf_counter()

    def stop(self):
        """Report the time elapsed since the last :meth:`start`.

        Raises:
            AttributeError: if :meth:`start` was never called, because the
                start time attribute is only created there.
        """
        self._report(self._started)

    def _report(self, started):
        """Send the milliseconds elapsed since *started* to the client."""
        value = int((perf_counter() - started) * 1000)
        self.client.timing(self.name, value, rate=self.rate, tags=self.tags)