# -*- coding: utf-8 -*-
"""DataFrame client for InfluxDB v0.8."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import math
import warnings

from .client import InfluxDBClient


class DataFrameClient(InfluxDBClient):
    """Primary definition of the DataFrameClient for v0.8.

    The ``DataFrameClient`` object holds information necessary to connect
    to InfluxDB. Requests can be made to InfluxDB directly through the
    client. The client reads and writes from pandas DataFrames.
    """

    def __init__(self, ignore_nan=True, *args, **kwargs):
        """Initialize an instance of the DataFrameClient."""
        super(DataFrameClient, self).__init__(*args, **kwargs)

        try:
            global pd
            import pandas as pd
        except ImportError as ex:
            raise ImportError('DataFrameClient requires Pandas, '
                              '"{ex}" problem importing'.format(ex=str(ex)))

        self.EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
        self.ignore_nan = ignore_nan

    def write_points(self, data, *args, **kwargs):
        """Write to multiple time series names.

        :param data: A dictionary mapping series names to pandas DataFrames
        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'.
        :param batch_size: [Optional] Value to write the points in batches
            instead of all at one time. Useful for when doing data dumps from
            one database to another or when doing a massive write operation
        :type batch_size: int
        """
        batch_size = kwargs.get('batch_size')
        time_precision = kwargs.get('time_precision', 's')
        if batch_size:
            kwargs.pop('batch_size')  # don't hand over to InfluxDBClient
            for key, data_frame in data.items():
                number_batches = int(math.ceil(
                    len(data_frame) / float(batch_size)))
                for batch in range(number_batches):
                    start_index = batch * batch_size
                    end_index = (batch + 1) * batch_size
                    outdata = [
                        self._convert_dataframe_to_json(
                            name=key,
                            dataframe=data_frame
                            .iloc[start_index:end_index].copy(),
                            time_precision=time_precision)]
                    InfluxDBClient.write_points(self, outdata, *args,
                                                **kwargs)
            return True

        outdata = [
            self._convert_dataframe_to_json(name=key, dataframe=dataframe,
                                            time_precision=time_precision)
            for key, dataframe in data.items()]
        return InfluxDBClient.write_points(self, outdata, *args, **kwargs)
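
    # Illustrative sketch (not executed), continuing the example above: the
    # ``data`` argument maps series names to DataFrames indexed by time.
    # The series name, values, and batch size are placeholder assumptions.
    #
    #     index = pd.date_range('2014-11-10', periods=4, freq='H', tz='UTC')
    #     frame = pd.DataFrame({'value': [12, 10, 9, 14]}, index=index)
    #     client.write_points({'cpu_load': frame},
    #                         time_precision='s',
    #                         batch_size=2)  # sent as two separate writes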

    def write_points_with_precision(self, data, time_precision='s'):
        """Write to multiple time series names.

        DEPRECATED: use ``write_points(time_precision='..')`` instead.
        """
        warnings.warn(
            "write_points_with_precision is deprecated, and will be removed "
            "in future versions. Please use "
            "``DataFrameClient.write_points(time_precision='..')`` instead.",
            FutureWarning)
        return self.write_points(data, time_precision=time_precision)

    def query(self, query, time_precision='s', chunked=False):
        """Query data into DataFrames.

        Returns a DataFrame for a single time series and a map for multiple
        time series with the time series as value and its name as key.

        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'.
        :param chunked: [Optional, default=False] True if the data shall be
            retrieved in chunks, False otherwise.
        """
        result = InfluxDBClient.query(self, query=query,
                                      time_precision=time_precision,
                                      chunked=chunked)
        if len(result) == 0:
            return result
        elif len(result) == 1:
            return self._to_dataframe(result[0], time_precision)
        else:
            ret = {}
            for time_series in result:
                ret[time_series['name']] = self._to_dataframe(time_series,
                                                              time_precision)
            return ret
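
    # Illustrative sketch (not executed): the return shape depends on how
    # many series the query touches. The series names and the regex form of
    # the second query are placeholder assumptions.
    #
    #     cpu = client.query('select * from cpu_load')
    #     # -> a single DataFrame indexed by UTC timestamps
    #
    #     both = client.query('select * from /cpu_load|mem_free/')
    #     # -> {'cpu_load': <DataFrame>, 'mem_free': <DataFrame>}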

    @staticmethod
    def _to_dataframe(json_result, time_precision):
        dataframe = pd.DataFrame(data=json_result['points'],
                                 columns=json_result['columns'])
        if 'sequence_number' in dataframe.keys():
            dataframe.sort_values(['time', 'sequence_number'], inplace=True)
        else:
            dataframe.sort_values(['time'], inplace=True)
        # Map InfluxDB precision codes to pandas time units
        # ('m' is treated as milliseconds, 'u' as microseconds).
        pandas_time_unit = time_precision
        if time_precision == 'm':
            pandas_time_unit = 'ms'
        elif time_precision == 'u':
            pandas_time_unit = 'us'
        # Promote the 'time' column to a UTC DatetimeIndex.
        dataframe.index = pd.to_datetime(list(dataframe['time']),
                                         unit=pandas_time_unit,
                                         utc=True)
        del dataframe['time']
        return dataframe

    def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'):
        if not isinstance(dataframe, pd.DataFrame):
            raise TypeError('Must be DataFrame, but type was: {0}.'
                            .format(type(dataframe)))
        if not (isinstance(dataframe.index, pd.PeriodIndex) or
                isinstance(dataframe.index, pd.DatetimeIndex)):
            raise TypeError('Must be DataFrame with DatetimeIndex or '
                            'PeriodIndex.')
        if isinstance(dataframe.index, pd.PeriodIndex):
            dataframe.index = dataframe.index.to_timestamp()
        else:
            dataframe.index = pd.to_datetime(dataframe.index)
        # Naive timestamps are assumed to be UTC.
        if dataframe.index.tzinfo is None:
            dataframe.index = dataframe.index.tz_localize('UTC')
        dataframe['time'] = [self._datetime_to_epoch(dt, time_precision)
                             for dt in dataframe.index]
        data = {'name': name,
                'columns': [str(column) for column in dataframe.columns],
                'points': [self._convert_array(x) for x in dataframe.values]}
        return data

    def _convert_array(self, array):
        try:
            global np
            import numpy as np
        except ImportError as ex:
            raise ImportError('DataFrameClient requires Numpy, '
                              '"{ex}" problem importing'.format(ex=str(ex)))

        if self.ignore_nan:
            # For purely numeric rows, replace NaN values with None so they
            # serialize as JSON null; non-numeric rows pass through unchanged.
            number_types = (int, float, np.number)
            condition = (all(isinstance(el, number_types) for el in array) and
                         np.isnan(array))
            return list(np.where(condition, None, array))

        return list(array)

    def _datetime_to_epoch(self, datetime, time_precision='s'):
        seconds = (datetime - self.EPOCH).total_seconds()
        if time_precision == 's':
            return seconds
        elif time_precision == 'm' or time_precision == 'ms':
            return seconds * 1000
        elif time_precision == 'u':
            return seconds * 1000000