1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
|
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
__doc__ = """
epics channel access python module
version: %s
Principal Authors:
Matthew Newville <newville@cars.uchicago.edu> CARS, University of Chicago
Angus Gratton <angus.gratton@anu.edu.au>, Australian National University
== License:
Except where explicitly noted, this file and all files in this
distribution are licensed under the Epics Open License See license.txt in
the top-level directory of this distribution.
== Overview:
Python Interface to the Epics Channel Access
protocol of the Epics control system.
""" % (__version__)
import time
import sys
import threading
from . import ca
from . import dbr
from . import pv
from . import alarm
from . import device
from . import motor
from . import multiproc
PV = pv.PV
Alarm = alarm.Alarm
Motor = motor.Motor
Device = device.Device
poll = ca.poll
get_pv = pv.get_pv
CAProcess = multiproc.CAProcess
CAPool = multiproc.CAPool
# some constants
NO_ALARM = 0
MINOR_ALARM = 1
MAJOR_ALARM = 2
INVALID_ALARM = 3
_PVmonitors_ = {}
def caput(pvname, value, wait=False, timeout=60):
"""caput(pvname, value, wait=False, timeout=60)
simple put to a pv's value.
>>> caput('xx.VAL',3.0)
to wait for pv to complete processing, use 'wait=True':
>>> caput('xx.VAL',3.0,wait=True)
"""
start_time = time.time()
thispv = get_pv(pvname, timeout=timeout, connect=True)
if thispv.connected:
timeout -= (time.time() - start_time)
return thispv.put(value, wait=wait, timeout=timeout)
def caget(pvname, as_string=False, count=None, as_numpy=True,
use_monitor=False, timeout=5.0):
"""caget(pvname, as_string=False,count=None,as_numpy=True,
use_monitor=False,timeout=5.0)
simple get of a pv's value..
>>> x = caget('xx.VAL')
to get the character string representation (formatted double,
enum string, etc):
>>> x = caget('xx.VAL', as_string=True)
to get a truncated amount of data from an array, you can specify
the count with
>>> x = caget('MyArray.VAL', count=1000)
"""
start_time = time.time()
thispv = get_pv(pvname, timeout=timeout, connect=True)
if thispv.connected:
if as_string:
thispv.get_ctrlvars()
timeout -= (time.time() - start_time)
val = thispv.get(count=count, timeout=timeout,
use_monitor=use_monitor,
as_string=as_string,
as_numpy=as_numpy)
poll()
return val
def cainfo(pvname, print_out=True, timeout=5.0):
"""cainfo(pvname,print_out=True,timeout=5.0)
return printable information about pv
>>>cainfo('xx.VAL')
will return a status report for the pv.
If print_out=False, the status report will be printed,
and not returned.
"""
start_time = time.time()
thispv = get_pv(pvname, timeout=timeout, connect=True)
if thispv.connected:
conn_time = time.time() - start_time
thispv.get(timeout=timeout-conn_time)
get_time = time.time() - start_time
thispv.get_ctrlvars(timeout=timeout-get_time)
if print_out:
ca.write(thispv.info)
else:
return thispv.info
def camonitor_clear(pvname):
"""clear a monitor on a PV"""
if pvname in _PVmonitors_:
_PVmonitors_[pvname].remove_callback(index=-999)
_PVmonitors_.pop(pvname)
def camonitor(pvname, writer=None, callback=None):
""" camonitor(pvname, writer=None, callback=None)
sets a monitor on a PV.
>>>camonitor('xx.VAL')
This will write a message with the latest value for that PV each
time the value changes and when ca.poll() is called.
To write the result to a file, provide the writer option a write method
to an open file or some other method that accepts a string.
To completely control where the output goes, provide a callback method
and you can do whatever you'd like with them.
Your callback will be sent keyword arguments for pvname, value, and
char_value Important: use **kwd!!
"""
if writer is None:
writer = ca.write
if callback is None:
def callback(pvname=None, value=None, char_value=None, **kwds):
"generic monitor callback"
if char_value is None:
char_value = repr(value)
writer("%.32s %s %s" % (pvname, pv.fmt_time(), char_value))
thispv = get_pv(pvname, connect=True)
if thispv.connected:
thispv.get()
thispv.add_callback(callback, index=-999, with_ctrlvars=True)
_PVmonitors_[pvname] = thispv
def caget_many(pvlist, as_string=False, count=None, as_numpy=True, timeout=5.0):
"""get values for a list of PVs
This does not maintain PV objects, and works as fast
as possible to fetch many values.
"""
chids, out = [], []
for name in pvlist: chids.append(ca.create_channel(name,
auto_cb=False,
connect=False))
for chid in chids: ca.connect_channel(chid)
for chid in chids: ca.get(chid, count=count, as_string=as_string, as_numpy=as_numpy, wait=False)
for chid in chids: out.append(ca.get_complete(chid,
count=count,
as_string=as_string,
as_numpy=as_numpy,
timeout=timeout))
return out
def caput_many(pvlist, values, wait=False, connection_timeout=None, put_timeout=60):
"""put values to a list of PVs, as fast as possible
This does not maintain the PV objects it makes. If
wait is 'each', *each* put operation will block until
it is complete or until the put_timeout duration expires.
If wait is 'all', this method will block until *all*
put operations are complete, or until the put_timeout
duration expires.
Note that the behavior of 'wait' only applies to the
put timeout, not the connection timeout.
Returns a list of integers for each PV, 1 if the put
was successful, or a negative number if the timeout
was exceeded.
"""
if len(pvlist) != len(values):
raise ValueError("List of PV names must be equal to list of values.")
out = []
pvs = [PV(name, auto_monitor=False, connection_timeout=connection_timeout) for name in pvlist]
conns = [p.connected for p in pvs]
wait_all = (wait == 'all')
wait_each = (wait == 'each')
for p, v in zip(pvs, values):
out.append(p.put(v, wait=wait_each, timeout=put_timeout, use_complete=wait_all))
if wait_all:
start_time = time.time()
while not all([(p.connected and p.put_complete) for p in pvs]):
ca.poll()
elapsed_time = time.time() - start_time
if elapsed_time > put_timeout:
break
return [1 if (p.connected and p.put_complete) else -1 for p in pvs]
else:
return [o if o == 1 else -1 for o in out]
|