"""Tests for ContinuousnessChecker from dlt.helpers."""
from dlt.helpers import ContinuousnessChecker
import pytest
class Msg(object):
    """Lightweight stand-in for a message object used by these tests.

    It only stores the four values passed to the constructor as attributes
    of the same name.  ``mcnt`` is the message counter; the other names
    presumably mirror DLT message header fields -- confirm against ``dlt``.
    """

    def __init__(self, apid, ctid, seid, mcnt):
        """Store ``apid``, ``ctid``, ``seid`` and ``mcnt`` unchanged."""
        self.apid = apid
        self.ctid = ctid
        self.seid = seid
        self.mcnt = mcnt

    def __repr__(self):
        """Return a constructor-like representation for readable failures."""
        return "{}(apid={!r}, ctid={!r}, seid={!r}, mcnt={!r})".format(
            type(self).__name__, self.apid, self.ctid, self.seid, self.mcnt
        )
def run_check(messages):
    """Pass each item of ``messages``, in order, to a new ContinuousnessChecker.

    A fresh checker is created per call, so no state is shared between
    invocations.  Any exception raised by the checker propagates to the
    caller.
    """
    checker = ContinuousnessChecker()
    for message in messages:
        checker(message)
class TestsContinuousness(object):
    """Exercise ContinuousnessChecker through ``run_check`` with Msg sequences.

    Tests that expect a failure build their message list first and keep only
    the ``run_check`` call inside ``pytest.raises`` so the expectation covers
    nothing but the code under test.
    """

    def test_simple(self):
        """Consecutive counters 4..8 on one apid/ctid/seid raise nothing."""
        messages = [
            Msg("X", "Y", "99", 4),
            Msg("X", "Y", "99", 5),
            Msg("X", "Y", "99", 6),
            Msg("X", "Y", "99", 7),
            Msg("X", "Y", "99", 8),
        ]
        run_check(messages)

    def test_simple_missing(self):
        """A gap in the counter sequence is expected to raise RuntimeError."""
        messages = [
            Msg("X", "Y", "99", 4),
            Msg("X", "Y", "99", 5),
            Msg("X", "Y", "99", 6),
            # 7 is missing
            Msg("X", "Y", "99", 8),
            Msg("X", "Y", "99", 9),
        ]
        with pytest.raises(RuntimeError):
            run_check(messages)

    def test_simple_over(self):
        """Counters 254, 255, 0, 1 are accepted as a continuous sequence."""
        # The message counter is an unsigned char, so it counts up to 255
        # and then restarts at 0.
        messages = [
            Msg("X", "Y", "99", 254),
            Msg("X", "Y", "99", 255),
            Msg("X", "Y", "99", 0),
            Msg("X", "Y", "99", 1),
        ]
        run_check(messages)

    def test_simple_reset(self):
        """Dropping from 231 straight to 0 is expected to raise RuntimeError."""
        messages = [
            Msg("X", "Y", "99", 230),
            Msg("X", "Y", "99", 231),
            Msg("X", "Y", "99", 0),
        ]
        with pytest.raises(RuntimeError):
            run_check(messages)

    def test_ignore_control(self):
        """DA1/DC1 messages with counter 0 around another message raise nothing.

        NOTE(review): per the test name, DA1/DC1 presumably identifies
        control messages that the checker skips -- confirm against ``dlt``.
        """
        messages = [
            Msg("DA1", "DC1", "0", 0),
            Msg("X", "Y", "99", 231),
            Msg("DA1", "DC1", "0", 0),
        ]
        run_check(messages)

    def test_zeros_da1_dc1(self):
        """Two DA1/DC1 messages that both carry counter 0 raise nothing."""
        messages = [
            Msg("DA1", "DC1", "0", 0),
            Msg("DA1", "DC1", "0", 0),
        ]
        run_check(messages)

    def test_zeros_non_da1_dc1(self):
        """A repeated counter 0 on a non-DA1/DC1 message raises RuntimeError."""
        messages = [
            Msg("X", "Y", "0", 0),
            Msg("X", "Y", "0", 0),
        ]
        with pytest.raises(RuntimeError):
            run_check(messages)
|