1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
|
from __future__ import print_function
import sys
import time
import epics
import pvnames
pvlist = (
pvnames.str_pv,
pvnames.int_pv,
pvnames.float_pv,
pvnames.enum_pv,
pvnames.char_arr_pv,
pvnames.long_pv,
pvnames.long_arr_pv,
pvnames.double_pv,
pvnames.double_arr_pv,
pvnames.string_arr_pv,
)
def RunTest(pvlist, use_preempt=True, maxlen=16384,
use_numpy=True, use_time=False, use_ctrl=False):
msg= ">>>Run Test: %i pvs, numpy=%s, time=%s, ctrl=%s, preempt=%s"
print( msg % (len(pvlist), use_numpy, use_time, use_ctrl, use_preempt))
epics.ca.HAS_NUMPY = epics.ca.HAS_NUMPY and use_numpy
epics.ca.PREEMPTIVE_CALLBACK = use_preempt
epics.ca.AUTOMONITOR_MAXLENGTH = maxlen
chids= []
epics.ca.initialize_libca()
def onConnect(pvname=None, **kw):
print(' on Connect %s %s' % (pvname, repr(kw)))
def onChanges(chid=None, value=None, **kw):
print(' on Change chid=%i value=%s' % (int(chid), repr(value)))
for pvname in pvlist:
chid = epics.ca.create_channel(pvname, callback=onConnect)
epics.ca.connect_channel(chid)
eventID = epics.ca.create_subscription(chid, callback=onChanges)
chids.append((chid, eventID))
epics.poll(evt=0.025, iot=5.0)
epics.poll(evt=0.025, iot=10.0)
time.sleep(0.05)
for (chid, eventID) in chids:
print('=== %s chid=%s' % (epics.ca.name(chid), repr(chid)))
time.sleep(0.005)
ntype = epics.ca.promote_type(chid, use_ctrl=use_ctrl,
use_time=use_time)
val = epics.ca.get(chid, ftype=ntype)
cval = epics.ca.get(chid, as_string=True)
if epics.ca.element_count(chid) > 10:
val = val[:10]
time.sleep(0.005)
print("%i %s %s" % (ntype, epics.dbr.Name(ntype).lower(), cval))
time.sleep(0.5)
print('----- finalizing CA')
epics.ca.finalize_libca()
time.sleep(0.05)
for use_preempt in (True, False):
for use_numpy in (True, False):
for use_time, use_ctrl in ((False, False),
(True, False),
(False, True),
):
print("==== NUMPY/TIME/CTRL ", use_numpy, use_time, use_ctrl)
RunTest(pvlist,
use_preempt=use_preempt,
use_numpy=use_numpy,
use_time=use_time,
use_ctrl=use_ctrl)
# sys.exit()
|