File: cedrus.py

package info (click to toggle)
psychopy 2020.2.10%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 44,056 kB
  • sloc: python: 119,649; javascript: 3,022; makefile: 148; sh: 125; xml: 9
file content (211 lines) | stat: -rw-r--r-- 7,078 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Part of the PsychoPy library
# Copyright (C) 2002-2018 Jonathan Peirce (C) 2019-2020 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).

"""Cedrus make a variety of input devices.
See http://www.cedrus.com/

DEPRECATED:
This sub-package is out of date. Please use the cedrus-written
pyxid package instead (bundled with Standalone PsychoPy)::
    import pyxid

----------
"""
from __future__ import absolute_import, print_function

from builtins import range
from builtins import object
from psychopy import core, logging
import struct
import sys

try:
    import serial
except ImportError:
    serial = False


class RB730(object):
    """Class to control/read a Cedrus RB-series response box
    """
    class KeyEvent(object):
        """Info about a keypress from Cedrus keypad XID string
        """

        def __init__(self, XID):
            """XID should contain a "k"<info><rt> where info is a byte
            and rt is 4 bytes (=int)
            """
            super(KeyEvent, self).__init__()
            if len(XID) != 6:
                # log.error("The XID string %s is %i bytes long and should
                #   be 6 bytes" %(str([XID]),len(XID)))
                self.key = None
            else:
                # a character and a ubyte of info
                info = struct.unpack('B', XID[1])[0]

                # was the key going down or up?
                if (info >> 4) % 2:  # this gives only the 4th bit
                    self.direction = 'down'
                else:
                    self.direction = 'up'

                # what was the key?
                self.key = info >> 5  # bits 5-7 give the button number

                # what was RT?
                self.rt = struct.unpack('i', XID[2:])[0]  # integer in ms

    def __init__(self, port, baudrate=115200, mode='XID'):
        super(RB730, self).__init__()

        if not serial:
            raise ImportError("The module serial is needed to connect to the"
                              " Cedrus response pad. On most systems this can"
                              " be installed with\n\t easy_install pyserial")

        self.model = 'RB703'
        # set name of port
        if type(port) in [int, float]:
            self.portNumber = port
            self.portString = 'COM%i' % self.portNumber
        else:
            self.portString = port
            self.portNumber = None
        self.mode = mode  # can be 'xid', 'rb', 'ascii'
        self.baudrate = baudrate
        # open the serial port
        self.port = serial.Serial(self.portString, baudrate=baudrate,
                                  bytesize=8, parity='N', stopbits=1,
                                  timeout=0.0001)
        if not self.port.isOpen():
            self.port.open()
        # self.buffer = ''  # our own buffer (in addition to the serial port
        # buffer)
        self.clearBuffer()

    def sendMessage(self, message):
        self.port.writelines(message)

    def _clearBuffer(self):
        """DEPRECATED as of 1.00.05
        """
        self.port.flushInput()

    def clearBuffer(self):
        """Empty the input buffer of all characters. Call this to clear
        any keypresses that haven't yet been handled.
        """
        self.port.flushInput()

    def getKeyEvents(self, allowedKeys=(1, 2, 3, 4, 5, 6, 7), downOnly=True):
        """Return a list of keyEvents
        Each event has the following attributes:

            keyEvt.key is the button pressed (or released) (an int)
            keyEvt.rt [=float] is the time (in secs) since the rt
            clock was last reset (a float)
            keyEvt.direction is the direction the button was going
            ('up' or 'down')

        allowedKeys will limit the set of keys that are returned
        (WARNING: info about other keys is discarded)
        downOnly limits the function to report only the downward
        stroke of the key
        """
        # get the raw string
        nToGet = self.port.inWaiting()
        # self.buffer += self.port.read(nToGet)  # extend our own buffer (then
        # remove the bits we use)
        # extend our own buffer (then remove the bits we use)
        inputStr = self.port.read(nToGet)
        keys = []

        # loop through messages for keys
        nKeys = inputStr.count('k')  # find the "k"s
        for keyN in range(nKeys):
            start = inputStr.find('k')  # find the next key
            stop = start + 6
            # check we have that many characters(in case we read the buffer
            # partway through output)
            if len(inputStr) < stop:
                inputStr += self.port.read(stop - len(inputStr))
            keyString = inputStr[start:stop]
            keyEvt = self.KeyEvent(XID=keyString)
            if keyEvt.key not in allowedKeys:
                continue  # ignore this keyEvt and move on
            if (downOnly == True and keyEvt.direction == 'up'):
                continue  # ignore this keyEvt and move on

            # we found a valid keyEvt
            keys.append(keyEvt)
            # remove the (1st occurrence of) string from the buffer
            inputStr = inputStr.replace(keyString, '', 1)

        return keys

    def readMessage(self):
        """Read and return an unformatted string from the device
        (and delete this from the buffer)
        """
        nToGet = self.port.inWaiting()
        return self.port.read(nToGet)

    def measureRoundTrip(self):
        # round trip
        self.sendMessage(b'e4')  # start round trip
        # wait for 'X'
        while True:
            if self.readMessage() == 'X':
                break
        self.sendMessage(b'X')  # send it back

        # wait for final time info
        msgBack = ''
        while len(msgBack) == 0:
            msgBack = self.readMessage()
        tStr = msgBack[2:]
        t = struct.unpack('H', tStr)[0]  # 2 bytes (an unsigned short)
        return t

    def waitKeyEvents(self, allowedKeys=(1, 2, 3, 4, 5, 6, 7), downOnly=True):
        """Like getKeyEvents, but waits until a key is pressed
        """
        noKeyYet = True
        while noKeyYet:
            keys = self.getKeyEvents(
                allowedKeys=allowedKeys, downOnly=downOnly)
            if len(keys) > 0:
                noKeyYet = False
        return keys

    def resetTrialTimer(self):
        self.sendMessage(b'e5')

    def resetBaseTimer(self):
        self.sendMessage(b'e1')

    def getBaseTimer(self):
        """Retrieve the current time on the base timer
        """
        self.sendMessage(b'e3')
        # core.wait(0.05)
        localTimer = core.Clock()

        msg = self.readMessage()
        ii = msg.find('e3')
        tStr = msg[ii + 2:ii + 6]  # 4 bytes (an int) of time info
        t = struct.unpack('I', tStr)[0]
        return t

    def getInfo(self):
        """Get the name of this device
        """
        self.sendMessage(b'_d1')
        core.wait(0.1)
        return self.readMessage()