File: dataframe.py

package info (click to toggle)
python-aioinflux 0.9.0-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 320 kB
  • sloc: python: 1,402; makefile: 41
file content (127 lines) | stat: -rw-r--r-- 4,245 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import re
from functools import reduce
from itertools import chain
from typing import Union, Dict, List

import pandas as pd
import numpy as np

from .common import *

DataFrameType = Union[pd.DataFrame, Dict[str, pd.DataFrame], List[Dict[str, pd.DataFrame]]]


# Serialization helper functions
# -------------------------------

def _serializer(series) -> pd.DataFrame:
    df = pd.DataFrame(series.get('values', []), columns=series['columns'])
    if 'time' not in df.columns:
        return df
    df: pd.DataFrame = df.set_index(pd.to_datetime(df['time'])).drop('time', axis=1)
    df.index = df.index.tz_localize('UTC')
    df.index.name = None
    if 'tags' in series:
        for k, v in series['tags'].items():
            df[k] = v
    if 'name' in series:
        df.name = series['name']
    return df


def _get_name(series):
    tags = [f'{k}={v}' for k, v in series.get('tags', {}).items()]
    return ','.join(filter(None, [series.get('name'), *tags])) or None


def _drop_zero_index(df):
    if isinstance(df.index, pd.DatetimeIndex):
        if all(i.value == 0 for i in df.index):
            return df.reset_index(drop=True)
    return df


def parse(resp) -> DataFrameType:
    """Makes a dictionary of DataFrames from a response object"""
    statements = []
    for statement in resp['results']:
        series = {}
        for s in statement.get('series', []):
            series[_get_name(s)] = _drop_zero_index(_serializer(s))
        statements.append(series)

    if len(statements) == 1:
        series: dict = statements[0]
        if len(series) == 1:
            return list(series.values())[0]  # DataFrame
        else:
            return series  # dict
    return statements  # list


# Parsing helper functions
# -------------------------

def _itertuples(df):
    """Custom implementation of ``DataFrame.itertuples`` that
    returns plain tuples instead of namedtuples. About 50% faster.
    """
    cols = [df.iloc[:, k] for k in range(len(df.columns))]
    return zip(df.index, *cols)


def _replace(df):
    obj_cols = {k for k, v in dict(df.dtypes).items() if v is np.dtype('O')}
    other_cols = set(df.columns) - obj_cols
    obj_nans = (f'{k}="nan"' for k in obj_cols)
    other_nans = (f'{k}=nani?' for k in other_cols)
    replacements = [
        ('|'.join(chain(obj_nans, other_nans)), ''),
        (',{2,}', ','),
        ('|'.join([', ,', ', ', ' ,']), ' '),
    ]
    return replacements


def serialize(df, measurement, tag_columns=None, **extra_tags) -> bytes:
    """Converts a Pandas DataFrame into line protocol format"""
    # Pre-processing
    if measurement is None:
        raise ValueError("Missing 'measurement'")
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError('DataFrame index is not DatetimeIndex')
    tag_columns = set(tag_columns or [])
    isnull = df.isnull().any(axis=1)

    # Make parser function
    tags = []
    fields = []
    for k, v in extra_tags.items():
        tags.append(f"{k}={escape(v, key_escape)}")
    for i, (k, v) in enumerate(df.dtypes.items()):
        k = k.translate(key_escape)
        if k in tag_columns:
            tags.append(f"{k}={{p[{i+1}]}}")
        elif issubclass(v.type, np.integer):
            fields.append(f"{k}={{p[{i+1}]}}i")
        elif issubclass(v.type, (np.float, np.bool_)):
            fields.append(f"{k}={{p[{i+1}]}}")
        else:
            # String escaping is skipped for performance reasons
            # Strings containing double-quotes can cause strange write errors
            # and should be sanitized by the user.
            # e.g., df[k] = df[k].astype('str').str.translate(str_escape)
            fields.append(f"{k}=\"{{p[{i+1}]}}\"")
    fmt = (f'{measurement}', f'{"," if tags else ""}', ','.join(tags),
           ' ', ','.join(fields), ' {p[0].value}')
    f = eval("lambda p: f'{}'".format(''.join(fmt)))

    # Map/concat
    if isnull.any():
        lp = map(f, _itertuples(df[~isnull]))
        rep = _replace(df)
        lp_nan = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
                  for p in _itertuples(df[isnull]))
        return '\n'.join(chain(lp, lp_nan)).encode('utf-8')
    else:
        return '\n'.join(map(f, _itertuples(df))).encode('utf-8')