File: __init__.py

package info (click to toggle)
python-pyepics 3.4.1%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 2,080 kB
  • sloc: python: 11,184; makefile: 106; javascript: 104; sh: 1
file content (213 lines) | stat: -rw-r--r-- 7,208 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# Resolve the package version from the generated _version module, then drop
# the helper so that it is not left behind as a public name of the package.
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions

# The module docstring is assigned explicitly: it is built with %-formatting
# to embed __version__, and only a plain string literal placed first in a
# module is picked up automatically as its docstring.
__doc__ = """
   epics channel access python module

   version: %s
   Principal Authors:
      Matthew Newville <newville@cars.uchicago.edu> CARS, University of Chicago
      Angus Gratton <angus.gratton@anu.edu.au>, Australian National University

== License:

   Except where explicitly noted, this file and all files in this
   distribution are licensed under the Epics Open License See license.txt in
   the top-level directory of this distribution.

== Overview:
   Python Interface to the Epics Channel Access
   protocol of the Epics control system.

""" % (__version__)


import time
import sys
import threading
from . import ca
from . import dbr
from . import pv
from . import alarm
from . import device
from . import motor
from . import multiproc

# Re-export the main classes and helpers of the submodules at package level,
# so that they can be reached as attributes of the package itself.
PV    = pv.PV
Alarm = alarm.Alarm
Motor = motor.Motor
Device = device.Device
poll  = ca.poll

# get_pv is the PV lookup used by the ca*() convenience functions below.
get_pv = pv.get_pv

# multiprocessing helpers re-exported from the multiproc submodule
CAProcess = multiproc.CAProcess
CAPool = multiproc.CAPool

# some constants
# NOTE(review): presumably the EPICS alarm severity levels -- confirm
# against the alarm module.
NO_ALARM = 0
MINOR_ALARM = 1
MAJOR_ALARM = 2
INVALID_ALARM = 3

# Registry of PVs monitored through camonitor(), keyed by PV name;
# camonitor_clear() removes entries from it.
_PVmonitors_ = {}

def caput(pvname, value, wait=False, timeout=60):
    """caput(pvname, value, wait=False, timeout=60)
    write a value to a pv, looked up by name.
       >>> caput('xx.VAL',3.0)

    to block until the pv has finished processing, use 'wait=True':
       >>> caput('xx.VAL',3.0,wait=True)

    timeout (in seconds) is shared between connecting and putting: the
    time spent obtaining the connected pv is subtracted from it before
    the put is issued.

    returns the result of the pv's put() method, or None if the pv
    could not be connected.
    """
    t0 = time.time()
    target = get_pv(pvname, timeout=timeout, connect=True)
    if not target.connected:
        return None
    # only what is left of the time budget is handed to the put itself
    remaining = timeout - (time.time() - t0)
    return target.put(value, wait=wait, timeout=remaining)

def caget(pvname, as_string=False, count=None, as_numpy=True,
          use_monitor=False, timeout=5.0):
    """caget(pvname, as_string=False,count=None,as_numpy=True,
             use_monitor=False,timeout=5.0)
    read the value of a pv, looked up by name.
       >>> x = caget('xx.VAL')

    for the character string representation (formatted double,
    enum string, etc):
       >>> x = caget('xx.VAL', as_string=True)

    to fetch only part of an array, limit the number of elements
    with count:
       >>> x = caget('MyArray.VAL', count=1000)

    timeout (in seconds) is shared: time spent connecting (and, for
    as_string=True, fetching the control variables) is subtracted
    before the value is requested.

    returns the value, or None if the pv could not be connected.
    """
    t0 = time.time()
    target = get_pv(pvname, timeout=timeout, connect=True)
    if not target.connected:
        return None
    if as_string:
        # NOTE(review): presumably the control variables are needed to
        # format the string representation -- confirm against PV.get()
        target.get_ctrlvars()
    remaining = timeout - (time.time() - t0)
    result = target.get(count=count,
                        timeout=remaining,
                        use_monitor=use_monitor,
                        as_string=as_string,
                        as_numpy=as_numpy)
    poll()
    return result

def cainfo(pvname, print_out=True, timeout=5.0):
    """cainfo(pvname,print_out=True,timeout=5.0)

    status report for a pv
       >>>cainfo('xx.VAL')

    With print_out=True (the default) the report is written out
    with ca.write() and None is returned.

    With print_out=False the report is returned instead, and
    nothing is written.

    timeout (in seconds) is shared by the connection, the value
    fetch and the control-variable fetch: each step is given the
    time left since the call started.

    If the pv could not be connected, nothing is written and None
    is returned.
    """
    t0 = time.time()
    target = get_pv(pvname, timeout=timeout, connect=True)
    if not target.connected:
        return None
    # fetch the value and then the control variables before reading
    # the info report, each with the remaining part of the timeout
    target.get(timeout=timeout - (time.time() - t0))
    target.get_ctrlvars(timeout=timeout - (time.time() - t0))
    if not print_out:
        return target.info
    ca.write(target.info)
    return None

