1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
# -*- coding: utf-8 -*-
"""
This module implements an OS and hardware independent
virtual CAN interface for testing purposes.
Any VirtualBus instances connecting to the same channel
will get the same messages. Sent messages will also be
echoed back to the same bus.
"""
import logging
import time
try:
import queue as queue
except ImportError:
import Queue as queue
from can.bus import BusABC
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): this sets the logger's threshold to DEBUG as an import-time
# side effect; confirm that overriding the application's logging
# configuration here is intended.
logger.setLevel(logging.DEBUG)
# Registry of virtual channels: maps a channel key to a list of queues,
# one queue for each bus currently connected to that channel.
channels = {}
class VirtualBus(BusABC):
    """Virtual CAN bus using an internal message queue for testing.

    Every instance owns one queue and registers it in the module-level
    ``channels`` registry under the requested channel key. A message sent
    on one instance is put on the queue of every instance registered on
    that channel, including the sender's own queue.
    """

    def __init__(self, channel=None, **config):
        """Connect a new bus to ``channel``.

        :param channel:
            Key identifying the virtual channel. It is used as a
            dictionary key, so it must be hashable. Buses created with
            equal keys receive each other's messages.
        :param config:
            Extra keyword arguments; not used by this class.
        """
        self.channel_info = 'Virtual bus channel %s' % channel

        # Create a new channel if one does not exist
        if channel not in channels:
            channels[channel] = []

        # Register this bus's private receive queue on the shared channel.
        self.queue = queue.Queue()
        self.channel = channels[channel]
        self.channel.append(self.queue)

    def recv(self, timeout=None):
        """Block until a message is available on this bus.

        :param timeout:
            Maximum number of seconds to wait; ``None`` waits
            indefinitely.
        :return:
            The next queued message, or ``None`` if nothing arrived
            before the timeout expired.
        """
        try:
            msg = self.queue.get(block=True, timeout=timeout)
        except queue.Empty:
            return None

        # Level 9 sits just below logging.DEBUG (10).
        logger.log(9, 'Received message:\n%s', msg)
        return msg

    def send(self, msg):
        """Deliver ``msg`` to every bus connected to this channel.

        The message's ``timestamp`` attribute is overwritten with the
        current time, and the same message object is put on each queue,
        including this bus's own queue.

        :param msg: Message object to transmit.
        """
        msg.timestamp = time.time()
        # Add message to all listening on this channel
        for bus_queue in self.channel:
            bus_queue.put(msg)
        logger.log(9, 'Transmitted message:\n%s', msg)

    def shutdown(self):
        """Disconnect this bus from its channel.

        Safe to call more than once: if the queue has already been
        removed from the channel, the call does nothing.
        """
        try:
            self.channel.remove(self.queue)
        except ValueError:
            # Already disconnected by an earlier shutdown() call.
            pass
|