File: utils.py

package info (click to toggle)
graphite-web 1.1.8-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 7,592 kB
  • sloc: javascript: 86,823; python: 11,977; sh: 61; makefile: 50
file content (120 lines) | stat: -rw-r--r-- 4,027 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from __future__ import division

import abc

from graphite.logger import log


class BaseReader(object):
    __metaclass__ = abc.ABCMeta

    supported = True

    @abc.abstractmethod
    def get_intervals(self):
        """Get the supported interval by a reader.

        Returns:
          IntervalSet(): set of supported intervals.
        """

    @abc.abstractmethod
    def fetch(self, startTime, endTime, now=None, requestContext=None):
        """Fetches points for a given interval.

        Args:
          startTime: int
          endTime: int
          now: int
          requestContext: RequestContext

        Returns:
          (time_info, values)
        """


def merge_with_cache(cached_datapoints, start, step, values, func=None, raw_step=None):
    """Merge values with datapoints from a buffer/cache."""
    consolidated = []

    # Similar to the function in render/datalib:TimeSeries
    def consolidate(func, values):
        usable = [v for v in values if v is not None]
        if not usable:
            return None
        if func == 'avg_zero':
            return sum([0 if v is None else v for v in values]) / len(values)
        if func == 'sum':
            return sum(usable)
        if func == 'average':
            return sum(usable) / len(usable)
        if func == 'max':
            return max(usable)
        if func == 'min':
            return min(usable)
        if func == 'last':
            return usable[-1]
        raise Exception("Invalid consolidation function: '%s'" % func)

    # if we have a raw_step, start by taking only the last data point for each interval to match what whisper will do
    if raw_step is not None and raw_step > 1:
        consolidated_dict = {}
        for (timestamp, value) in cached_datapoints:
            interval = timestamp - (timestamp % raw_step)
            consolidated_dict[interval] = value
        cached_datapoints = list(consolidated_dict.items())

    # if we have a consolidation function and the step is not the default interval, consolidate to the requested step
    if func and step != raw_step:
        consolidated_dict = {}
        for (timestamp, value) in cached_datapoints:
            interval = timestamp - (timestamp % step)
            if interval in consolidated_dict:
                consolidated_dict[interval].append(value)
            else:
                consolidated_dict[interval] = [value]
        consolidated = [(i, consolidate(func, consolidated_dict[i])) for i in consolidated_dict]
    # otherwise just use the points
    else:
        consolidated = cached_datapoints

    for (interval, value) in consolidated:
        try:
            i = int(interval - start) // step
            if i < 0:
                # cached data point is earlier then the requested data point.
                # meaning we can definitely ignore the cache result.
                # note that we cannot rely on the 'except'
                # in this case since 'values[-n]='
                # is equivalent to 'values[len(values) - n]='
                continue
            values[i] = value
        except BaseException:
            pass

    return values


def CarbonLink():
    """Return a carbonlink instance."""
    # Late import to avoid pulling out too many dependencies with
    # readers.py which is usually imported by plugins.
    from graphite.carbonlink import CarbonLink
    return CarbonLink()


def merge_with_carbonlink(metric, start, step, values, aggregation_method=None, raw_step=None):
    """Get points from carbonlink and merge them with existing values."""
    cached_datapoints = []
    try:
        cached_datapoints = CarbonLink().query(metric)
    except BaseException:
        log.exception("Failed CarbonLink query '%s'" % metric)
        cached_datapoints = []

    if isinstance(cached_datapoints, dict):
        cached_datapoints = list(cached_datapoints.items())

    return merge_with_cache(
        cached_datapoints, start, step, values,
        func=aggregation_method, raw_step=raw_step)