1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
from __future__ import print_function
def pre_listen(task_id, transport, attrs):
    """Check and log the attributes passed to the ``pre_listen`` test hook.

    ``attrs`` is iterated as ``(scope, name, value)`` triples and each
    triple is echoed as a ``#``-prefixed line.  Only triples whose scope
    is ``'python'`` are acted upon:

    * ``test_func``: the value must be ``"pre_listen"``; any other value
      raises ``RuntimeError``.
    * ``expected_result``: the value is executed as Python source and the
      ``res`` name it defines becomes the return value (the last such
      triple wins).

    ``task_id`` and ``transport`` are accepted but not used here.

    Returns the ``res`` object produced by the last ``expected_result``
    triple, or ``None`` when there was none.
    """
    result = None
    for attr_scope, attr_name, attr_value in attrs:
        print("# (%s, %s, %s)" % (attr_scope, attr_name, attr_value))
        if attr_scope != 'python':
            # Attributes addressed to other scopes are only logged.
            continue
        if attr_name == 'test_func' and attr_value != "pre_listen":
            raise RuntimeError("pre_listen called when %s expected" % attr_value)
        if attr_name == 'expected_result':
            print("# evaluating", attr_value)
            # NOTE(review): this executes code taken from attrs; presumably
            # safe because attrs come from the test driver -- confirm.
            namespace = {}
            exec(attr_value, namespace)
            result = namespace['res']
            print("# res =", result)
    return result
def post_listen(task_id, transport, local_contact, attrs):
    """Check and log the arguments passed to the ``post_listen`` test hook.

    Every argument is echoed as a ``#``-prefixed line, then ``attrs`` is
    iterated as ``(scope, name, value)`` triples.  Only triples whose
    scope is ``'python'`` are acted upon:

    * ``test_func``: the value must be ``"post_listen"``; any other value
      raises ``RuntimeError``.
    * ``expected_result``: the value is executed as Python source and the
      ``res`` name it defines becomes the return value (the last such
      triple wins).

    Returns the ``res`` object produced by the last ``expected_result``
    triple, or ``None`` when there was none.
    """
    print("# task_id =", str(task_id))
    print("# transport =", str(transport))
    print("# local_contact =", str(local_contact))
    print("# attrs =", str(attrs))

    result = None
    for attr_scope, attr_name, attr_value in attrs:
        print("# (%s, %s, %s)" % (attr_scope, attr_name, attr_value))
        if attr_scope != 'python':
            # Attributes addressed to other scopes are only logged.
            continue
        if attr_name == 'test_func' and attr_value != "post_listen":
            raise RuntimeError("post_listen called when %s expected" % attr_value)
        if attr_name == 'expected_result':
            print("# evaluating", attr_value)
            # NOTE(review): this executes code taken from attrs; presumably
            # safe because attrs come from the test driver -- confirm.
            namespace = {}
            exec(attr_value, namespace)
            result = namespace['res']
            print("# res =", result)
    return result
def end_listen(task_id, transport, local_contact, attrs):
    """Check and log the arguments passed to the ``end_listen`` test hook.

    Every argument is echoed as a ``#``-prefixed line, then ``attrs`` is
    iterated as ``(scope, name, value)`` triples.  Only triples whose
    scope is ``'python'`` are acted upon:

    * ``test_func``: the value must be ``"end_listen"``; any other value
      raises ``RuntimeError``.
    * ``expected_result``: the value is executed as Python source and the
      ``res`` name it defines becomes the return value (the last such
      triple wins).

    Returns the ``res`` object produced by the last ``expected_result``
    triple, or ``None`` when there was none.
    """
    print("# task_id =", str(task_id))
    print("# transport =", str(transport))
    print("# local_contact =", str(local_contact))
    print("# attrs =", str(attrs))

    res = None
    for (scope, name, value) in attrs:
        print("# (%s, %s, %s)" % (scope, name, value))
        if scope != 'python':
            # Attributes addressed to other scopes are only logged.
            continue
        if name == 'test_func' and value != "end_listen":
            # The message names this hook; it previously said "post_listen"
            # (copy-paste slip), which misreported which hook was invoked.
            raise RuntimeError("end_listen called when %s expected" % value)
        if name == 'expected_result':
            print("# evaluating", value)
            # NOTE(review): this executes code taken from attrs; presumably
            # safe because attrs come from the test driver -- confirm.
            ns = {}
            exec(value, ns)
            res = ns['res']
            print("# res =", res)
    return res
