"""Tests for foobot_async.FoobotClient, run against mocked HTTP responses."""
import aiohttp
import asyncio
import pytest
from aioresponses import aioresponses
from datetime import datetime
from foobot_async import FoobotClient
# Dedicated event loop shared by every test in this module; each test drives
# the client's coroutines synchronously through loop.run_until_complete().
# NOTE(review): nothing in the visible part of the file closes this loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Shared client under test, built from a dummy token string and e-mail address.
# The same e-mail address appears in the owner URL mocked by the device tests.
client = FoobotClient('token', 'example@example.com')
def test_get_devices_request():
    """A 200 reply from the owner device endpoint is returned as a list of dicts."""
    devices_url = 'https://api.foobot.io/v2/owner/example@example.com/device/'
    # Adjacent literals keep the fixture's line breaks exactly as they were.
    payload = (
        '[{"uuid": "1234127987696AB",\n'
        '"userId": 2353,\n'
        '"mac": "013843C3C20A",\n'
        '"name": "FooBot"}]'
    )
    expected = [{
        "uuid": "1234127987696AB",
        "userId": 2353,
        "mac": "013843C3C20A",
        "name": "FooBot",
    }]

    with aioresponses() as fake_http:
        fake_http.get(devices_url, status=200, body=payload)
        devices = loop.run_until_complete(client.get_devices())

    assert expected == devices
# NOTE(review): disabled test kept for reference; the reason it was commented
# out is not recorded here. TODO: re-enable it or delete it.
# def test_get_devices_with_session_request():
#     session = aiohttp.ClientSession()
#     client = FoobotClient('token', 'example@example.com', session)
#
#     with aioresponses() as mocked:
#         mocked.get('https://api.foobot.io/v2/owner/example@example.com/device/',
#                    status=200, body='''[{"uuid": "1234127987696AB",
#                                          "userId": 2353,
#                                          "mac": "013843C3C20A",
#                                          "name": "FooBot"}]''')
#         resp = loop.run_until_complete(client.get_devices())
#
#     assert [dict(uuid="1234127987696AB",
#                  userId=2353,
#                  mac="013843C3C20A",
#                  name="FooBot")] == resp
def test_failed_auth_request():
    """A 401 reply from the device endpoint must raise FoobotClient.AuthFailure."""
    devices_url = 'https://api.foobot.io/v2/owner/example@example.com/device/'
    error_body = '{"message": "invalid key provided"}'

    with aioresponses() as fake_http:
        fake_http.get(devices_url, status=401, body=error_body)
        with pytest.raises(FoobotClient.AuthFailure):
            loop.run_until_complete(client.get_devices())
def test_failed_bad_format_request():
    """A 400 reply from the device endpoint must raise FoobotClient.BadFormat."""
    devices_url = 'https://api.foobot.io/v2/owner/example@example.com/device/'
    # Adjacent literals keep the fixture's line breaks exactly as they were.
    error_body = (
        '{"state": 400,\n'
        '"message": "ParseException : Bad date format : bad date format [test]",\n'
        '"requestedUri": "/v2/device/26025766336015C0/datapoint/test/last/test/",\n'
        '"stack": "[obfuscated]",\n'
        '"propagatedException": null}'
    )

    with aioresponses() as fake_http:
        fake_http.get(devices_url, status=400, body=error_body)
        with pytest.raises(FoobotClient.BadFormat):
            loop.run_until_complete(client.get_devices())
def test_forbidden_access_request():
    """A 403 reply from the device endpoint must raise FoobotClient.ForbiddenAccess."""
    devices_url = 'https://api.foobot.io/v2/owner/example@example.com/device/'
    # NOTE(review): this body is not well-formed JSON. It is kept verbatim;
    # confirm whether it deliberately mirrors a real API reply.
    error_body = (
        '{"{"state": 403, "message": null,\n'
        '"stack": "403 - Forbidden",\n'
        '"propagatedException": null }":\n'
        '"invalid key provided"}'
    )

    with aioresponses() as fake_http:
        fake_http.get(devices_url, status=403, body=error_body)
        with pytest.raises(FoobotClient.ForbiddenAccess):
            loop.run_until_complete(client.get_devices())
def test_overquota_request():
    """A 429 reply with an empty body must raise FoobotClient.TooManyRequests."""
    devices_url = 'https://api.foobot.io/v2/owner/example@example.com/device/'

    with aioresponses() as fake_http:
        fake_http.get(devices_url, status=429, body='')
        with pytest.raises(FoobotClient.TooManyRequests):
            loop.run_until_complete(client.get_devices())
