1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
|
#!/usr/bin/python
# test of simplest device
from epics import PV, get_pv, caget, camonitor, camonitor_clear
import os
import psutil
import time
import pvnames
mypv = pvnames.updating_pv1
def test_connect_disconnect():
pv = PV(mypv, auto_monitor=True, callback=lambda **args: ...)
pv.wait_for_connection()
# check that PV is connected
assert pv.connected is True
# check that data is received
value = pv.get()
assert value is not None
pv.disconnect()
# check that PV is disconnected
assert pv.connected is False
# check that no data is received after disconnect
value = pv.get()
assert value is None
def test_reconnect():
# connect and disconnect
pv = PV(mypv, auto_monitor=True, callback=lambda **args: ...)
pv.wait_for_connection()
pv.disconnect()
# try to reconnect to the same PV
connected = pv.reconnect()
# check that PV is connected
assert connected is True
# check that data is received
value = pv.get()
assert value is not None
def test_with_two_PVs():
# create 2 PV objects connecting to the same PV signal
pv1 = PV(mypv, auto_monitor=True, callback=lambda **args: ...)
pv2 = PV(mypv, auto_monitor=True, callback=lambda **args: ...)
pv1.wait_for_connection()
pv2.wait_for_connection()
# check that both PVs are connected
assert pv1.connected is True
assert pv2.connected is True
# check that data is received
assert pv1.get() is not None
assert pv2.get() is not None
# disconnect 1 PV
pv1.disconnect()
# check that the first PV is disconnected and doesn't receive data
assert pv1.connected is False
assert pv1.get() is None
# check that the other PV is connected and still receives data
assert pv2.connected is True
assert pv2.get() is not None
def test_with_PV_and_getPV():
# create 2 PV objects connecting to the same PV signal, one using PV class and the other one using get_pv()
pv1 = PV(mypv, auto_monitor=True, callback=lambda **args: ...)
pv2 = get_pv(mypv)
pv1.wait_for_connection()
pv2.wait_for_connection()
# check that both PVs are connected
assert pv1.connected is True
assert pv2.connected is True
# check that data is received
assert pv1.get() is not None
assert pv2.get() is not None
# disconnect 1 PV
pv1.disconnect()
time.sleep(1)
# check that the first PV is disconnected and doesn't receive data
assert pv1.connected is False
assert pv1.get() is None
# check that the other PV is connected and still receives data
assert pv2.connected is True
assert pv2.get() is not None
def test_with_getPV():
# create 2 PV objects connecting to the same PV signal using get_pv()
pv1 = get_pv(mypv)
pv2 = get_pv(mypv)
pv1.wait_for_connection()
pv2.wait_for_connection()
# check that both PVs are connected
assert pv1.connected is True
assert pv2.connected is True
# check that data is received
assert pv1.get() is not None
assert pv2.get() is not None
# disconnect 1 PV
pv1.disconnect()
time.sleep(1)
# check that the first PV is disconnected and doesn't receive data
assert pv1.connected is False
assert pv1.get() is None
# check that the other PV is also disconnected and doesn't receive data either
assert pv2.connected is False
assert pv2.get() is None
def test_with_caget():
pv = PV(mypv, auto_monitor=True, callback=lambda **args: ...)
pv.wait_for_connection()
# check that the PV is connected and data is received
assert pv.connected is True
assert pv.get() is not None
# use caget to get data from the same PV
assert caget(mypv) is not None
# disconnect PV object
pv.disconnect()
# check that the PV is disconnected and doesn't receive data
assert pv.connected is False
assert pv.get() is None
# check that you can still use caget to get data from the same PV
assert caget(mypv) is not None
def test_with_caget_nomonitor():
pv = PV(mypv, auto_monitor=True, callback=lambda **args: ...)
pv.wait_for_connection()
# check that the PV is connected and data is received
assert pv.connected is True
assert pv.get() is not None
# use caget to get data from the same PV
assert caget(mypv, use_monitor=False) is not None
# disconnect PV object
pv.disconnect()
# check that the PV is disconnected and doesn't receive data
assert pv.connected is False
assert pv.get() is None
# check that you can still use caget to get data from the same PV
assert caget(mypv, use_monitor=False) is not None
def test_with_camonitor():
pv = PV(mypv, auto_monitor=True, callback=lambda **args: ...)
pv.wait_for_connection()
# check that the PV is connected and data is received
assert pv.connected is True
assert pv.get() is not None
# use camonitor
received = {'flag': False}
def callback(**args):
received['flag'] = True
camonitor(mypv, callback=callback)
time.sleep(1)
# check that the monitor receives data
assert received['flag'] is True
# disconnect PV object
pv.disconnect()
time.sleep(1)
# check that the PV is disconnected and doesn't receive data
assert pv.connected is False
assert pv.get() is None
# reset the flag to check that new data is received by camonitor
received['flag'] = False
time.sleep(1)
assert received['flag'] is True
# clear the monitor
camonitor_clear(mypv)
time.sleep(1)
# reset the flag to check that no new data is received by camonitor
received['flag'] = False
time.sleep(1)
assert received['flag'] is False
def test_memleak_disconnect():
# try to connect multiple times to the same PV
mem = []
for i in range(int(2)):
for j in range(int(1000)):
pv = PV(mypv, auto_monitor=True, callback=lambda **args: ...)
pv.disconnect()
process = psutil.Process(os.getpid())
mem.append(process.memory_info().rss)
# check used memory by the process didn't increase by more than 1%
assert mem[1]/mem[0] < 1.01
|