File: daqmx.py

package info (click to toggle)
python-pymeasure 0.14.0-2
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 8,788 kB
  • sloc: python: 47,201; makefile: 155
file content (171 lines) | stat: -rw-r--r-- 7,448 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#
# This file is part of the PyMeasure package.
#
# Copyright (c) 2013-2024 PyMeasure Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#

# Most of this code originally from:
# http://www.scipy.org/Cookbook/Data_Acquisition_with_NIDAQmx

import logging
import ctypes
import numpy as np
from sys import platform

log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())

# Bind the NI-DAQmx driver library. The lookup is only attempted on Windows;
# on every other platform the name ``nidaq`` is left undefined.
if platform == "win32":
    try:
        nidaq = ctypes.windll.nicaiu
    except OSError as err:
        # Record why loading failed, then let the OSError reach the importer.
        log.info(
            'Failed loading the NI-DAQmx library. '
            'Check the NI-DAQmx documentation on how to '
            'install this external dependency. '
            f'OSError: {err}'
        )
        raise

# C type aliases used when marshalling arguments for the NI-DAQmx library.
# (Names presumably mirror the typedefs in NIDAQmx.h -- confirm.)
float64 = ctypes.c_double
uInt64 = ctypes.c_ulonglong
uInt32 = ctypes.c_ulong
int32 = ctypes.c_long
# Task handles are exchanged with the library as uInt32 values.
# NOTE(review): 64-bit NI-DAQmx headers may declare TaskHandle as a pointer
# type -- confirm before relying on this alias on 64-bit installations.
TaskHandle = uInt32

# Numeric codes handed to the NI-DAQmx calls issued by the DAQmx class.
# NOTE(review): NIDAQmx.h appears to assign 0 to GroupByChannel and 1 to
# GroupByScanNumber; the value 1 used here matches the
# (numSamples, numChannels) buffer that DAQmx.acquire transposes -- confirm
# before renaming or changing it.
DAQmx_Val_GroupByChannel = 1
DAQmx_Val_FiniteSamps = 10178
DAQmx_Val_Rising = 10280
DAQmx_Val_Volts = 10348
DAQmx_Val_Cfg_Default = int32(-1)


