1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
|
# -*- coding: utf-8 -*-
"""Define the line_protocol handler."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from datetime import datetime
from numbers import Integral
from pytz import UTC
from dateutil.parser import parse
EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
def _to_nanos(timestamp):
delta = timestamp - EPOCH
nanos_in_days = delta.days * 86400 * 10 ** 9
nanos_in_seconds = delta.seconds * 10 ** 9
nanos_in_micros = delta.microseconds * 10 ** 3
return nanos_in_days + nanos_in_seconds + nanos_in_micros
def _convert_timestamp(timestamp, precision=None):
if isinstance(timestamp, Integral):
return timestamp # assume precision is correct if timestamp is int
if isinstance(_get_unicode(timestamp), str):
timestamp = parse(timestamp)
if isinstance(timestamp, datetime):
if not timestamp.tzinfo:
timestamp = UTC.localize(timestamp)
ns = _to_nanos(timestamp)
if precision is None or precision == 'n':
return ns
if precision == 'u':
return ns / 10**3
if precision == 'ms':
return ns / 10**6
if precision == 's':
return ns / 10**9
if precision == 'm':
return ns / 10**9 / 60
if precision == 'h':
return ns / 10**9 / 3600
raise ValueError(timestamp)
def _escape_tag(tag):
tag = _get_unicode(tag, force=True)
return tag.replace(
"\\", "\\\\"
).replace(
" ", "\\ "
).replace(
",", "\\,"
).replace(
"=", "\\="
).replace(
"\n", "\\n"
)
def _escape_tag_value(value):
ret = _escape_tag(value)
if ret.endswith('\\'):
ret += ' '
return ret
def quote_ident(value):
"""Indent the quotes."""
return "\"{}\"".format(value
.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n"))
def quote_literal(value):
"""Quote provided literal."""
return "'{}'".format(value
.replace("\\", "\\\\")
.replace("'", "\\'"))
def _is_float(value):
try:
float(value)
except (TypeError, ValueError):
return False
return True
def _escape_value(value):
if value is None:
return ''
value = _get_unicode(value)
if isinstance(value, str):
return quote_ident(value)
if isinstance(value, int) and not isinstance(value, bool):
return str(value) + 'i'
if isinstance(value, bool):
return str(value)
if _is_float(value):
return repr(float(value))
return str(value)
def _get_unicode(data, force=False):
"""Try to return a text aka unicode object from the given data."""
if isinstance(data, bytes):
return data.decode('utf-8')
if data is None:
return ''
if force:
return str(data)
return data
def make_line(measurement, tags=None, fields=None, time=None, precision=None):
"""Extract the actual point from a given measurement line."""
tags = tags or {}
fields = fields or {}
line = _escape_tag(_get_unicode(measurement))
# tags should be sorted client-side to take load off server
tag_list = []
for tag_key in sorted(tags.keys()):
key = _escape_tag(tag_key)
value = _escape_tag(tags[tag_key])
if key != '' and value != '':
tag_list.append(
"{key}={value}".format(key=key, value=value)
)
if tag_list:
line += ',' + ','.join(tag_list)
field_list = []
for field_key in sorted(fields.keys()):
key = _escape_tag(field_key)
value = _escape_value(fields[field_key])
if key != '' and value != '':
field_list.append("{key}={value}".format(
key=key,
value=value
))
if field_list:
line += ' ' + ','.join(field_list)
if time is not None:
timestamp = _get_unicode(str(int(
_convert_timestamp(time, precision)
)))
line += ' ' + timestamp
return line
def make_lines(data, precision=None):
"""Extract points from given dict.
Extracts the points from the given dict and returns a Unicode string
matching the line protocol introduced in InfluxDB 0.9.0.
"""
lines = []
static_tags = data.get('tags')
for point in data['points']:
if static_tags:
tags = dict(static_tags) # make a copy, since we'll modify
tags.update(point.get('tags') or {})
else:
tags = point.get('tags') or {}
line = make_line(
point.get('measurement', data.get('measurement')),
tags=tags,
fields=point.get('fields'),
precision=precision,
time=point.get('time')
)
lines.append(line)
return '\n'.join(lines) + '\n'
|