"""Unit tests for ``fetch_enologic_api`` in ``youless_api.gateway``."""
from datetime import datetime
from unittest import TestCase
from unittest.mock import patch, Mock, MagicMock
from requests import Response
from youless_api.gateway import fetch_enologic_api
def mock_ls120_ok(*args, **kwargs) -> Response:
    """Stand-in for ``requests.get`` that serves a complete ``/e`` payload.

    The first positional argument is treated as the requested URL. Only
    ``http://localhost/e`` receives a successful JSON response; every other
    URL gets a mock whose ``ok`` attribute is ``False``. Keyword arguments
    are accepted and ignored.

    The ``gts`` and ``wts`` fields are derived from the current clock each
    time ``json()`` is invoked on the returned mock.
    """
    if args[0] != 'http://localhost/e':
        return Mock(ok=False)

    def _hour_stamp() -> int:
        # Current local time formatted as yymmddHH with a literal "00" suffix.
        return int(datetime.now().strftime("%y%m%d%H00"))

    def _payload() -> list:
        # Built on every call so the timestamps are evaluated lazily.
        return [{
            "tm": 1611929119,
            "net": 9194.164,
            "pwr": 2382,
            "ts0": 1608654000,
            "cs0": 0.000,
            "ps0": 0,
            "p1": 4703.562,
            "p2": 4490.631,
            "n1": 0.029,
            "n2": 0.000,
            "gas": 1624.264,
            "gts": _hour_stamp(),
            "wtr": 1234.564,
            "wts": _hour_stamp(),
        }]

    return Mock(
        ok=True,
        headers={'Content-Type': 'application/json'},
        json=_payload,
    )
def mock_stale_gas(*args, **kwargs) -> Response:
    """Stand-in for ``requests.get`` whose gas/water timestamps are fixed.

    Mirrors the successful ``/e`` response but reports ``gts`` and ``wts``
    as the constant ``3894900`` rather than a value derived from the current
    time, and carries non-zero ``cs0``/``ps0`` readings. Any URL other than
    ``http://localhost/e`` yields a mock with ``ok`` set to ``False``.
    """
    requested_url = args[0]
    if requested_url != 'http://localhost/e':
        return Mock(ok=False)

    def _payload() -> list:
        # A fresh list is produced on each json() call.
        return [{
            "tm": 1611929119,
            "net": 9194.164,
            "pwr": 2382,
            "ts0": 1608654000,
            "cs0": 15.000,
            "ps0": 10,
            "p1": 4703.562,
            "p2": 4490.631,
            "n1": 0.029,
            "n2": 0.000,
            "gas": 1624.264,
            "gts": 3894900,
            "wtr": 1234.564,
            "wts": 3894900,
        }]

    json_headers = {'Content-Type': 'application/json'}
    return Mock(ok=True, headers=json_headers, json=_payload)
def mock_enologic_missing_values(*args, **kwargs) -> Response:
    """Stand-in for ``requests.get`` whose payload omits the tariff counters.

    The JSON body served for ``http://localhost/e`` has no ``p1``, ``p2``,
    ``n1`` or ``n2`` entries; the remaining fields are present, with ``gts``
    and ``wts`` computed from the current clock whenever ``json()`` is
    called. Any other URL yields a mock with ``ok`` set to ``False``.
    """
    if args[0] != 'http://localhost/e':
        return Mock(ok=False)

    def _hour_stamp() -> int:
        # Current local time formatted as yymmddHH with a literal "00" suffix.
        return int(datetime.now().strftime("%y%m%d%H00"))

    def _payload() -> list:
        return [{
            "tm": 1611929119,
            "net": 9194.164,
            "pwr": 2382,
            "ts0": 1608654000,
            "cs0": 0.000,
            "ps0": 0,
            "gas": 1624.264,
            "gts": _hour_stamp(),
            "wtr": 1234.564,
            "wts": _hour_stamp(),
        }]

    return Mock(
        ok=True,
        headers={'Content-Type': 'application/json'},
        json=_payload,
    )
def mock_enologic_error(*args, **kwargs) -> Response:
    """Stand-in for ``requests.get`` that always reports a failed request.

    All arguments are ignored; the returned mock has ``ok`` set to ``False``.
    """
    failed_response = Mock(ok=False)
    return failed_response
class GatewayTest(TestCase):
    """Tests for ``fetch_enologic_api`` with ``requests.get`` patched out.

    Every test replaces ``youless_api.gateway.requests.get`` with one of the
    module-level mock factories, calls ``fetch_enologic_api('localhost', None)``
    and then verifies both the returned dataset and that the ``/e`` endpoint
    was requested with ``auth=None`` and ``timeout=2``.
    """

    @patch('youless_api.gateway.requests.get', side_effect=mock_ls120_ok)
    def test_enologic_correct(self, mock_get: MagicMock):
        """A complete payload with current gas/water timestamps exposes its readings."""
        dataset = fetch_enologic_api('localhost', None)

        self.assertEqual(dataset['net'], 9194.164)
        self.assertEqual(dataset['p2'], 4490.631)
        self.assertEqual(dataset['p1'], 4703.562)
        self.assertEqual(dataset['pwr'], 2382)
        self.assertEqual(dataset['gas'], 1624.264)
        self.assertEqual(dataset['wtr'], 1234.564)
        self.assertEqual(dataset['cs0'], 0.000)
        self.assertEqual(dataset['n1'], 0.029)

        mock_get.assert_any_call('http://localhost/e', auth=None, timeout=2)

    @patch('youless_api.gateway.requests.get', side_effect=mock_stale_gas)
    def test_enologic_stale_gas(self, mock_get: MagicMock):
        """With the fixed ``gts``/``wts`` of the stale mock, gas and water are ``None``."""
        dataset = fetch_enologic_api('localhost', None)

        self.assertEqual(dataset['net'], 9194.164)
        self.assertEqual(dataset['p2'], 4490.631)
        self.assertEqual(dataset['p1'], 4703.562)
        self.assertEqual(dataset['pwr'], 2382)
        # assertIsNone checks identity and reports a clearer failure message
        # than comparing against None with assertEqual.
        self.assertIsNone(dataset['gas'])
        self.assertIsNone(dataset['wtr'])
        self.assertEqual(dataset['cs0'], 15.000)
        self.assertEqual(dataset['ps0'], 10)
        self.assertEqual(dataset['n1'], 0.029)

        mock_get.assert_any_call('http://localhost/e', auth=None, timeout=2)

    @patch('youless_api.gateway.requests.get', side_effect=mock_enologic_missing_values)
    def test_enologic_missing_p_and_n(self, mock_get: MagicMock):
        """Keys absent from the payload (``p1``, ``p2``, ``n1``, ``n2``) come back as ``None``."""
        dataset = fetch_enologic_api('localhost', None)

        self.assertIsNone(dataset['p1'])
        self.assertIsNone(dataset['p2'])
        self.assertIsNone(dataset['n1'])
        self.assertIsNone(dataset['n2'])

        mock_get.assert_any_call('http://localhost/e', auth=None, timeout=2)

    @patch('youless_api.gateway.requests.get', side_effect=mock_enologic_error)
    def test_enologic_error(self, mock_get: MagicMock):
        """A response whose ``ok`` is ``False`` results in ``None`` instead of a dataset."""
        dataset = fetch_enologic_api('localhost', None)

        self.assertIsNone(dataset)

        mock_get.assert_any_call('http://localhost/e', auth=None, timeout=2)
|