class DAQmx:
    """Instrument object for interfacing with NI-DAQmx devices.

    Thin ctypes wrapper around the module-level ``nidaq`` library handle. One
    analog-input task and one analog-output task are tracked per instance via
    :attr:`taskHandleAI` and :attr:`taskHandleAO`.

    :param name: Device name that prefixes every physical channel,
        e.g. ``Dev1`` or ``PXI1Slot2``.
    """

    def __init__(self, name, *args, **kwargs):
        super().__init__()
        self.resourceName = name  # NOTE: Device number, e.g. Dev1 or PXI1Slot2
        self.numChannels = 0
        self.numSamples = 0
        self.dataBuffer = 0  # replaced by a numpy array in setup_analog_voltage_in
        self.taskHandleAI = TaskHandle(0)
        self.taskHandleAO = TaskHandle(0)
        self.terminated = False

    def _physical_channels(self, kind, channelList):
        """Return the physical channel list for *channelList* as bytes.

        :param kind: Channel family prefix, ``"ai"`` or ``"ao"``.
        :param channelList: Iterable of channel numbers.
        :return: e.g. ``b"Dev1/ai0, Dev1/ai2"``. Bytes are returned because
            ctypes passes ``str`` arguments as ``wchar_t*``, whereas the
            NI-DAQmx C functions take ``char*`` strings.
        """
        prefix = self.resourceName + "/" + kind
        return ", ".join(prefix + str(channel) for channel in channelList).encode()

    def setup_analog_voltage_in(self, channelList, numSamples, sampleRate=10000, scale=3.0):
        """Create and configure the finite-sample analog input task.

        :param channelList: Analog input channel numbers to read, e.g. ``[0, 1]``.
        :param numSamples: Number of samples acquired per channel.
        :param sampleRate: Sample clock rate passed to the driver.
        :param scale: Symmetric input range; the channel is configured for
            ``-scale`` to ``+scale`` volts.
        :raises RuntimeError: If a driver call returns a non-zero status.
        """
        channels = list(channelList)
        resourceString = self._physical_channels("ai", channels)
        self.numChannels = len(channels)
        self.numSamples = numSamples
        self.taskHandleAI = TaskHandle(0)
        self.dataBuffer = np.zeros((self.numSamples, self.numChannels), dtype=np.float64)
        self.CHK(nidaq.DAQmxCreateTask(b"", ctypes.byref(self.taskHandleAI)))
        self.CHK(nidaq.DAQmxCreateAIVoltageChan(self.taskHandleAI, resourceString, b"",
                                                DAQmx_Val_Cfg_Default,
                                                float64(-scale), float64(scale),
                                                DAQmx_Val_Volts, None))
        self.CHK(nidaq.DAQmxCfgSampClkTiming(self.taskHandleAI, b"", float64(sampleRate),
                                             DAQmx_Val_Rising, DAQmx_Val_FiniteSamps,
                                             uInt64(self.numSamples)))

    def setup_analog_voltage_out(self, channel=0):
        """Create the analog output task for a single channel (-10 V to 10 V).

        :param channel: Analog output channel number.
        :raises RuntimeError: If a driver call returns a non-zero status.
        """
        resourceString = self._physical_channels("ao", [channel])
        self.taskHandleAO = TaskHandle(0)
        self.CHK(nidaq.DAQmxCreateTask(b"", ctypes.byref(self.taskHandleAO)))
        self.CHK(nidaq.DAQmxCreateAOVoltageChan(self.taskHandleAO,
                                                resourceString, b"",
                                                float64(-10.0), float64(10.0),
                                                DAQmx_Val_Volts, None))

    def setup_analog_voltage_out_multiple_channels(self, channelList):
        """Create the analog output task for several channels (-10 V to 10 V).

        :param channelList: Analog output channel numbers, e.g. ``[0, 1]``.
        :raises RuntimeError: If a driver call returns a non-zero status.
        """
        resourceString = self._physical_channels("ao", channelList)
        self.taskHandleAO = TaskHandle(0)
        self.CHK(nidaq.DAQmxCreateTask(b"", ctypes.byref(self.taskHandleAO)))
        self.CHK(nidaq.DAQmxCreateAOVoltageChan(self.taskHandleAO,
                                                resourceString, b"",
                                                float64(-10.0), float64(10.0),
                                                DAQmx_Val_Volts, None))

    def write_analog_voltage(self, value):
        """Write one voltage to the single-channel output task.

        :param value: Output voltage.
        :raises RuntimeError: If the driver call returns a non-zero status.
        """
        timeout = -1.0
        self.CHK(nidaq.DAQmxWriteAnalogScalarF64(self.taskHandleAO,
                                                 1,  # Autostart
                                                 float64(timeout),
                                                 float64(value),
                                                 None))

    def write_analog_voltage_multiple_channels(self, values):
        """Write one voltage per channel to the multi-channel output task.

        :param values: Sequence of output voltages, one per configured channel.
        :raises RuntimeError: If the driver call returns a non-zero status.
        """
        timeout = -1.0
        # Hold the array in a local so its memory stays valid for the whole
        # call, and force float64 so integer input is not written as raw ints.
        data = np.ascontiguousarray(values, dtype=np.float64)
        self.CHK(nidaq.DAQmxWriteAnalogF64(self.taskHandleAO,
                                           1,  # Samples per channel
                                           1,  # Autostart
                                           float64(timeout),
                                           DAQmx_Val_GroupByChannel,
                                           data.ctypes.data_as(ctypes.POINTER(ctypes.c_double)),
                                           None, None))

    def acquire(self):
        """Read ``numSamples`` samples per channel from the input task.

        :return: View of the internal buffer with shape
            ``(numChannels, numSamples)``; it is overwritten by the next call.
        :raises RuntimeError: If the driver call returns a non-zero status.
        """
        read = int32()
        # Pass a typed pointer: a bare integer address would be marshalled as
        # a C int and truncated on 64-bit platforms.
        buffer_ptr = self.dataBuffer.ctypes.data_as(ctypes.POINTER(ctypes.c_double))
        self.CHK(nidaq.DAQmxReadAnalogF64(self.taskHandleAI, self.numSamples, float64(10.0),
                                          DAQmx_Val_GroupByChannel, buffer_ptr,
                                          self.numChannels * self.numSamples, ctypes.byref(read),
                                          None))
        return self.dataBuffer.transpose()

    def acquire_average(self):
        """Return the per-channel mean of one acquisition.

        :return: Array of channel means, or ``np.zeros(3)`` once the
            instrument has been shut down.
        """
        if self.terminated:
            return np.zeros(3)
        return np.mean(self.acquire(), axis=1)

    def stop(self):
        """Stop and clear the input and output tasks that are currently open.

        Driver return codes are ignored (best effort). Cleared handles are
        reset to zero so that a repeated call does not touch stale handles.
        """
        if self.taskHandleAI.value != 0:
            nidaq.DAQmxStopTask(self.taskHandleAI)
            nidaq.DAQmxClearTask(self.taskHandleAI)
            self.taskHandleAI = TaskHandle(0)
        if self.taskHandleAO.value != 0:
            nidaq.DAQmxStopTask(self.taskHandleAO)
            nidaq.DAQmxClearTask(self.taskHandleAO)
            self.taskHandleAO = TaskHandle(0)

    def CHK(self, err):
        """Raise if a NI-DAQmx status code reports an error or a warning.

        :param err: Status code returned by a driver call.
        :raises RuntimeError: For negative codes (errors) and positive codes
            (warnings); zero returns silently.
        """
        if err < 0:
            template = 'nidaq call failed with error %d: %s'
        elif err > 0:
            template = 'nidaq generated warning %d: %s'
        else:
            return
        buf_size = 100
        # create_string_buffer needs an int (or bytes) on Python 3; a str
        # argument raises TypeError and would mask the driver error.
        buf = ctypes.create_string_buffer(buf_size)
        nidaq.DAQmxGetErrorString(err, ctypes.byref(buf), buf_size)
        message = buf.value.decode(errors="replace")
        raise RuntimeError(template % (err, repr(message)))

    def shutdown(self):
        """Stop all tasks and mark the instrument as terminated."""
        self.stop()
        self.terminated = True
        # The direct base is ``object``, which has no shutdown(); only chain
        # up when a cooperating class in the MRO actually provides one.
        parent_shutdown = getattr(super(), "shutdown", None)
        if parent_shutdown is not None:
            parent_shutdown()