File: virtual.py

package info (click to toggle)
python-can 1.5.2-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 644 kB
  • ctags: 1,184
  • sloc: python: 4,373; makefile: 14
file content (60 lines) | stat: -rw-r--r-- 1,540 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# -*- coding: utf-8 -*-

"""
This module implements an OS and hardware independent
virtual CAN interface for testing purposes.

Any VirtualBus instances connecting to the same channel
will get the same messages. Sent messages will also be
echoed back to the same bus.
"""

import logging
import time
try:
    import queue as queue
except ImportError:
    import Queue as queue
from can.bus import BusABC


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


# Channels are lists of queues, one for each connection
channels = {}


class VirtualBus(BusABC):
    """Virtual CAN bus using an internal message queue for testing."""

    def __init__(self, channel=None, **config):
        self.channel_info = 'Virtual bus channel %s' % channel

        # Create a new channel if one does not exist
        if channel not in channels:
            channels[channel] = []

        self.queue = queue.Queue()
        self.channel = channels[channel]
        self.channel.append(self.queue)

    def recv(self, timeout=None):
        try:
            msg = self.queue.get(block=True, timeout=timeout)
        except queue.Empty:
            return None

        logger.log(9, 'Received message:\n%s', msg)
        return msg

    def send(self, msg):
        msg.timestamp = time.time()
        # Add message to all listening on this channel
        for bus_queue in self.channel:
            bus_queue.put(msg)
        logger.log(9, 'Transmitted message:\n%s', msg)

    def shutdown(self):
        self.channel.remove(self.queue)