1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
import pytest
import sys, time
if sys.version_info < (3,):
import thread
else:
import _thread as thread
greenlet = pytest.importorskip('greenlet')
class TestThread:
    """Tests for greenlets that are touched from a thread other than the
    one in which they were created."""

    def _greenlet_from_other_thread(self, factory):
        """Run ``factory()`` in a freshly started thread and return its result.

        ``factory`` is called with no arguments inside the new thread and is
        expected to return the greenlet under test.  The calling thread
        blocks until the factory has finished.

        Raises ValueError (from the unpacking below) if the factory failed
        and therefore produced nothing.
        """
        made = []
        ready = thread.allocate_lock()
        ready.acquire()

        def run_thread():
            try:
                made.append(factory())
            finally:
                # Always wake the caller: if factory() raises, the test must
                # fail promptly instead of blocking forever on the lock.
                ready.release()
            # NOTE(review): presumably this keeps the creating thread alive
            # while the caller exercises the greenlet -- confirm.
            time.sleep(1)

        thread.start_new_thread(run_thread, ())
        ready.acquire()
        [g] = made
        return g

    def test_cannot_switch_to_main_of_different_thread(self):
        """Switching to another thread's main greenlet raises greenlet.error."""
        mains = [greenlet.getcurrent()]
        got_exception = []
        lock = thread.allocate_lock()
        lock.acquire()

        def run_thread():
            try:
                main = greenlet.getcurrent()
                assert main not in mains
                mains.append(main)
                try:
                    mains[0].switch()
                except Exception as e:
                    # Captured here, checked in the main thread below.
                    got_exception.append(e)
            finally:
                # Release even if the assertion above fails, so that the
                # main thread is never left waiting on the lock.
                lock.release()

        thread.start_new_thread(run_thread, ())
        lock.acquire()
        assert got_exception, "switch() to a foreign main greenlet did not raise"
        assert isinstance(got_exception[0], greenlet.error)

    def test_nonstarted_greenlet_is_still_attached_to_thread(self):
        """A greenlet that was created but never started in another thread
        cannot be switched to from this thread."""
        g = self._greenlet_from_other_thread(
            lambda: greenlet.greenlet(lambda *args: None))
        pytest.raises(greenlet.error, g.switch)

    def test_noninited_greenlet_is_still_attached_to_thread(self):
        """A greenlet made with ``__new__`` in another thread rejects
        ``switch()`` here, both before and after ``__init__`` is called."""
        g = self._greenlet_from_other_thread(
            lambda: greenlet.greenlet.__new__(greenlet.greenlet))
        pytest.raises(greenlet.error, g.switch)
        g.__init__(lambda *args: None)
        pytest.raises(greenlet.error, g.switch)

    def test_noninited_greenlet_change_thread_via_parent(self):
        """After ``__init__`` and re-parenting to the current greenlet, a
        greenlet made with ``__new__`` in another thread can be switched to."""
        g = self._greenlet_from_other_thread(
            lambda: greenlet.greenlet.__new__(greenlet.greenlet))
        g.__init__(lambda *args: 44)
        g.parent = greenlet.getcurrent()
        x = g.switch()
        assert x == 44

    def test_nonstarted_greenlet_change_thread_via_parent(self):
        """After re-parenting to the current greenlet, a never-started
        greenlet created in another thread can be switched to."""
        g = self._greenlet_from_other_thread(
            lambda: greenlet.greenlet(lambda *args: 42))
        g.parent = greenlet.getcurrent()
        x = g.switch()
        assert x == 42

    def test_started_greenlet_cannot_change_thread_via_parent(self):
        """Re-parenting a started greenlet across threads raises ValueError."""
        pytest.skip("not implemented on PyPy")

        # Unreachable while the skip above is in place; kept as the
        # specification of the expected behaviour.
        def make_started():
            g_parent = greenlet.getcurrent()
            g = greenlet.greenlet(lambda *args: g_parent.switch(42))
            x = g.switch()
            assert x == 42
            return g

        g = self._greenlet_from_other_thread(make_started)
        with pytest.raises(ValueError) as e:
            g.parent = greenlet.getcurrent()
        assert str(e.value) == "parent cannot be on a different thread"

    def test_finished_greenlet_cannot_change_thread_via_parent(self):
        """Re-parenting a finished greenlet across threads raises ValueError."""
        pytest.skip("not implemented on PyPy")

        # Unreachable while the skip above is in place; kept as the
        # specification of the expected behaviour.
        def make_finished():
            g = greenlet.greenlet(lambda *args: 42)
            x = g.switch()
            assert x == 42
            return g

        g = self._greenlet_from_other_thread(make_finished)
        with pytest.raises(ValueError) as e:
            g.parent = greenlet.getcurrent()
        assert str(e.value) == "parent cannot be on a different thread"
|