File: test_message_sync.py

package info (click to toggle)
python-can 4.6.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,428 kB
  • sloc: python: 27,154; makefile: 32; sh: 16
file content (118 lines) | stat: -rw-r--r-- 3,625 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python

"""
This module tests :class:`can.MessageSync`.
"""

import gc
import time
import unittest
from copy import copy

import pytest

from can import Message, MessageSync

from .config import IS_CI, IS_GITHUB_ACTIONS, IS_LINUX, IS_OSX, IS_TRAVIS
from .data.example_data import TEST_MESSAGES_BASE
from .message_helper import ComparingMessagesTestCase

# Every second entry of TEST_MESSAGES_BASE; test_skip replays this reduced set.
TEST_FEWER_MESSAGES = TEST_MESSAGES_BASE[::2]


def inc(value):
    """Widen a timing bound by 50 % when running on CI; otherwise return it as is."""
    return value * 1.5 if IS_CI else value


# Decorator that skips timing-sensitive tests when the platform flags from
# .config indicate Travis on macOS or GitHub Actions on a non-Linux runner.
_TIMINGS_UNRELIABLE = (IS_TRAVIS and IS_OSX) or (IS_GITHUB_ACTIONS and not IS_LINUX)
skip_on_unreliable_platforms = unittest.skipIf(
    _TIMINGS_UNRELIABLE, "this environment's timings are too unpredictable"
)


@skip_on_unreliable_platforms
class TestMessageSync(unittest.TestCase, ComparingMessagesTestCase):
    """Timing-sensitive tests for iterating over :class:`can.MessageSync`.

    The garbage collector is switched off around every test so that a
    collection pause cannot distort the measured delays.
    """

    def __init__(self, *args, **kwargs):
        # Both bases are initialised explicitly: TestCase gets the runner's
        # arguments, the comparison mixin is initialised without any.
        unittest.TestCase.__init__(self, *args, **kwargs)
        ComparingMessagesTestCase.__init__(self)

    def setUp(self):
        # unittest-style hook names are used (instead of pytest's
        # setup_method/teardown_method) so that the hooks also run when this
        # module is executed through unittest.main().
        # disabling the garbage collector makes the time readings more reliable
        gc.disable()

    def tearDown(self):
        # we need to reenable the garbage collector again
        gc.enable()

    def test_general(self):
        """Replay five messages and check the moment at which each is yielded."""
        messages = [
            Message(timestamp=50.0),
            Message(timestamp=50.0),
            Message(timestamp=50.0 + 0.05),
            Message(timestamp=50.0 + 0.13),
            Message(timestamp=50.0),  # back in time
        ]
        sync = MessageSync(messages, gap=0.0, skip=0.0)

        t_start = time.perf_counter()
        collected = []
        timings = []
        for message in sync:
            # take the reading first so bookkeeping does not add to it
            t_now = time.perf_counter()
            collected.append(message)
            timings.append(t_now - t_start)

        self.assertMessagesEqual(messages, collected)
        self.assertEqual(len(timings), len(messages), "programming error in test code")

        # Each message must arrive no earlier than (just under) its offset from
        # the first timestamp and no later than that offset plus a tolerance.
        self.assertTrue(0.0 <= timings[0] < 0.0 + inc(0.02), str(timings[0]))
        self.assertTrue(0.0 <= timings[1] < 0.0 + inc(0.02), str(timings[1]))
        self.assertTrue(0.045 <= timings[2] < 0.05 + inc(0.02), str(timings[2]))
        self.assertTrue(0.125 <= timings[3] < 0.13 + inc(0.02), str(timings[3]))
        # the message that steps back in time is expected without extra delay
        self.assertTrue(0.125 <= timings[4] < 0.13 + inc(0.02), str(timings[4]))

    def test_skip(self):
        """Replay the reduced message set with ``skip=0.005`` and bound the total time."""
        messages = copy(TEST_FEWER_MESSAGES)
        sync = MessageSync(messages, skip=0.005, gap=0.0)

        before = time.perf_counter()
        collected = list(sync)
        after = time.perf_counter()
        took = after - before

        # the handling of the messages itself also takes some time:
        # ~0.001 s/message on a ThinkPad T560 laptop (Ubuntu 18.04, i5-6200U)
        assert 0 < took < inc(len(messages) * (0.005 + 0.003)), f"took: {took}s"

        self.assertMessagesEqual(messages, collected)


@skip_on_unreliable_platforms
@pytest.mark.parametrize(
    "timestamp_1,timestamp_2", [(0.0, 0.0), (0.0, 0.01), (0.01, 1.5)]
)
def test_gap(timestamp_1, timestamp_2):
    """Check the total replay time of two messages with ``timestamps=False, gap=0.1``.

    This test is a module-level function so it can be parameterized.
    For every timestamp pair the iteration is asserted to take roughly 0.2 s,
    i.e. the message timestamps themselves must not influence the pacing.
    """
    messages = [
        Message(arbitration_id=0x1, timestamp=timestamp_1),
        Message(arbitration_id=0x2, timestamp=timestamp_2),
    ]
    sync = MessageSync(messages, timestamps=False, gap=0.1)

    # disabling the garbage collector makes the time readings more reliable;
    # the previous state is restored even if iterating raises, so a failure
    # here cannot leave the collector switched off for the following tests
    gc_was_enabled = gc.isenabled()
    gc.disable()
    try:
        before = time.perf_counter()
        collected = list(sync)
        after = time.perf_counter()
    finally:
        if gc_was_enabled:
            gc.enable()
    took = after - before

    assert 0.195 <= took < 0.2 + inc(0.02), f"took: {took}s"
    assert messages == collected


# Allow running this module directly. unittest.main() only collects TestCase
# classes, so the pytest-parametrized test_gap function runs under pytest only.
if __name__ == "__main__":
    unittest.main()