File: devfile.py

package info (click to toggle)
nxt-python 3.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 812 kB
  • sloc: python: 6,857; xml: 22; makefile: 20; sh: 4
file content (151 lines) | stat: -rw-r--r-- 4,591 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# nxt.backend.devfile module -- Device file backend
# Copyright (C) 2013  Dave Churchill, Marcus Wanner
# Copyright (C) 2021  Nicolas Schodet
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

import glob
import logging
import platform
import struct
import tty

import nxt.brick

logger = logging.getLogger(__name__)


class DevFileSock:
    """Device file socket connected to a NXT brick."""

    #: Block size.
    bsize = 118

    #: Connection type, used to evaluate latency.
    type = "bluetooth"

    def __init__(self, filename):
        self._filename = filename

    def __str__(self):
        return f"DevFile ({self._filename})"

    def connect(self):
        """Connect to NXT brick.

        :return: Connected brick.
        :rtype: Brick
        """
        logger.info("connecting via %s", self._filename)
        self._device = open(self._filename, "r+b", buffering=0)
        tty.setraw(self._device)
        return nxt.brick.Brick(self)

    def close(self):
        """Close the connection."""
        if self._device is not None:
            logger.info("closing %s connection", self._filename)
            self._device.close()
            self._device = None

    def send(self, data):
        """Send raw data.

        :param bytes data: Data to send.
        """
        data = struct.pack("<H", len(data)) + data
        logger.debug("send: %s", data.hex())
        self._device.write(data)

    def recv(self):
        """Receive raw data.

        :return: Received data.
        :rtype: bytes
        """
        data = self._device.read(2)
        logger.debug("recv: %s", data.hex())
        (plen,) = struct.unpack("<H", data)
        data = self._device.read(plen)
        logger.debug("recv: %s", data.hex())
        return data


class Backend:
    """Device file backend.

    This uses a device file present on Linux or macOS /dev file system, which allows to
    connect to the NXT brick without needing a Bluetooth python package.

    You only need to use this backend if the :mod:`~nxt.backend.bluetooth` backend is
    not working for you.

    On Linux, you need to pair the NXT brick, then you can use the rfcomm tool::

        sudo rfcomm bind 0 00:16:53:01:02:03

    Where ``00:16:53:01:02:03`` is the Bluetooth address of your NXT brick. This will
    create a ``/dev/rfcomm0`` device file which can be used to communicate with the NXT
    brick.

    On macOS, you need to pair the NXT brick, then open Bluetooth preferences, select
    the NXT brick, click “Edit serial ports”. It should show “NXT-DevB-1”. If not, add
    a serial port using:

    - Port name: NXT-DevB-1
    - Device service: Dev B
    - Port type: RS-232

    This should create a ``/dev/tty.NXT-DevB-1`` device file which can be used to
    communicate with the NXT brick.
    """

    def find(self, name=None, filename=None, **kwargs):
        """Find bricks connected using Bluetooth using device file.

        :param name: Brick name (example: ``"NXT"``).
        :type name: str or None
        :param filename: Device file name (example: ``"/dev/rfcomm0"``).
        :type filename: str or None
        :param kwargs: Other parameters are ignored.
        :return: Iterator over all found bricks.
        :rtype: Iterator[Brick]
        """
        if filename:
            matches = [filename]
        else:
            system = platform.system()
            if system == "Linux":
                matches = glob.glob("/dev/rfcomm*")
            elif system == "Darwin":
                if name:
                    matches = glob.glob("/dev/*%s*" % name)
                else:
                    matches = glob.glob("/dev/*-DevB*")
            else:
                matches = []
        for match in matches:
            sock = DevFileSock(match)
            try:
                brick = sock.connect()
            except OSError:
                logger.exception("failed to connect to device %s", sock)
            else:
                yield brick


def get_backend():
    """Get an instance of the device file backend.

    :return: Device file backend.
    :rtype: Backend
    """
    return Backend()