File: resultset.py

package info (click to toggle)
influxdb-python 5.3.2-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,576 kB
  • sloc: python: 7,274; makefile: 5
file content (206 lines) | stat: -rw-r--r-- 6,756 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# -*- coding: utf-8 -*-
"""Module to prepare the resultset."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import warnings

from influxdb.exceptions import InfluxDBClientError

# Unique "key is absent" marker used by ``ResultSet._tag_matches``; ``None``
# cannot play that role because it may be a legitimate tag value.
_sentinel = object()


class ResultSet(object):
    """A wrapper around a single InfluxDB query result.

    The wrapped payload is a dict that may hold an ``'error'`` entry and a
    ``'series'`` list. Each series is a dict read through the keys
    ``'measurement'`` / ``'name'``, ``'tags'``, ``'columns'`` and
    ``'values'``.
    """

    def __init__(self, series, raise_errors=True):
        """Initialize the ResultSet.

        :param series: The raw result dict (see the class docstring).
        :param raise_errors: When exactly ``True`` and the payload carries
            an ``'error'`` entry, raise instead of storing it silently.
        :raise InfluxDBClientError: if the payload holds an error and
            ``raise_errors`` is ``True``.
        """
        self._raw = series
        self._error = self._raw.get('error', None)

        # Identity check kept on purpose: only the literal ``True`` enables
        # raising, truthy substitutes such as ``1`` do not.
        if self.error is not None and raise_errors is True:
            raise InfluxDBClientError(self.error)

    @property
    def raw(self):
        """Raw JSON from InfluxDB."""
        return self._raw

    @raw.setter
    def raw(self, value):
        # Only the payload is replaced; ``error`` keeps the value that was
        # extracted in ``__init__``.
        self._raw = value

    @property
    def error(self):
        """Error returned by InfluxDB."""
        return self._error

    def __getitem__(self, key):
        """Retrieve the series name or specific set based on key.

        :param key: Either a series name, or a tags_dict, or
                    a 2-tuple(series_name, tags_dict).
                    If the series name is None (or not given) then the
                    points of every series matching the given tags are
                    yielded one after the other.
                    To get the points of every series in this resultset,
                    pass None as key.
        :return: A generator yielding the points matching the given key.
        :raise TypeError: if a tuple key is not a 2-tuple, or if its second
                    element is neither a dict nor None.

        NB:
        The order in which the points are yielded is not guaranteed.
        """
        # stacklevel=2 attributes the warning to the code doing ``rs[key]``
        # rather than to this method.
        warnings.warn(
            ("ResultSet's ``__getitem__`` method will be deprecated. Use "
             "``get_points`` instead."),
            DeprecationWarning,
            stacklevel=2
        )

        if isinstance(key, tuple):
            if len(key) != 2:
                raise TypeError('only 2-tuples allowed')

            name, tags = key

            if tags is not None and not isinstance(tags, dict):
                raise TypeError('tags should be a dict')
        elif isinstance(key, dict):
            name, tags = None, key
        else:
            name, tags = key, None

        return self.get_points(name, tags)

    def get_points(self, measurement=None, tags=None):
        """Return a generator for all the points that match the given filters.

        :param measurement: The measurement name
        :type measurement: str

        :param tags: Tags to look for
        :type tags: dict

        :return: Points generator
        :raise TypeError: if ``measurement`` is neither a byte string, a
            text string nor None. As this method is a generator, the error
            surfaces when the result is first iterated, not at call time.
        """
        # ``type(b''.decode())`` is the text type of the running
        # interpreter, whatever its name is.
        if not isinstance(measurement,
                          (bytes, type(b''.decode()), type(None))):
            raise TypeError('measurement must be an str or None')

        for series in self._get_series():
            series_name, series_tags = self._series_key(series)

            if series_name is None:
                # this is a "system" query or a query which
                # doesn't return a name attribute.
                # like 'show retention policies' ..
                if tags is None:
                    for item in self._get_points_for_series(series):
                        yield item
                continue

            if measurement not in (None, series_name):
                continue

            # Without a tag filter every point of the series matches.
            # The series-level comparison does not depend on the point, so
            # it is evaluated once; a missing or null ``tags`` entry is
            # treated as "no series tags".
            series_matches = (
                tags is None or
                self._tag_matches(series_tags or {}, tags)
            )
            for item in self._get_points_for_series(series):
                if series_matches or self._tag_matches(item, tags):
                    yield item

    def __repr__(self):
        """Representation of ResultSet object."""
        rendered = [
            "'%s': %s" % (key, list(points))
            for key, points in self.items()
        ]
        return "ResultSet({%s})" % ", ".join(rendered)

    def __iter__(self):
        """Yield, for each key of ``keys()``, the list of matching points."""
        # ``get_points`` is called directly: going through ``__getitem__``
        # would emit its deprecation warning for every series.
        for name, tags in self.keys():
            yield list(self.get_points(name, tags))

    @staticmethod
    def _tag_matches(tags, filter):
        """Check if all key/values in filter match in tags.

        :param tags: Dict to inspect (series tags or a point)
        :param filter: Dict of expected key/value pairs
        :return: True when every pair of ``filter`` is present in ``tags``
        """
        # A missing key maps to ``_sentinel`` so that it can never compare
        # equal to a requested value, not even to ``None``.
        return all(
            tags.get(tag_name, _sentinel) == tag_value
            for tag_name, tag_value in filter.items()
        )

    @staticmethod
    def _series_key(series):
        """Return the ``(series_name, tags)`` key of one series.

        The name is read from ``'measurement'``, then ``'name'``, and
        defaults to ``'results'``; tags default to None.
        """
        name = series.get('measurement', series.get('name', 'results'))
        return (name, series.get('tags', None))

    def _get_series(self):
        """Return all series."""
        return self.raw.get('series', [])

    def __len__(self):
        """Return the len of the keys in the ResultSet."""
        return len(self.keys())

    def keys(self):
        """Return the list of keys in the ResultSet.

        :return: List of keys. Keys are tuples (series_name, tags)
        """
        return [self._series_key(series) for series in self._get_series()]

    def items(self):
        """Return the set of items from the ResultSet.

        :return: List of tuples, (key, generator)
        """
        return [
            (self._series_key(series), self._get_points_for_series(series))
            for series in self._get_series()
        ]

    def _get_points_for_series(self, series):
        """Return generator of dict from columns and values of a series.

        :param series: One series
        :return: Generator of dicts
        """
        # ``series['columns']`` is only looked up when there is a row to
        # convert, so a series without values needs no columns.
        for row in series.get('values', []):
            yield self.point_from_cols_vals(series['columns'], row)

    @staticmethod
    def point_from_cols_vals(cols, vals):
        """Create a dict from columns and values lists.

        :param cols: List of columns
        :param vals: List of values
        :return: Dict where keys are columns.
        :raise IndexError: if ``vals`` is shorter than ``cols``.
        """
        return {
            col_name: vals[col_index]
            for col_index, col_name in enumerate(cols)
        }