File: listener.py

package info (click to toggle)
python-can 3.3.2.final~github-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,172 kB
  • sloc: python: 10,208; makefile: 30; sh: 12
file content (174 lines) | stat: -rw-r--r-- 4,789 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# coding: utf-8

"""
This module contains the implementation of `can.Listener` and some readers.
"""

from abc import ABCMeta, abstractmethod

try:
    # Python 3.7+
    from queue import SimpleQueue, Empty
except ImportError:
    try:
        # Python 3.0 - 3.6
        from queue import Queue as SimpleQueue, Empty
    except ImportError:
        # Python 2
        from Queue import Queue as SimpleQueue, Empty

try:
    import asyncio
except ImportError:
    asyncio = None


class Listener(object):
    """Base class for objects that consume CAN messages.

    An instance is callable; calling it simply delegates to
    :meth:`on_message_received`::

        listener = SomeListener()
        msg = my_bus.recv()

        # the following two calls are equivalent
        listener(msg)
        listener.on_message_received(msg)

        # Important to ensure all outputs are flushed
        listener.stop()
    """

    # NOTE: the ``__metaclass__`` attribute is only honoured by Python 2.
    # Python 3 ignores it, so there the ``@abstractmethod`` marker below is
    # not enforced when the class is instantiated.
    __metaclass__ = ABCMeta

    @abstractmethod
    def on_message_received(self, msg):
        """Handle one delivered message; subclasses provide the behaviour.

        :param can.Message msg: the delivered message
        """

    def __call__(self, msg):
        """Forward *msg* to :meth:`on_message_received` and return its result.

        :param can.Message msg: the delivered message
        """
        handled = self.on_message_received(msg)
        return handled

    def on_error(self, exc):
        """Hook for exceptions raised in the receive thread.

        The base implementation does nothing.

        :param Exception exc: The exception causing the thread to stop
        """

    def stop(self):
        """
        Hook for shutting the listener down: stop handling new messages,
        persist any pending data and release open resources.

        The base implementation does nothing; concrete implementations
        override it.
        """


class RedirectReader(Listener):
    """
    Listener that forwards every message it receives to another Bus.

    """

    def __init__(self, bus):
        # Destination of the forwarded messages; this class only ever
        # calls its ``send`` method.
        self.bus = bus

    def on_message_received(self, msg):
        """Send *msg* on the bus given at construction time.

        :param can.Message msg: the delivered message
        """
        destination = self.bus
        destination.send(msg)


class BufferedReader(Listener):
    """
    A BufferedReader is a subclass of :class:`~can.Listener` which implements a
    **message buffer**: that is, when the :class:`can.BufferedReader` instance is
    notified of a new message it pushes it into a queue of messages waiting to
    be serviced. The messages can then be fetched, oldest first, with
    :meth:`~can.BufferedReader.get_message`.

    Putting in messages after :meth:`~can.BufferedReader.stop` has been called will raise
    an exception, see :meth:`~can.BufferedReader.on_message_received`.

    :attr bool is_stopped: ``True`` iff the reader has been stopped
    """

    def __init__(self):
        # The queue is created without a size limit ("infinite" size).
        self.buffer = SimpleQueue()
        self.is_stopped = False

    def on_message_received(self, msg):
        """Append a message to the buffer.

        :param can.Message msg: the message to enqueue
        :raises RuntimeError:
            if the reader has already been stopped
        """
        if self.is_stopped:
            raise RuntimeError("reader has already been stopped")
        self.buffer.put(msg)

    def get_message(self, timeout=0.5):
        """
        Attempts to retrieve the oldest message still waiting in the buffer
        (messages are handed out in the order they were received). If no
        message is available it blocks until one arrives or until the given
        timeout expires, whichever happens first. This method does not block
        after :meth:`can.BufferedReader.stop` has been called; messages that
        were buffered before the stop can still be retrieved.

        :param float timeout: The number of seconds to wait for a new message.
        :rtype: can.Message or None
        :return: the message if there is one, or None if there is not.
        """
        try:
            # Once stopped, no new message can arrive, so waiting is pointless:
            # switch to a non-blocking get (the timeout is then ignored).
            return self.buffer.get(block=not self.is_stopped, timeout=timeout)
        except Empty:
            return None

    def stop(self):
        """Prohibits any more additions to this reader.

        Only sets :attr:`is_stopped`; the buffer itself is left untouched.
        """
        self.is_stopped = True


if asyncio is not None:
    class AsyncBufferedReader(Listener):
        """A message buffer for use with :mod:`asyncio`.

        See :ref:`asyncio` for how to use with :class:`can.Notifier`.

        Can also be used as an asynchronous iterator::

            async for msg in reader:
                print(msg)
        """

        def __init__(self, loop=None):
            """
            :param loop:
                Optional event loop to bind the internal queue to. It is only
                forwarded to :class:`asyncio.Queue` when given, and it is
                ignored on Python versions whose :class:`asyncio.Queue` no
                longer accepts a ``loop`` argument (removed in Python 3.10).
            """
            # set to "infinite" size
            if loop is None:
                # Never pass ``loop=None`` explicitly: Python 3.10+ rejects
                # the keyword itself, regardless of its value.
                self.buffer = asyncio.Queue()
            else:
                try:
                    self.buffer = asyncio.Queue(loop=loop)
                except TypeError:
                    # Python 3.10+: the ``loop`` parameter no longer exists.
                    self.buffer = asyncio.Queue()

        def on_message_received(self, msg):
            """Append a message to the buffer.

            Must only be called inside an event loop!

            :param can.Message msg: the message to enqueue
            """
            self.buffer.put_nowait(msg)

        def get_message(self):
            """
            Retrieve the next buffered message (oldest first) when awaited for::

                msg = await reader.get_message()

            :rtype: can.Message
            :return: The CAN message.
            """
            return self.buffer.get()

        def __aiter__(self):
            """Return the reader itself as the asynchronous iterator."""
            return self

        def __anext__(self):
            """Return an awaitable that yields the next buffered message."""
            return self.buffer.get()