1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
|
# nxt.backend.usb module -- USB backend
# Copyright (C) 2006, 2007 Douglas P Lau
# Copyright (C) 2009 Marcus Wanner
# Copyright (C) 2011 Paul Hollensen, Marcus Wanner
# Copyright (C) 2021 Nicolas Schodet
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import logging
import os
import usb.core
import nxt.brick
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# LEGO USB vendor ID, used as the idVendor filter when searching for bricks.
ID_VENDOR_LEGO = 0x0694
# NXT brick product ID, used as the idProduct filter when searching for bricks.
ID_PRODUCT_NXT = 0x0002
class USBSock:
    """USB socket connected to a NXT brick.

    Wraps a PyUSB device object and provides the ``connect``, ``close``,
    ``send`` and ``recv`` operations on it.
    """

    #: Block size.
    bsize = 60

    #: Connection type, used to evaluate latency.
    type = "usb"

    def __init__(self, dev):
        """Remember the device; no USB traffic happens until :meth:`connect`.

        :param dev: USB device object (as yielded by ``usb.core.find``).
        """
        self._dev = dev
        # Endpoints are only known once connected; None means "not connected".
        self._epout = None
        self._epin = None

    def __str__(self):
        """Return a description built from the device bus and address."""
        return "USB (Bus %03d Device %03d)" % (self._dev.bus, self._dev.address)

    def connect(self):
        """Connect to NXT brick.

        :return: Connected brick.
        :rtype: Brick
        """
        logger.info("connecting via %s", self)
        if os.name != "nt":
            # Do not reset device on Windows, see
            # https://github.com/schodet/nxt-python/issues/182 and
            # https://github.com/schodet/nxt-python/issues/33
            self._dev.reset()
        self._dev.set_configuration()
        intf = self._dev.get_active_configuration()[(0, 0)]
        # Unpacking the interface yields its endpoints; the first one is used
        # for output and the second one for input.
        self._epout, self._epin = intf
        return nxt.brick.Brick(self)

    def close(self):
        """Close the connection.

        Does nothing when the socket is not connected.  Otherwise asks PyUSB
        to release the resources it allocated for the device, so they are not
        held until the device object is garbage-collected, then forgets the
        endpoints.
        """
        if self._epout is None:
            return
        # Local import: keeps this method self-contained instead of relying
        # on usb.util being reachable through the file-level usb.core import.
        import usb.util

        logger.info("closing %s connection", self)
        try:
            usb.util.dispose_resources(self._dev)
        finally:
            # Always mark the socket as closed, even if the release failed.
            self._epout = None
            self._epin = None

    def send(self, data):
        """Send raw data.

        :param bytes data: Data to send.
        """
        logger.debug("send: %s", data.hex())
        self._epout.write(data)

    def recv(self):
        """Receive raw data.

        :return: Received data.
        :rtype: bytes
        """
        # Request up to 64 bytes and convert the returned buffer to bytes.
        data = self._epin.read(64).tobytes()
        logger.debug("recv: %s", data.hex())
        return data
class Backend:
    """USB backend."""

    def find(self, **kwargs):
        """Find bricks connected using USB.

        This is a generator: the USB search and each connection only happen
        as the result is iterated.

        :param kwargs: Parameters are ignored.
        :return: Iterator over all found bricks.
        :rtype: Iterator[Brick]
        """
        devices = usb.core.find(
            find_all=True, idVendor=ID_VENDOR_LEGO, idProduct=ID_PRODUCT_NXT
        )
        for device in devices:
            # Connect to each matching device in turn and hand out the brick.
            yield USBSock(device).connect()
def get_backend():
    """Get an instance of the USB backend.

    :return: USB backend.
    :rtype: Backend
    """
    return Backend()
|