File: remote.py

Package: graphite-web 1.1.10-9

from traceback import format_exc

from django.conf import settings

from graphite.logger import log
from graphite.readers.utils import BaseReader


class RemoteReader(BaseReader):
    """Reader that fetches datapoints for one or more metrics from a
    remote Graphite webapp via its /render endpoint."""

    __slots__ = (
        'finder',
        'metric_path',
        'intervals',
        'bulk_query',
    )

    def __init__(self, finder, node_info, bulk_query=None):
        self.finder = finder
        self.metric_path = node_info.get('path') or node_info.get('metric_path')
        self.intervals = node_info.get('intervals', [])
        # When no explicit bulk query is given, fall back to fetching just
        # this reader's own metric path (or nothing at all).
        self.bulk_query = set(bulk_query) if bulk_query else (
            [self.metric_path] if self.metric_path else []
        )

    def __repr__(self):
        return '<RemoteReader[%x]: %s %s>' % (id(self), self.finder.host, ','.join(self.bulk_query))

    def get_intervals(self):
        return self.intervals

    def fetch(self, startTime, endTime, now=None, requestContext=None):
        # Single-metric fetch: delegate to fetch_multi() and pick out the
        # series for this reader's own metric path, if the remote returned it.
        for series in self.fetch_multi(startTime, endTime, now, requestContext):
            if series['name'] == self.metric_path:
                return (series['time_info'], series['values'])

    def fetch_multi(self, startTime, endTime, now=None, requestContext=None):
        if not self.bulk_query:
            return []

        # Ask the remote webapp to render raw data for every queried target.
        # 'local' keeps the remote node from fanning the query out further,
        # and 'noCache' bypasses its render cache.
        query_params = [
            ('format', self.finder.params.get('format', 'pickle')),
            ('local', self.finder.params.get('local', '1')),
            ('noCache', '1'),
            ('from', int(startTime)),
            ('until', int(endTime))
        ]

        for target in self.bulk_query:
            query_params.append(('target', target))

        if now is not None:
            query_params.append(('now', int(now)))

        headers = requestContext.get('forwardHeaders') if requestContext else None

        retries = 1  # start counting at one to make log output and settings more readable
        while True:
            try:
                result = self.finder.request(
                    '/render/',
                    fields=query_params,
                    headers=headers,
                    timeout=settings.FETCH_TIMEOUT,
                )
                break
            except Exception:
                if retries >= settings.MAX_FETCH_RETRIES:
                    log.exception("Failed after %s attempts! Root cause:\n%s" %
                                  (settings.MAX_FETCH_RETRIES, format_exc()))
                    raise
                else:
                    log.exception("Got an exception when fetching data! Try: %i of %i. Root cause:\n%s" %
                                  (retries, settings.MAX_FETCH_RETRIES, format_exc()))
                retries += 1

        # Let the finder decode the response body into a list of series dicts.
        data = self.finder.deserialize(result)

        try:
            return [
                {
                    'pathExpression': series.get('pathExpression', series['name']),
                    'name': series['name'],
                    'time_info': (series['start'], series['end'], series['step']),
                    'values': series['values'],
                }
                for series in data
            ]
        except Exception as err:
            # Mark this remote host as failed for the finder's health tracking,
            # then surface the malformed response to the caller.
            self.finder.fail()
            log.exception("RemoteReader[%s] Invalid render response from %s: %s" %
                          (self.finder.host, result.url_full, repr(err)))
            raise Exception("Invalid render response from %s: %s" % (result.url_full, repr(err)))
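
For orientation, below is a minimal, illustrative sketch of how this reader can be exercised outside the render pipeline. It is not part of remote.py: StubFinder, StubResponse, and the metric name are made up for the example, and it assumes a configured graphite-web environment (e.g. DJANGO_SETTINGS_MODULE=graphite.settings) so that graphite.readers.remote and the FETCH_TIMEOUT / MAX_FETCH_RETRIES settings resolve, with this file living at graphite/readers/remote.py as in upstream.

# Illustrative only -- StubFinder/StubResponse are hypothetical stand-ins for
# the remote finder this reader normally receives from graphite's cluster code.
from graphite.readers.remote import RemoteReader


class StubResponse(object):
    url_full = 'http://backend:8080/render/'


class StubFinder(object):
    host = 'backend:8080'
    params = {}  # fall back to the defaults: format=pickle, local=1

    def request(self, path, fields=None, headers=None, timeout=None):
        # A real finder would issue an HTTP request to the remote node here.
        return StubResponse()

    def deserialize(self, result):
        # Pretend the remote answered with one series: 60-second step, three points.
        return [{'name': 'carbon.agents.a.cpuUsage',
                 'start': 0, 'end': 180, 'step': 60,
                 'values': [0.1, 0.2, 0.3]}]

    def fail(self):
        pass


reader = RemoteReader(StubFinder(), {'path': 'carbon.agents.a.cpuUsage'})
time_info, values = reader.fetch(0, 180)
print(time_info, values)  # (0, 180, 60) [0.1, 0.2, 0.3]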