File: mapping.py

package info (click to toggle)
python-aioinflux 0.9.0-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 320 kB
  • sloc: python: 1,402; makefile: 41
file content (74 lines) | stat: -rw-r--r-- 2,238 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import time
from typing import Mapping

import ciso8601

from .common import *


def serialize(point: Mapping, measurement=None, **extra_tags) -> bytes:
    """Converts dictionary-like data into a single line protocol line (point)"""
    tags = _serialize_tags(point, extra_tags)
    return (
        f'{_serialize_measurement(point, measurement)}'
        f'{"," if tags else ""}{tags} '
        f'{_serialize_fields(point)} '
        f'{_serialize_timestamp(point)}'
    ).encode()


def _serialize_measurement(point, measurement):
    try:
        return escape(point['measurement'], measurement_escape)
    except KeyError:
        if measurement is None:
            raise ValueError("'measurement' missing")
        return escape(measurement, measurement_escape)


def _serialize_tags(point, extra_tags):
    output = []
    for k, v in {**point.get('tags', {}), **extra_tags}.items():
        k = escape(k, key_escape)
        v = escape(v, tag_escape)
        if not v:
            continue  # ignore blank/null string tags
        output.append(f'{k}={v}')
    return ','.join(output)


def _serialize_timestamp(point):
    dt = point.get('time')
    if not dt:
        return ''
    elif isinstance(dt, int):
        return dt
    elif isinstance(dt, (str, bytes)):
        dt = ciso8601.parse_datetime(dt)
        if not dt:
            raise ValueError(f'Invalid datetime string: {dt!r}')

    if not dt.tzinfo:
        # Assume tz-naive input to be in UTC, not local time
        return int(dt.timestamp() - time.timezone) * 10 ** 9 + dt.microsecond * 1000
    return int(dt.timestamp()) * 10 ** 9 + dt.microsecond * 1000


def _serialize_fields(point):
    """Field values can be floats, integers, strings, or Booleans."""
    output = []
    for k, v in point['fields'].items():
        k = escape(k, key_escape)
        if isinstance(v, bool):
            output.append(f'{k}={v}')
        elif isinstance(v, int):
            output.append(f'{k}={v}i')
        elif isinstance(v, str):
            output.append(f'{k}="{v.translate(str_escape)}"')
        elif v is None:
            # Empty values
            continue
        else:
            # Floats
            output.append(f'{k}={v}')
    return ','.join(output)