def test_internal_error_request():
    """A 500 reply with an empty body must raise FoobotClient.InternalError."""
    devices_url = 'https://api.foobot.io/v2/owner/example@example.com/device/'

    with aioresponses() as fake_http:
        fake_http.get(devices_url, status=500, body='')
        with pytest.raises(FoobotClient.InternalError):
            loop.run_until_complete(client.get_devices())
def test_unhandled_error_request():
    """A 404 reply with an empty body must raise FoobotClient.ClientError."""
    devices_url = 'https://api.foobot.io/v2/owner/example@example.com/device/'

    with aioresponses() as fake_http:
        fake_http.get(devices_url, status=404, body='')
        with pytest.raises(FoobotClient.ClientError):
            loop.run_until_complete(client.get_devices())
def test_get_last_data_request():
    """The first entry from get_last_data is a dict keyed by the sensor names."""
    last_url = 'https://api.foobot.io/v2/device/1234127987696AB/datapoint/600/last/601/'
    # Adjacent literals keep the fixture's line breaks exactly as they were.
    payload = (
        '{"uuid": "1234127987696AB",\n'
        '"start": 1518131274,\n'
        '"end": 1518131874,\n'
        '"sensors": ["time", "pm", "tmp", "hum", "co2", "voc", "allpollu"],\n'
        '"units": [ "s", "ugm3", "C", "pc", "ppm", "ppb", "%" ],\n'
        '"datapoints": [ [ 1518131274, 135.70001, 21.046001, 46.6885, 1178.0,\n'
        '325.5, 131.19643 ] ] }'
    )
    expected_point = {
        "time": 1518131274,
        "pm": 135.70001,
        "tmp": 21.046001,
        "hum": 46.6885,
        "co2": 1178.0,
        "voc": 325.5,
        "allpollu": 131.19643,
    }

    with aioresponses() as fake_http:
        fake_http.get(last_url, status=200, body=payload)
        readings = loop.run_until_complete(
            client.get_last_data("1234127987696AB", 600, 601))

    assert expected_point == readings[0]
def test_get_historical_data_request():
    """The first entry from get_historical_data is a dict keyed by the sensor names."""
    history_url = 'https://api.foobot.io/v2/device/1234127987696AB/datapoint/1518121274/1518131274/3600/'
    # Adjacent literals keep the fixture's line breaks exactly as they were.
    payload = (
        '{"uuid": "1234127987696AB",\n'
        '"start": 1518131274,\n'
        '"end": 1518131874,\n'
        '"sensors": ["time", "pm", "tmp", "hum", "co2", "voc", "allpollu"],\n'
        '"units": [ "s", "ugm3", "C", "pc", "ppm", "ppb", "%" ],\n'
        '"datapoints": [ [ 1518131274, 135.70001, 21.046001, 46.6885, 1178.0,\n'
        '325.5, 131.19643 ] ] }'
    )
    expected_point = {
        "time": 1518131274,
        "pm": 135.70001,
        "tmp": 21.046001,
        "hum": 46.6885,
        "co2": 1178.0,
        "voc": 325.5,
        "allpollu": 131.19643,
    }
    # The mocked URL embeds these two epoch values as the window bounds.
    # NOTE(review): datetime.utcfromtimestamp() is deprecated since Python 3.12;
    # check how the client converts datetimes before moving to aware values.
    window_start = datetime.utcfromtimestamp(1518121274)
    window_end = datetime.utcfromtimestamp(1518131274)

    with aioresponses() as fake_http:
        fake_http.get(history_url, status=200, body=payload)
        history = loop.run_until_complete(
            client.get_historical_data("1234127987696AB",
                                       window_start, window_end, 3600))

    assert expected_point == history[0]
def test_get_bad_data_request():
    """A 200 reply carrying only uuid/start/end must raise FoobotClient.InvalidData."""
    history_url = 'https://api.foobot.io/v2/device/1234127987696AB/datapoint/1518121274/1518131274/3600/'
    # Adjacent literals keep the fixture's line breaks exactly as they were.
    incomplete_payload = (
        '{"uuid": "1234127987696AB",\n'
        '"start": 1518131274,\n'
        '"end": 1518131874}'
    )
    window_start = datetime.utcfromtimestamp(1518121274)
    window_end = datetime.utcfromtimestamp(1518131274)

    with aioresponses() as fake_http:
        fake_http.get(history_url, status=200, body=incomplete_payload)
        with pytest.raises(FoobotClient.InvalidData):
            loop.run_until_complete(
                client.get_historical_data("1234127987696AB",
                                           window_start, window_end, 3600))
|