File: dataframe_client.py

package info (click to toggle)
influxdb-python 5.3.2-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,576 kB
  • sloc: python: 7,274; makefile: 5
file content (177 lines) | stat: -rw-r--r-- 7,291 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# -*- coding: utf-8 -*-
"""DataFrame client for InfluxDB v0.8."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import math
import warnings

from .client import InfluxDBClient


class DataFrameClient(InfluxDBClient):
    """Primary defintion of the DataFrameClient for v0.8.

    The ``DataFrameClient`` object holds information necessary to connect
    to InfluxDB. Requests can be made to InfluxDB directly through the client.
    The client reads and writes from pandas DataFrames.
    """

    def __init__(self, ignore_nan=True, *args, **kwargs):
        """Initialize an instance of the DataFrameClient."""
        super(DataFrameClient, self).__init__(*args, **kwargs)

        try:
            global pd
            import pandas as pd
        except ImportError as ex:
            raise ImportError('DataFrameClient requires Pandas, '
                              '"{ex}" problem importing'.format(ex=str(ex)))

        self.EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
        self.ignore_nan = ignore_nan

    def write_points(self, data, *args, **kwargs):
        """Write to multiple time series names.

        :param data: A dictionary mapping series names to pandas DataFrames
        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'.
        :param batch_size: [Optional] Value to write the points in batches
            instead of all at one time. Useful for when doing data dumps from
            one database to another or when doing a massive write operation
        :type batch_size: int
        """
        batch_size = kwargs.get('batch_size')
        time_precision = kwargs.get('time_precision', 's')
        if batch_size:
            kwargs.pop('batch_size')  # don't hand over to InfluxDBClient
            for key, data_frame in data.items():
                number_batches = int(math.ceil(
                    len(data_frame) / float(batch_size)))
                for batch in range(number_batches):
                    start_index = batch * batch_size
                    end_index = (batch + 1) * batch_size
                    outdata = [
                        self._convert_dataframe_to_json(
                            name=key,
                            dataframe=data_frame
                            .iloc[start_index:end_index].copy(),
                            time_precision=time_precision)]
                    InfluxDBClient.write_points(self, outdata, *args, **kwargs)
            return True

        outdata = [
            self._convert_dataframe_to_json(name=key, dataframe=dataframe,
                                            time_precision=time_precision)
            for key, dataframe in data.items()]
        return InfluxDBClient.write_points(self, outdata, *args, **kwargs)

    def write_points_with_precision(self, data, time_precision='s'):
        """Write to multiple time series names.

        DEPRECATED
        """
        warnings.warn(
            "write_points_with_precision is deprecated, and will be removed "
            "in future versions. Please use "
            "``DataFrameClient.write_points(time_precision='..')`` instead.",
            FutureWarning)
        return self.write_points(data, time_precision='s')

    def query(self, query, time_precision='s', chunked=False):
        """Query data into DataFrames.

        Returns a DataFrame for a single time series and a map for multiple
        time series with the time series as value and its name as key.

        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'.
        :param chunked: [Optional, default=False] True if the data shall be
            retrieved in chunks, False otherwise.
        """
        result = InfluxDBClient.query(self, query=query,
                                      time_precision=time_precision,
                                      chunked=chunked)
        if len(result) == 0:
            return result
        elif len(result) == 1:
            return self._to_dataframe(result[0], time_precision)
        else:
            ret = {}
            for time_series in result:
                ret[time_series['name']] = self._to_dataframe(time_series,
                                                              time_precision)
            return ret

    @staticmethod
    def _to_dataframe(json_result, time_precision):
        dataframe = pd.DataFrame(data=json_result['points'],
                                 columns=json_result['columns'])
        if 'sequence_number' in dataframe.keys():
            dataframe.sort_values(['time', 'sequence_number'], inplace=True)
        else:
            dataframe.sort_values(['time'], inplace=True)

        pandas_time_unit = time_precision
        if time_precision == 'm':
            pandas_time_unit = 'ms'
        elif time_precision == 'u':
            pandas_time_unit = 'us'

        dataframe.index = pd.to_datetime(list(dataframe['time']),
                                         unit=pandas_time_unit,
                                         utc=True)
        del dataframe['time']
        return dataframe

    def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'):
        if not isinstance(dataframe, pd.DataFrame):
            raise TypeError('Must be DataFrame, but type was: {0}.'
                            .format(type(dataframe)))
        if not (isinstance(dataframe.index, pd.PeriodIndex) or
                isinstance(dataframe.index, pd.DatetimeIndex)):
            raise TypeError('Must be DataFrame with DatetimeIndex or \
                            PeriodIndex.')

        if isinstance(dataframe.index, pd.PeriodIndex):
            dataframe.index = dataframe.index.to_timestamp()
        else:
            dataframe.index = pd.to_datetime(dataframe.index)

        if dataframe.index.tzinfo is None:
            dataframe.index = dataframe.index.tz_localize('UTC')
        dataframe['time'] = [self._datetime_to_epoch(dt, time_precision)
                             for dt in dataframe.index]
        data = {'name': name,
                'columns': [str(column) for column in dataframe.columns],
                'points': [self._convert_array(x) for x in dataframe.values]}
        return data

    def _convert_array(self, array):
        try:
            global np
            import numpy as np
        except ImportError as ex:
            raise ImportError('DataFrameClient requires Numpy, '
                              '"{ex}" problem importing'.format(ex=str(ex)))

        if self.ignore_nan:
            number_types = (int, float, np.number)
            condition = (all(isinstance(el, number_types) for el in array) and
                         np.isnan(array))
            return list(np.where(condition, None, array))

        return list(array)

    def _datetime_to_epoch(self, datetime, time_precision='s'):
        seconds = (datetime - self.EPOCH).total_seconds()
        if time_precision == 's':
            return seconds
        elif time_precision == 'm' or time_precision == 'ms':
            return seconds * 1000
        elif time_precision == 'u':
            return seconds * 1000000