File: device.py

package info (click to toggle)
python-pyepics 3.5.7%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,336 kB
  • sloc: python: 10,539; makefile: 112; javascript: 104; sh: 53
file content (329 lines) | stat: -rw-r--r-- 11,687 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
#!/usr/bin/env python
#  M Newville <newville@cars.uchicago.edu>
#  The University of Chicago, 2010
#  Epics Open License
"""
Basic Device object: a collection of related Epics PVs that share a
common name prefix.
"""
import time
from .utils import IOENCODING
from .ca import poll
from .pv  import get_pv

class Device:
    """A simple collection of related PVs, sharing a common prefix
    string for their names, but having many 'attributes'.

    Many groups of PVs will have names made up of
         Prefix+Delimiter+Attribute
    with common Prefix and Delimiter, but a range of Attribute names.
    Many Epics Records follow this model, but a Device is only about
    PV names, and so is not exactly a mapping to an Epics Record.

    This class allows a collection of PVs to be represented simply.

      >>> dev = epics.Device('XX:m1', delim='.')
      >>> dev.put('OFF',0 )
      >>> dev.put('VAL', 0.25)
      >>> dev.get('RBV')
      >>> print(dev.FOFF)
      >>> print(dev.get('FOFF', as_string=True))

    This will put a 0 to XX:m1.OFF, a 0.25 to XX:m1.VAL, and then
    get XX:m1.RBV and XX:m1.FOFF.

    Note access to the underlying PVs can either be with get()/put()
    methods or with attributes derived from the Attribute (Suffix) for
    that PV.  Thus
      >>> print(dev.FOFF)
      >>> print(dev.get('FOFF'))

    are equivalent, as are:
      >>> dev.VAL = 1
      >>> dev.put('VAL', 1)

    The methods do provide more options.  For example, to get the PV
    value as a string,
       Device.get(Attribute, as_string=True)
    must be used.   To put-with-wait, use
       Device.put(Attribute, value, wait=True)

    The list of attributes can be pre-loaded at initialization time.


    The attribute PVs are built as needed and held in an internal
    buffer (self._pvs).  This class is kept intentionally simple
    so that it may be subclassed.

    To pre-load attribute names on initialization, provide a
    list or tuple of attributes:

      >>> struck = epics.Device('13IDC:str:',
      ...                       attrs=('ChannelAdvance',
      ...                              'EraseStart','StopAll'))
      >>> print(struck.PV('ChannelAdvance').char_value)
      'External'

    The prefix is optional, and when left off, this class can
    be used as an arbitrary container of PVs, or to turn
    any subclass into an epics Device:

      >>> class MyClass(epics.Device):
      ...     def __init__(self,**kw):
      ...         epics.Device.__init__(self) # no Prefix!!
      ...
      >>> x = MyClass()
      >>> pv_m1 = x.PV('13IDC:m1.VAL')
      >>> x.put('13IDC:m3.VAL', 2)
      >>> print(x.PV('13IDC:m3.DIR').get(as_string=True))

    Attribute aliases can also be used to make the device
    more user-friendly:

      >>> dev = epics.Device('IOC:m1', delim='.',
      ...                    aliases={'readback': 'RBV'})
      >>> print('rbv is', dev.RBV)  # IOC:m1.RBV
      >>> print('readback is', dev.readback)  # also IOC:m1.RBV

    If you encounter issues with introspection (with IPython,
    for example), and your device has a well-defined list of
    attributes, consider clearing the `mutable` flag. Attributes
    not already in the list will then be assumed to be invalid.
    AttributeError is raised quickly without using channel access:

      >>> dev = epics.Device('IOC:m1', delim='.', mutable=False,
      ...                    aliases={'readback': 'RBV'})
      >>> print(dev.foobar)
      Traceback (most recent call last):
          ...
      AttributeError: Device has no attribute foobar
    """

    # Class-level defaults.  __getattr__ and __setattr__ read these names,
    # so they must resolve on the class before an instance has assigned
    # its own values in __init__.
    _prefix = None
    _delim = ''
    _pvs = {}
    _init = False
    _aliases = {}
    _mutable = True
    # names stored as ordinary Python attributes instead of being
    # treated as PV attributes by __setattr__
    _nonpvs = ('_prefix', '_pvs', '_delim', '_init', '_aliases',
               '_mutable', '_nonpvs')

    def __init__(self, prefix='', attrs=None,
                 nonpvs=None, delim='', timeout=None,
                 mutable=True, aliases=None, with_poll=True):
        """create a Device

        Arguments
        ---------
        prefix     common PV name prefix; `delim` is appended to it
        attrs      optional iterable of attribute names to pre-load
        nonpvs     optional iterable of extra attribute names that are kept
                   as plain Python attributes and never mapped to PVs
        delim      delimiter placed between prefix and attribute name
        timeout    passed on to PV() when pre-loading attributes
        mutable    if False, attribute access will not create new PVs
        aliases    optional dict of {alias name: attribute name}
        with_poll  if True, call poll() once after the PVs are created
        """
        self._nonpvs = list(self._nonpvs)
        self._delim = delim
        self._prefix = prefix + delim
        self._pvs = {}
        self._mutable = mutable
        if aliases is None:
            aliases = {}
        self._aliases = aliases
        if nonpvs is not None:
            for npv in nonpvs:
                if npv not in self._nonpvs:
                    self._nonpvs.append(npv)

        if attrs is not None:
            for attr in attrs:
                self.PV(attr, connect=False, timeout=timeout)

        if aliases:
            # also pre-load the attributes that aliases point to
            for attr in aliases.values():
                if attrs is None or attr not in attrs:
                    self.PV(attr, connect=False, timeout=timeout)

        if with_poll:
            poll()
        # from here on, __getattr__/__setattr__ may create PVs on demand
        self._init = True

    def PV(self, attr, connect=True, **kw):
        """return epics.PV for a device attribute

        The PV is created with get_pv() on first use and cached in
        self._pvs.  Aliases are resolved to their attribute name first.
        If `connect` is True and the PV is not yet connected, this waits
        for the connection before returning.  Extra keywords are passed
        to get_pv() when the PV is created.
        """
        if attr in self._aliases:
            attr = self._aliases[attr]

        if attr not in self._pvs:
            pvname = attr
            if self._prefix is not None:
                pvname = f"{self._prefix}{attr}"
            self._pvs[attr] = get_pv(pvname, **kw)
        if connect and not self._pvs[attr].connected:
            self._pvs[attr].wait_for_connection()
        return self._pvs[attr]

    def add_pv(self, pvname, attr=None, **kw):
        """add a PV with an optional attribute name that may not exactly
        correspond to the mapping of Attribute -> Prefix + Delim + Attribute
        That is, with a device defined as
        >>> dev = Device('XXX', delim='.')

        getting the VAL attribute
        >>> dev.get('VAL')   # or dev.VAL

        becomes   'caget(XXX.VAL)'.  With add_pv(), one can add a
        non-conforming PV to the collection:
        >>> dev.add_pv('XXX_status.VAL', attr='status')

        and then use as
        >>> dev.get('status')  # or dev.status

        If attr is not specified, the full pvname will be used.
        """
        if attr is None:
            attr = pvname
        self._pvs[attr] = get_pv(pvname, **kw)
        return self._pvs[attr]

    def put(self, attr, value, wait=False, use_complete=False, timeout=10):
        """put an attribute value,
        optionally wait for completion or
        up to a supplied timeout value

        Returns whatever the underlying PV's put() returns.
        """
        thispv = self.PV(attr)
        thispv.wait_for_connection()
        return thispv.put(value, wait=wait, use_complete=use_complete,
                          timeout=timeout)

    def get(self, attr, as_string=False, count=None, timeout=None):
        """get an attribute value,
        option as_string returns a string representation"""
        return self.PV(attr).get(as_string=as_string, count=count,
                                 timeout=timeout)

    def save_state(self):
        """return a dictionary of the values of all
        current attributes"""
        out = {}
        for key, pv in self._pvs.items():
            out[key] = pv.get()
            # multi-element 'char' PVs are saved in their string form
            if pv.count > 1 and pv.type == 'char':
                out[key] = pv.get(as_string=True)
        return out

    def restore_state(self, state):
        """restore a dictionary of the values, as saved from save_state

        Keys that are not current attributes, and PVs without write
        access, are skipped.
        """
        for key, val in state.items():
            if key in self._pvs and 'write' in self._pvs[key].access:
                self._pvs[key].put(val)

    def write_state(self, fname, state=None):
        """write save state  to external file.
        If state is not provided, the current state is used

        Note that this only writes data for PVs with write-access,
        and count=1 (except CHAR)"""
        if state is None:
            state = self.save_state()
        parts = [f"{self.__class__.__name__}",
                 f"prefix='{self._prefix}'",
                 f"time={time.ctime()}"]
        out = [f"#Device Saved State for {','.join(parts)}"]
        for key in sorted(state.keys()):
            if (key in self._pvs and
                'write' in self._pvs[key].access and
                (1 == self._pvs[key].count or
                 'char' == self._pvs[key].type)):
                out.append(f"{key} {state[key]}")
        # trailing empty entry makes the file end with a newline
        out.append('')
        # context manager closes the file even if the write fails
        with open(fname, 'w', encoding=IOENCODING) as fout:
            fout.write('\n'.join(out))

    def _parse_state(self, textlines):
        """convert lines of a saved-state file into a state dictionary

        Lines starting with '#', blank lines, and lines that have no
        'key value' separator are skipped, as are keys that are not
        current attributes.  Values are converted to float or int
        according to the type reported by the attribute's PV; all other
        types are left as strings.
        """
        state = {}
        for line in textlines:
            if line.startswith('#'):
                continue
            # drop only the line terminator: the last line of a file may
            # not have one, and the value itself must stay intact
            text = line.rstrip('\n')
            if not text.strip():
                continue
            key, sep, strval = text.partition(' ')
            if not sep:
                continue
            if key in self._pvs:
                dtype = self._pvs[key].type
                val = strval
                if dtype in ('double', 'float'):
                    val = float(val)
                elif dtype in ('int', 'long', 'short', 'enum'):
                    val = int(val)
                state[key] = val
        return state

    def read_state(self, fname, restore=False):
        """read state from file, optionally restore it

        Returns the dictionary of values read for current attributes.
        """
        with open(fname, 'r', encoding=IOENCODING) as finp:
            textlines = finp.readlines()
        state = self._parse_state(textlines)
        if restore:
            self.restore_state(state)
        return state

    def get_all(self):
        """return a dictionary of the values of all
        current attributes"""
        return self.save_state()

    def add_callback(self, attr, callback, **kws):
        """add a callback function to an attribute PV,
        so that the callback function will be run when
        the attribute's value changes"""
        thispv = self.PV(attr)
        thispv.get()
        return thispv.add_callback(callback, **kws)

    def remove_callbacks(self, attr, index=None):
        """remove a callback function to an attribute PV"""
        self.PV(attr).remove_callback(index=index)

    def __getattr__(self, attr):
        """map attribute access onto PV values

        Only called when normal attribute lookup fails.  Known PV
        attributes (or aliases for them) return the PV value.  For an
        initialized, mutable Device, an unknown name that does not start
        with '__' is tried as a new PV.  Otherwise AttributeError is raised.
        """
        if attr in self._aliases:
            attr = self._aliases[attr]

        if attr in self._pvs:
            return self.get(attr)
        elif attr in self.__dict__:
            return self.__dict__[attr]
        elif self._init and self._mutable and not attr.startswith('__'):
            pv = self.PV(attr, connect=True)
            if pv.connected:
                return pv.get()

        raise AttributeError(f'{self.__class__.__name__} has no attribute {attr}')

    def __setattr__(self, attr, val):
        """map attribute assignment onto PV puts

        Names listed in _nonpvs are stored as plain Python attributes.
        Known PV attributes (or aliases for them) are written with put().
        For an initialized, mutable Device, an unknown name that does not
        start with '__' is tried as a new PV, and AttributeError is raised
        if that fails.
        """
        if attr in self._aliases:
            attr = self._aliases[attr]

        if attr in self._nonpvs:
            self.__dict__[attr] = val
        elif attr in self._pvs:
            self.put(attr, val)
        elif self._init and self._mutable and not attr.startswith('__'):
            try:
                self.PV(attr)
                self.put(attr, val)
            except Exception as exc:
                # keep the original failure as the cause, and let
                # KeyboardInterrupt / SystemExit propagate untouched
                raise AttributeError(f'{self.__class__.__name__} has no attribute {attr}') from exc

        elif attr in self.__dict__:
            self.__dict__[attr] = val
        elif self._init:
            raise AttributeError(f'{self.__class__.__name__} has no attribute {attr}')

    def __dir__(self):
        """return sorted names of aliases, PV attributes, non-PV
        attributes, instance attributes and Device class attributes"""
        all_attrs = (set(self._aliases) | set(self._pvs) |
                     set(self._nonpvs) | set(self.__dict__) |
                     set(dir(Device)))
        return sorted(all_attrs)

    def __repr__(self):
        "string representation"
        # _prefix is None only if __init__ was never run
        pref = self._prefix or ''
        if pref.endswith('.'):
            pref = pref[:-1]
        return f"<Device '{pref}', {len(self._pvs)} attributes>"

    @staticmethod
    def pv_property(attr, as_string=False, wait=False, timeout=10.0):
        """return a property that reads attribute `attr` with get()
        and writes it with put(), for use in class bodies

        `as_string` is passed to get(); `wait` and `timeout` are
        passed to put().
        """
        return property(lambda self:     \
                        self.get(attr, as_string=as_string),
                        lambda self,val: \
                        self.put(attr, val, wait=wait, timeout=timeout),
                        None, None)