1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
import pytest
from aioinflux import iterpoints
from testing_utils import logger
@pytest.mark.asyncio
async def test_iterpoints_with_parser(iter_client):
    """Check that ``iterpoints`` applies a custom parser to every point.

    The parser receives the point's values positionally plus a ``meta``
    keyword argument, and maps each value to its column name from
    ``meta['columns']``.
    """

    def parser(*values, meta):
        # Pair each positional value with its column name from the metadata.
        return dict(zip(meta['columns'], values))

    r = await iter_client.query("SELECT * FROM cpu_load LIMIT 3")
    points = list(iterpoints(r, parser))
    # Guard against a vacuous pass: with no points the loop below would
    # never run and none of the column assertions would be checked.
    assert points, "query returned no points; parser was never exercised"
    for point in points:
        logger.info(point)
        assert 'time' in point
        assert 'value' in point
        assert 'host' in point
@pytest.mark.asyncio
async def test_aiter_point(iter_client):
    """Iterate a chunked query asynchronously and count every point."""
    chunks = await iter_client.query(
        'SELECT * from cpu_load',
        chunked=True,
        chunk_size=10,
    )
    collected = []
    async for chunk in chunks:
        # Each chunk is a partial response; flatten its points into one list.
        collected.extend(iterpoints(chunk))
    assert len(collected) == 100
@pytest.mark.asyncio
async def test_iter_point_namedtuple(iter_client):
    """Use a namedtuple class as the ``iterpoints`` parser.

    Every parsed point must have five fields, and the query must produce
    100 points in total.
    """
    from collections import namedtuple

    field_names = ['time', 'direction', 'host', 'region', 'value']
    row_type = namedtuple('cpu_load', field_names)
    response = await iter_client.query('SELECT * from cpu_load')
    rows = list(iterpoints(response, parser=row_type))
    for row in rows:
        assert len(row) == 5
    assert len(rows) == 100
|