def pre_accept(task_id, transport, local_contact, attrs):
    """Check and log the arguments passed to the ``pre_accept`` test hook.

    Every argument is echoed as a ``#``-prefixed line, then ``attrs`` is
    iterated as ``(scope, name, value)`` triples.  Only triples whose
    scope is ``'python'`` are acted upon:

    * ``test_func``: the value must be ``"pre_accept"``; any other value
      raises ``RuntimeError``.
    * ``expected_result``: the value is executed as Python source and the
      ``res`` name it defines becomes the return value (the last such
      triple wins).

    Returns the ``res`` object produced by the last ``expected_result``
    triple, or ``None`` when there was none.
    """
    print("# task_id =", str(task_id))
    print("# transport =", str(transport))
    print("# local_contact =", str(local_contact))
    print("# attrs =", str(attrs))

    result = None
    for attr_scope, attr_name, attr_value in attrs:
        print("# (%s, %s, %s)" % (attr_scope, attr_name, attr_value))
        if attr_scope != 'python':
            # Attributes addressed to other scopes are only logged.
            continue
        if attr_name == 'test_func' and attr_value != "pre_accept":
            raise RuntimeError("pre_accept called when %s expected" % attr_value)
        if attr_name == 'expected_result':
            print("# evaluating", attr_value)
            # NOTE(review): this executes code taken from attrs; presumably
            # safe because attrs come from the test driver -- confirm.
            namespace = {}
            exec(attr_value, namespace)
            result = namespace['res']
            print("# res =", result)
    return result
def post_accept(task_id, transport, local_contact, remote_contact, attrs):
    """Check and log the arguments passed to the ``post_accept`` test hook.

    Every argument is echoed as a ``#``-prefixed line, then ``attrs`` is
    iterated as ``(scope, name, value)`` triples.  Only triples whose
    scope is ``'python'`` are acted upon:

    * ``test_func``: the value must be ``"post_accept"``; any other value
      raises ``RuntimeError``.
    * ``expected_result``: the value is executed as Python source and the
      ``res`` name it defines becomes the return value (the last such
      triple wins).

    Returns the ``res`` object produced by the last ``expected_result``
    triple, or ``None`` when there was none.
    """
    print("# task_id =", str(task_id))
    print("# transport =", str(transport))
    print("# local_contact =", str(local_contact))
    print("# remote_contact =", str(remote_contact))
    print("# attrs =", str(attrs))

    result = None
    for attr_scope, attr_name, attr_value in attrs:
        print("# (%s, %s, %s)" % (attr_scope, attr_name, attr_value))
        if attr_scope != 'python':
            # Attributes addressed to other scopes are only logged.
            continue
        if attr_name == 'test_func' and attr_value != "post_accept":
            raise RuntimeError("post_accept called when %s expected" % attr_value)
        if attr_name == 'expected_result':
            print("# evaluating", attr_value)
            # NOTE(review): this executes code taken from attrs; presumably
            # safe because attrs come from the test driver -- confirm.
            namespace = {}
            exec(attr_value, namespace)
            result = namespace['res']
            print("# res =", result)
    return result
def pre_connect(task_id, transport, remote_contact, attrs):
    """Check and log the arguments passed to the ``pre_connect`` test hook.

    Every argument is echoed as a ``#``-prefixed line, then ``attrs`` is
    iterated as ``(scope, name, value)`` triples.  Only triples whose
    scope is ``'python'`` are acted upon:

    * ``test_func``: the value must be ``"pre_connect"``; any other value
      raises ``RuntimeError``.
    * ``expected_result``: the value is executed as Python source and the
      ``res`` name it defines becomes the return value (the last such
      triple wins).

    Returns the ``res`` object produced by the last ``expected_result``
    triple, or ``None`` when there was none.
    """
    print("# task_id =", str(task_id))
    print("# transport =", str(transport))
    print("# remote_contact =", str(remote_contact))
    print("# attrs =", str(attrs))

    result = None
    for attr_scope, attr_name, attr_value in attrs:
        print("# (%s, %s, %s)" % (attr_scope, attr_name, attr_value))
        if attr_scope != 'python':
            # Attributes addressed to other scopes are only logged.
            continue
        if attr_name == 'test_func' and attr_value != "pre_connect":
            raise RuntimeError("pre_connect called when %s expected" % attr_value)
        if attr_name == 'expected_result':
            print("# evaluating", attr_value)
            # NOTE(review): this executes code taken from attrs; presumably
            # safe because attrs come from the test driver -- confirm.
            namespace = {}
            exec(attr_value, namespace)
            result = namespace['res']
            print("# res =", result)
    return result