def camonitor_clear(pvname):
    """remove the monitor that camonitor() set on a PV

    Does nothing if no monitor is registered under pvname.
    """
    if pvname not in _PVmonitors_:
        return
    monitored = _PVmonitors_[pvname]
    # -999 is the callback index that camonitor() registers under
    monitored.remove_callback(index=-999)
    _PVmonitors_.pop(pvname)

def camonitor(pvname, writer=None, callback=None):
    """ camonitor(pvname, writer=None, callback=None)

    start monitoring a PV.
       >>>camonitor('xx.VAL')

    A message holding the latest value of the PV is written each time
    the value changes and ca.poll() is called.

    To send the messages somewhere else (a file, say), pass as writer
    the write method of an open file, or any other callable that
    accepts a string.  The default is ca.write.

    For full control over what happens on each change, pass your own
    callback; writer is then not used.

    The callback is called with keyword arguments for pvname, value, and
    char_value Important: use **kwd!!

    Nothing is registered if the PV cannot be connected.  Use
    camonitor_clear(pvname) to stop monitoring.
    """
    writer = ca.write if writer is None else writer

    if callback is None:
        def callback(pvname=None, value=None, char_value=None, **kwds):
            "generic monitor callback"
            # fall back on repr() when no string form was supplied
            text = repr(value) if char_value is None else char_value
            writer("%.32s %s %s" % (pvname, pv.fmt_time(), text))

    monitored = get_pv(pvname, connect=True)
    if not monitored.connected:
        return
    monitored.get()
    # registered under a fixed index so that camonitor_clear() can
    # find and remove this callback again
    monitored.add_callback(callback, index=-999, with_ctrlvars=True)
    _PVmonitors_[pvname] = monitored

def caget_many(pvlist, as_string=False, count=None, as_numpy=True, timeout=5.0):
    """fetch the values for a list of PV names

    No PV objects are created: the work is done directly on channels
    from the ca module, in four passes over the whole list (create
    the channels, connect them, issue non-waiting gets, collect the
    results), so that no request waits on a previous one's reply.

    timeout is passed to each ca.get_complete() call separately.

    Returns a list of the collected results, in the order of pvlist.
    """
    channels = [ca.create_channel(name, auto_cb=False, connect=False)
                for name in pvlist]
    for chan in channels:
        ca.connect_channel(chan)
    # issue every request before collecting any of the answers
    for chan in channels:
        ca.get(chan, count=count, as_string=as_string,
               as_numpy=as_numpy, wait=False)
    return [ca.get_complete(chan, count=count, as_string=as_string,
                            as_numpy=as_numpy, timeout=timeout)
            for chan in channels]

def caput_many(pvlist, values, wait=False, connection_timeout=None, put_timeout=60):
    """write values to a list of PVs, as fast as possible

    The PV objects created here are not kept.

    wait selects the blocking behavior:
      'each'  every put blocks until it is complete, or until
              put_timeout has expired for that put.
      'all'   the puts are issued without blocking, then this
              function blocks until all of them are complete, or
              until put_timeout has expired.
      any other value: no waiting for completion.

    wait only concerns the put timeout, not the connection timeout.

    Returns a list with one integer per PV: 1 if the put was
    successful, or -1 if it was not (for instance because the
    timeout was exceeded).

    Raises ValueError if pvlist and values differ in length.
    """
    if len(pvlist) != len(values):
        raise ValueError("List of PV names must be equal to list of values.")

    pvs = [PV(name, auto_monitor=False, connection_timeout=connection_timeout)
           for name in pvlist]
    # NOTE(review): this list is never used afterwards; it is kept in
    # case reading PV.connected matters -- confirm against the PV class.
    conns = [p.connected for p in pvs]

    block_on_all = (wait == 'all')
    block_on_each = (wait == 'each')
    results = [p.put(v, wait=block_on_each, timeout=put_timeout,
                     use_complete=block_on_all)
               for p, v in zip(pvs, values)]

    if not block_on_all:
        # anything other than a return value of 1 is reported as -1
        return [r if r == 1 else -1 for r in results]

    def all_finished():
        "True once every PV is connected and reports its put complete"
        return all([(p.connected and p.put_complete) for p in pvs])

    started = time.time()
    while not all_finished():
        ca.poll()
        if time.time() - started > put_timeout:
            break
    return [1 if (p.connected and p.put_complete) else -1 for p in pvs]