File: test_uart.py

package info (click to toggle)
zigpy-deconz 0.25.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 360 kB
  • sloc: python: 3,911; makefile: 5
file content (156 lines) | stat: -rw-r--r-- 3,962 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""Tests for the uart module."""

import logging
from unittest import mock

import pytest
from zigpy.config import (
    CONF_DEVICE_BAUDRATE,
    CONF_DEVICE_FLOW_CONTROL,
    CONF_DEVICE_PATH,
)
import zigpy.serial

from zigpy_deconz import uart


@pytest.fixture
def gw():
    """Provide a ``uart.Gateway`` built on a MagicMock, with a MagicMock transport."""
    gateway = uart.Gateway(mock.MagicMock())
    # Replace the transport so tests can inspect write()/close() calls.
    gateway._transport = mock.MagicMock()
    return gateway


async def test_connect(monkeypatch):
    """Check that ``uart.connect`` completes against a stubbed serial connection."""

    async def fake_serial_connection(loop, protocol_factory, **kwargs):
        # Build the protocol and schedule its connection_made callback (with a
        # None transport) on the loop, then hand back a (transport, protocol) pair.
        proto = protocol_factory()
        loop.call_soon(proto.connection_made, None)
        return None, proto

    monkeypatch.setattr(
        zigpy.serial, "create_serial_connection", fake_serial_connection
    )

    device_config = {
        CONF_DEVICE_PATH: "/dev/null",
        CONF_DEVICE_BAUDRATE: 115200,
        CONF_DEVICE_FLOW_CONTROL: None,
    }
    await uart.connect(device_config, mock.MagicMock())


def test_send(gw):
    """Sending a payload writes exactly one framed packet to the transport."""
    payload = b"test"
    gw.send(payload)

    expected = b"".join(
        (
            b"\xC0",  # END
            payload,  # data
            b"\x40\xFE",  # checksum
            b"\xC0",  # END
        )
    )

    gw._transport.write.assert_called_once_with(expected)


def test_close(gw):
    """Closing the gateway calls ``close`` on the transport exactly once."""
    gw.close()

    # Looked up after close(), exactly where the assertion needs it.
    transport_close = gw._transport.close
    assert transport_close.call_count == 1


def test_data_received_chunk_frame(gw):
    """A frame delivered in two chunks reaches the API only once it is complete."""
    frame = b"\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFF\xC0"
    first_chunk, second_chunk = frame[:-4], frame[-4:]

    # The first chunk carries no END byte, so nothing may be forwarded yet.
    gw.data_received(first_chunk)
    assert gw._api.data_received.call_count == 0

    gw.data_received(second_chunk)
    assert gw._api.data_received.call_count == 1
    # The forwarded payload is the frame without its last three bytes.
    assert gw._api.data_received.call_args.args[0] == frame[:-3]


def test_data_received_full_frame(gw):
    """A complete frame received in one call is forwarded to the API once."""
    frame = b"\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFF\xC0"

    gw.data_received(frame)

    assert gw._api.data_received.call_count == 1
    # The forwarded payload is the frame without its last three bytes.
    forwarded = gw._api.data_received.call_args.args[0]
    assert forwarded == frame[:-3]


def test_data_received_incomplete_frame(gw):
    """Bytes that contain no END byte are not forwarded to the API."""
    partial = b"~\x00\x00"

    gw.data_received(partial)

    gw._api.data_received.assert_not_called()


def test_data_received_runt_frame(gw):
    """A terminated frame holding only two bytes is not forwarded to the API."""
    runt = b"\x02\x44\xC0"

    gw.data_received(runt)

    gw._api.data_received.assert_not_called()


def test_data_received_extra(gw):
    """Bytes after a complete frame stay in the buffer; the frame is forwarded."""
    frame = b"\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFF\xC0"
    trailing = b"\x00"

    gw.data_received(frame + trailing)

    assert gw._api.data_received.call_count == 1
    # The forwarded payload is the frame without its last three bytes.
    assert gw._api.data_received.call_args.args[0] == frame[:-3]
    # The byte following the END marker is kept for the next frame.
    assert gw._buffer == trailing


def test_data_received_wrong_checksum(gw):
    """A frame whose checksum bytes do not match is not forwarded to the API."""
    # Same payload as the valid frames in this module, but with \xFE where
    # they carry \xFF in the checksum.
    corrupted = b"\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFE\xC0"

    gw.data_received(corrupted)

    gw._api.data_received.assert_not_called()


def test_data_received_error(gw, caplog):
    """An exception raised by the API handler is logged, not propagated."""
    frame = b"\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFF\xC0"

    # The mocked handler raises on its first (and only expected) call.
    gw._api.data_received.side_effect = [RuntimeError("error")]

    with caplog.at_level(logging.ERROR):
        gw.data_received(frame)

    logged = caplog.text
    assert "RuntimeError" in logged
    assert "handling the frame" in logged

    handler = gw._api.data_received
    assert handler.call_count == 1
    assert handler.call_args.args[0] == frame[:-3]


def test_unescape(gw):
    r"""``_unescape`` turns ``\xDB\xDC`` into ``\xC0`` and ``\xDB\xDD`` into ``\xDB``."""
    escaped = b"\x00\xDB\xDC\x00\xDB\xDD\x00\x00\x00"
    expected = b"\x00\xC0\x00\xDB\x00\x00\x00"

    assert gw._unescape(escaped) == expected


def test_unescape_error(gw):
    r"""``_unescape`` returns ``None`` for input that ends in a lone ``\xDB``."""
    truncated = b"\x00\xDB\xDC\x00\xDB\xDD\x00\x00\x00\xDB"

    assert gw._unescape(truncated) is None


def test_escape(gw):
    r"""``_escape`` turns ``\xC0`` into ``\xDB\xDC`` and ``\xDB`` into ``\xDB\xDD``."""
    raw = b"\x00\xC0\x00\xDB\x00\x00\x00"
    expected = b"\x00\xDB\xDC\x00\xDB\xDD\x00\x00\x00"

    assert gw._escape(raw) == expected


def test_checksum(gw):
    """``_checksum`` returns the expected two bytes for a known payload."""
    payload = b"\x07\x01\x00\x08\x00\xaa\x00\x02"

    assert gw._checksum(payload) == b"\x44\xFF"


def test_connection_lost_exc(gw):
    """A connection lost with an exception is passed to the API unchanged."""
    exc = mock.sentinel.exception
    gw.connection_lost(exc)

    lost_handler = gw._api.connection_lost
    assert lost_handler.call_count == 1
    # Identity check: the very same object must be handed through.
    assert lost_handler.call_args.args[0] is exc


def test_connection_closed(gw):
    """A connection lost without an exception still notifies the API once."""
    gw.connection_lost(None)

    lost_handler = gw._api.connection_lost
    assert lost_handler.call_count == 1