File: usb.py

package info (click to toggle)
nxt-python 3.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 812 kB
  • sloc: python: 6,857; xml: 22; makefile: 20; sh: 4
file content (117 lines) | stat: -rw-r--r-- 3,102 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# nxt.backend.usb module -- USB backend
# Copyright (C) 2006, 2007  Douglas P Lau
# Copyright (C) 2009  Marcus Wanner
# Copyright (C) 2011  Paul Hollensen, Marcus Wanner
# Copyright (C) 2021  Nicolas Schodet
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

import logging
import os

import usb.core

import nxt.brick

logger = logging.getLogger(__name__)

# LEGO USB vendor ID.
ID_VENDOR_LEGO = 0x0694

# NXT brick product ID.
ID_PRODUCT_NXT = 0x0002


class USBSock:
    """USB socket connected to a NXT brick."""

    #: Block size.
    bsize = 60

    #: Connection type, used to evaluate latency.
    type = "usb"

    def __init__(self, dev):
        self._dev = dev
        self._epout = None
        self._epin = None

    def __str__(self):
        return "USB (Bus %03d Device %03d)" % (self._dev.bus, self._dev.address)

    def connect(self):
        """Connect to NXT brick.

        :return: Connected brick.
        :rtype: Brick
        """
        logger.info("connecting via %s", self)
        if os.name != "nt":
            # Do not reset device on Windows, see
            # https://github.com/schodet/nxt-python/issues/182 and
            # https://github.com/schodet/nxt-python/issues/33
            self._dev.reset()
        self._dev.set_configuration()
        intf = self._dev.get_active_configuration()[(0, 0)]
        self._epout, self._epin = intf
        return nxt.brick.Brick(self)

    def close(self):
        """Close the connection."""
        if self._epout is not None:
            logger.info("closing %s connection", self)
            self._epout = None
            self._epin = None

    def send(self, data):
        """Send raw data.

        :param bytes data: Data to send.
        """
        logger.debug("send: %s", data.hex())
        self._epout.write(data)

    def recv(self):
        """Receive raw data.

        :return: Received data.
        :rtype: bytes
        """
        data = self._epin.read(64).tobytes()
        logger.debug("recv: %s", data.hex())
        return data


class Backend:
    """USB backend."""

    def find(self, **kwargs):
        """Find bricks connected using USB.

        :param kwargs: Parameters are ignored.
        :return: Iterator over all found bricks.
        :rtype: Iterator[Brick]
        """
        for dev in usb.core.find(
            find_all=True, idVendor=ID_VENDOR_LEGO, idProduct=ID_PRODUCT_NXT
        ):
            sock = USBSock(dev)
            brick = sock.connect()
            yield brick


def get_backend():
    """Get an instance of the Bluetooth backend.

    :return: Bluetooth backend.
    :rtype: Backend
    """
    return Backend()