def post_connect(task_id, transport, local_contact, remote_contact, attrs):
    """Check and log the arguments passed to the ``post_connect`` test hook.

    Every argument is echoed as a ``#``-prefixed line, then ``attrs`` is
    iterated as ``(scope, name, value)`` triples.  Only triples whose
    scope is ``'python'`` are acted upon:

    * ``test_func``: the value must be ``"post_connect"``; any other
      value raises ``RuntimeError``.
    * ``expected_result``: the value is executed as Python source and the
      ``res`` name it defines becomes the return value (the last such
      triple wins).

    Returns the ``res`` object produced by the last ``expected_result``
    triple, or ``None`` when there was none.
    """
    print("# task_id =", str(task_id))
    print("# transport =", str(transport))
    print("# local_contact =", str(local_contact))
    print("# remote_contact =", str(remote_contact))
    print("# attrs =", str(attrs))

    result = None
    for attr_scope, attr_name, attr_value in attrs:
        print("# (%s, %s, %s)" % (attr_scope, attr_name, attr_value))
        if attr_scope != 'python':
            # Attributes addressed to other scopes are only logged.
            continue
        if attr_name == 'test_func' and attr_value != "post_connect":
            raise RuntimeError("post_connect called when %s expected" % attr_value)
        if attr_name == 'expected_result':
            print("# evaluating", attr_value)
            # NOTE(review): this executes code taken from attrs; presumably
            # safe because attrs come from the test driver -- confirm.
            namespace = {}
            exec(attr_value, namespace)
            result = namespace['res']
            print("# res =", result)
    return result
def pre_close(task_id, transport, local_contact, remote_contact, attrs):
    """Check and log the arguments passed to the ``pre_close`` test hook.

    Every argument is echoed as a ``#``-prefixed line, then ``attrs`` is
    iterated as ``(scope, name, value)`` triples.  Only triples whose
    scope is ``'python'`` are acted upon:

    * ``test_func``: the value must be ``"pre_close"``; any other value
      raises ``RuntimeError``.
    * ``expected_result``: the value is executed as Python source and the
      ``res`` name it defines becomes the return value (the last such
      triple wins).

    Returns the ``res`` object produced by the last ``expected_result``
    triple, or ``None`` when there was none.
    """
    print("# task_id =", str(task_id))
    print("# transport =", str(transport))
    print("# local_contact =", str(local_contact))
    print("# remote_contact =", str(remote_contact))
    print("# attrs =", str(attrs))

    result = None
    for attr_scope, attr_name, attr_value in attrs:
        print("# (%s, %s, %s)" % (attr_scope, attr_name, attr_value))
        if attr_scope != 'python':
            # Attributes addressed to other scopes are only logged.
            continue
        if attr_name == 'test_func' and attr_value != "pre_close":
            raise RuntimeError("pre_close called when %s expected" % attr_value)
        if attr_name == 'expected_result':
            print("# evaluating", attr_value)
            # NOTE(review): this executes code taken from attrs; presumably
            # safe because attrs come from the test driver -- confirm.
            namespace = {}
            exec(attr_value, namespace)
            result = namespace['res']
            print("# res =", result)
    return result
def post_close(task_id, transport, local_contact, remote_contact, attrs):
    """Check and log the arguments passed to the ``post_close`` test hook.

    Every argument is echoed as a ``#``-prefixed line, then ``attrs`` is
    iterated as ``(scope, name, value)`` triples.  Only triples whose
    scope is ``'python'`` are acted upon:

    * ``test_func``: the value must be ``"post_close"``; any other value
      raises ``RuntimeError``.
    * ``expected_result``: the value is executed as Python source and the
      ``res`` name it defines becomes the return value (the last such
      triple wins).

    Returns the ``res`` object produced by the last ``expected_result``
    triple, or ``None`` when there was none.
    """
    print("# task_id =", str(task_id))
    print("# transport =", str(transport))
    print("# local_contact =", str(local_contact))
    print("# remote_contact =", str(remote_contact))
    print("# attrs =", str(attrs))

    result = None
    for attr_scope, attr_name, attr_value in attrs:
        print("# (%s, %s, %s)" % (attr_scope, attr_name, attr_value))
        if attr_scope != 'python':
            # Attributes addressed to other scopes are only logged.
            continue
        if attr_name == 'test_func' and attr_value != "post_close":
            raise RuntimeError("post_close called when %s expected" % attr_value)
        if attr_name == 'expected_result':
            print("# evaluating", attr_value)
            # NOTE(review): this executes code taken from attrs; presumably
            # safe because attrs come from the test driver -- confirm.
            namespace = {}
            exec(attr_value, namespace)
            result = namespace['res']
            print("# res =", result)
    return result
|