1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
from __future__ import print_function
import epics
import time
import multiprocessing as mp
import threading
import pvnames
PVN1 = pvnames.double_pv # 'Py:ao2'
PVN2 = pvnames.double_pv2 # 'Py:ao3'
def subprocess(*args):
print('==subprocess==', args)
mypvs = [epics.get_pv(pvname) for pvname in args]
for i in range(10):
time.sleep(0.750)
out = [(p.pvname, p.get(as_string=True)) for p in mypvs]
out = ', '.join(["%s=%s" % o for o in out])
print('==sub (%d): %s' % (i, out))
def test_mpprocess():
def monitor(pvname=None, char_value=None, **kwargs):
print('--main:monitor %s=%s' % (pvname, char_value))
print('--main:')
pv1 = epics.get_pv(PVN1)
print('--main:init %s=%s' % (PVN1, pv1.get()))
pv1.add_callback(callback=monitor)
try:
proc1 = epics.CAProcess(target=subprocess,
args=(PVN1, PVN2))
proc1.start()
proc1.join()
except KeyboardInterrupt:
print('--main: killing subprocess')
proc1.terminate()
print('--main: subprocess complete')
time.sleep(0.5)
print('--main:final %s=%s' % (PVN1, pv1.get()))
|