File: helpers.py

package info (click to toggle)
python-dlt 2.18.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 452 kB
  • sloc: python: 3,449; makefile: 55
file content (62 lines) | stat: -rw-r--r-- 1,956 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# Copyright (C) 2015. BMW Car IT GmbH. All rights reserved.
"""DLT client helpers"""


class LimitCondition(object):
    """Callable countdown that answers ``True`` a limited number of times"""

    def __init__(self, limit):
        """Constructor

        :param int limit: How many calls should be answered with ``True``;
                          ``None`` disables the limit entirely
        """
        self.limit = limit

    def __call__(self):
        """Consume one unit of the budget

        :returns: ``True`` while the budget is not exhausted (always ``True``
                  when no limit is set), ``False`` afterwards
        :rtype: bool
        """
        if self.limit is not None:
            # The stored value keeps decreasing even after it went below zero
            remaining = self.limit - 1
            self.limit = remaining
            return remaining >= 0

        return True


class ContinuousnessChecker(object):
    """ContinuousnessChecker class is intended to find problems in the order of DLT messages

    An instance is called once per message. It remembers the last ``mcnt``
    value seen for every ``apid``/``ctid``/``seid`` combination and raises as
    soon as a following message with the same combination does not carry the
    successor value. Counters are compared modulo 256, so 255 is followed by 0.
    """

    _ignore = ["DA1-DC1-0"]  # control message will be ignored - there is no continuation

    def __init__(self, start=0):
        """Constructor

        :param int start: Initial value of the running message index; the index
                          is only used in the text of a raised error
        """
        self._index = start
        self._counter = dict()

    def __call__(self, message):
        """Check one message against the last counter stored for its key

        :param message: Object providing the attributes ``apid``, ``ctid``,
                        ``seid`` and ``mcnt``
        :raises RuntimeError: If ``message.mcnt`` is not the successor (modulo
                              256) of the counter previously stored for the
                              same ``apid``/``ctid``/``seid`` key
        """
        key = "{}-{}-{}".format(message.apid, message.ctid, message.seid)

        # Every message advances the index, including ignored ones
        self._index += 1

        if key in self._ignore:
            return

        if key not in self._counter:
            # first message of current type - nothing to compare against yet
            self._counter[key] = message.mcnt
            return

        # message of current type already received - check the continuousness
        expected = (self._counter[key] + 1) % 256

        # Store the received counter before raising, so that the following
        # message is compared against this one and not against the stale value
        self._counter[key] = message.mcnt

        if expected != message.mcnt:
            # Build the text only on failure and in a single formatting pass:
            # the ids are interpolated as plain values, so braces inside them
            # cannot be misread as format fields.
            raise RuntimeError(
                "Missing message detected. Message #%s (apid='%s', ctid='%s', seid='%s')"
                " should have counter '%s' instead of '%s'"
                % (self._index - 1, message.apid, message.ctid, message.seid, expected, message.mcnt)
            )


def bytes_to_str(byte_or_str):
    """Return a text string for the given value

    :param byte_or_str: ``bytes`` to decode, or any other object to stringify
    :returns: The UTF-8 decoded text for ``bytes`` input (undecodable bytes are
              replaced rather than raising), otherwise ``str(byte_or_str)``
    :rtype: str
    """
    if not isinstance(byte_or_str, bytes):
        return str(byte_or_str)

    return byte_or_str.decode("utf8", errors="replace")