1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
# coding: utf-8
"""
This module contains the implementation of `can.Listener` and some readers.
"""
from abc import ABCMeta, abstractmethod
try:
# Python 3.7
from queue import SimpleQueue, Empty
except ImportError:
try:
# Python 3.0 - 3.6
from queue import Queue as SimpleQueue, Empty
except ImportError:
# Python 2
from Queue import Queue as SimpleQueue, Empty
try:
import asyncio
except ImportError:
asyncio = None
class Listener(object):
"""The basic listener that can be called directly to handle some
CAN message::
listener = SomeListener()
msg = my_bus.recv()
# now either call
listener(msg)
# or
listener.on_message_received(msg)
# Important to ensure all outputs are flushed
listener.stop()
"""
__metaclass__ = ABCMeta
@abstractmethod
def on_message_received(self, msg):
"""This method is called to handle the given message.
:param can.Message msg: the delivered message
"""
pass
def __call__(self, msg):
return self.on_message_received(msg)
def on_error(self, exc):
"""This method is called to handle any exception in the receive thread.
:param Exception exc: The exception causing the thread to stop
"""
def stop(self):
"""
Stop handling new messages, carry out any final tasks to ensure
data is persisted and cleanup any open resources.
Concrete implementations override.
"""
class RedirectReader(Listener):
"""
A RedirectReader sends all received messages to another Bus.
"""
def __init__(self, bus):
self.bus = bus
def on_message_received(self, msg):
self.bus.send(msg)
class BufferedReader(Listener):
"""
A BufferedReader is a subclass of :class:`~can.Listener` which implements a
**message buffer**: that is, when the :class:`can.BufferedReader` instance is
notified of a new message it pushes it into a queue of messages waiting to
be serviced. The messages can then be fetched with
:meth:`~can.BufferedReader.get_message`.
Putting in messages after :meth:`~can.BufferedReader.stop` has be called will raise
an exception, see :meth:`~can.BufferedReader.on_message_received`.
:attr bool is_stopped: ``True`` iff the reader has been stopped
"""
def __init__(self):
# set to "infinite" size
self.buffer = SimpleQueue()
self.is_stopped = False
def on_message_received(self, msg):
"""Append a message to the buffer.
:raises: BufferError
if the reader has already been stopped
"""
if self.is_stopped:
raise RuntimeError("reader has already been stopped")
else:
self.buffer.put(msg)
def get_message(self, timeout=0.5):
"""
Attempts to retrieve the latest message received by the instance. If no message is
available it blocks for given timeout or until a message is received, or else
returns None (whichever is shorter). This method does not block after
:meth:`can.BufferedReader.stop` has been called.
:param float timeout: The number of seconds to wait for a new message.
:rytpe: can.Message or None
:return: the message if there is one, or None if there is not.
"""
try:
return self.buffer.get(block=not self.is_stopped, timeout=timeout)
except Empty:
return None
def stop(self):
"""Prohibits any more additions to this reader.
"""
self.is_stopped = True
if asyncio is not None:
class AsyncBufferedReader(Listener):
"""A message buffer for use with :mod:`asyncio`.
See :ref:`asyncio` for how to use with :class:`can.Notifier`.
Can also be used as an asynchronous iterator::
async for msg in reader:
print(msg)
"""
def __init__(self, loop=None):
# set to "infinite" size
self.buffer = asyncio.Queue(loop=loop)
def on_message_received(self, msg):
"""Append a message to the buffer.
Must only be called inside an event loop!
"""
self.buffer.put_nowait(msg)
def get_message(self):
"""
Retrieve the latest message when awaited for::
msg = await reader.get_message()
:rtype: can.Message
:return: The CAN message.
"""
return self.buffer.get()
def __aiter__(self):
return self
def __anext__(self):
return self.buffer.get()
|