1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
|
##
# Copyright (c) 2013 Yury Selivanov
# License: Apache 2.0
##
"""greenio package allows to compose greenlets and asyncio coroutines."""
__all__ = ['task', 'yield_from']
import greenlet
import sys
try:
import asyncio
except ImportError:
asyncio = None
try:
import trollius
except ImportError:
trollius = None
if asyncio is None:
raise
if asyncio is None:
asyncio = trollius
def _create_task(coro, loop):
    """Schedule *coro* as a task on *loop* and return the task.

    When *loop* is None the loop returned by ``asyncio.get_event_loop()``
    is used.  Loops that expose ``create_task`` build the task themselves;
    for any other loop a ``GreenTask`` bound to that loop is created.
    """
    target_loop = asyncio.get_event_loop() if loop is None else loop
    if not hasattr(target_loop, 'create_task'):
        # The loop offers no task factory of its own.
        return GreenTask(coro, loop=target_loop)
    return target_loop.create_task(coro)
def _async(future, loop):
    """Return *future* unchanged, or a task wrapping it if it is a coroutine.

    Coroutine objects are handed to ``_create_task`` together with *loop*;
    anything else is passed through untouched.
    """
    # trollius iscoroutine() accepts trollius and asyncio coroutine
    # objects, so it is used whenever trollius could be imported.
    coroutine_api = asyncio if trollius is None else trollius
    if not coroutine_api.iscoroutine(future):
        return future
    return _create_task(future, loop)
# Future types accepted by "yield_from".  trollius.Future is listed
# separately only when trollius is importable and is a different module
# than the one bound to the name "asyncio".
if trollius is None or trollius is asyncio:
    _FUTURE_CLASSES = (asyncio.Future,)
else:
    _FUTURE_CLASSES = (asyncio.Future, trollius.Future)
import sys
class _LoopGreenlet(greenlet.greenlet):
    """Greenlet hosting the event loop (the analog of a main thread).

    The event-loop policy is responsible for supplying a loop
    implementation whose "run_*" methods execute in a ``_LoopGreenlet``
    context; ``yield_from`` checks for this greenlet type on the parent
    of the calling greenlet.
    """
class _TaskGreenlet(greenlet.greenlet):
    """Greenlet in which a green task and its subsequent coroutines run.

    Tasks decorated with ``@greenio.task`` are executed here.
    ``_GreenTaskMixin._step`` creates the greenlet and stores the owning
    task on it as the ``task`` attribute, which ``yield_from`` reads.
    """
class _GreenTaskMixin(object):
    """Task mixin that executes every ``_step`` inside a ``_TaskGreenlet``.

    The greenlet is kept in ``self._greenlet`` for as long as the task is
    suspended inside ``yield_from``; it is ``None`` otherwise.
    """

    def __init__(self, *args, **kwargs):
        # No greenlet exists until the first "_step" creates one.
        self._greenlet = None
        super(_GreenTaskMixin, self).__init__(*args, **kwargs)

    def _step(self, value=None, exc=None):
        """Start the base class "_step" in a new greenlet, or resume the
        greenlet that is suspended in "yield_from" with *value* / *exc*.
        """
        if self._greenlet is not None:
            # The task is parked in its greenlet: this call delivers the
            # outcome "yield_from" was waiting for.  Register the task as
            # the current one for its loop again before resuming it.
            self.__class__._current_tasks[self._loop] = self

            if exc is None:
                result = self._greenlet.switch(value)
            else:
                if hasattr(exc, '__traceback__'):
                    tb = exc.__traceback__
                else:
                    tb = sys.exc_info()[2]
                # Raise the exception inside the suspended greenlet.
                result = self._greenlet.throw(type(exc), exc, tb)
        else:
            # Not suspended in "yield_from": run the overloaded
            # "Task._step" in a brand new "_TaskGreenlet".
            self._greenlet = _TaskGreenlet(
                super(_GreenTaskMixin, self)._step)

            # "yield_from" finds the running task through this reference.
            self._greenlet.task = self

            result = self._greenlet.switch(value, exc)

        if result is _YIELDED:
            # "yield_from" was called (possibly again): the greenlet stays
            # alive, but the task is no longer the current one of its loop.
            self.__class__._current_tasks.pop(self._loop)
            return

        # The task left the greenlet without calling "yield_from", so the
        # greenlet is dropped.
        self._greenlet.task = None
        self._greenlet = None
class _GreenLoopMixin(object):
    """Event-loop mixin that executes "run_*" methods in a ``_LoopGreenlet``.

    Every call to ``run_until_complete`` or ``run_forever`` creates a new
    ``_LoopGreenlet`` around the base class implementation and switches
    into it.
    """

    def _green_run(self, method, args, kwargs):
        """Call ``method(*args, **kwargs)`` inside a new ``_LoopGreenlet``
        and return what the switch into that greenlet returns.
        """
        loop_greenlet = _LoopGreenlet(method)
        return loop_greenlet.switch(*args, **kwargs)

    def run_until_complete(self, *args, **kwargs):
        """Base class ``run_until_complete`` executed via ``_green_run``."""
        base_impl = super(_GreenLoopMixin, self).run_until_complete
        return self._green_run(base_impl, args, kwargs)

    def run_forever(self, *args, **kwargs):
        """Base class ``run_forever`` executed via ``_green_run``."""
        base_impl = super(_GreenLoopMixin, self).run_forever
        return self._green_run(base_impl, args, kwargs)
class GreenTask(_GreenTaskMixin, asyncio.Task):
    """``asyncio.Task`` whose steps run in a greenlet (``_GreenTaskMixin``)."""
class GreenUnixSelectorLoop(_GreenLoopMixin, asyncio.SelectorEventLoop):
    """Selector event loop that runs in a greenlet and creates green tasks."""

    def create_task(self, coro):
        """Return a ``GreenTask`` for *coro* that is scheduled on this loop."""
        # Bind the task to this loop explicitly.  Without "loop=self" the
        # task attaches to whatever asyncio.get_event_loop() returns, which
        # is not necessarily the loop "create_task" was called on.
        return GreenTask(coro, loop=self)
class GreenEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    """Event loop policy whose new loops are ``GreenUnixSelectorLoop``."""

    def new_event_loop(self):
        """Create and return a new ``GreenUnixSelectorLoop``."""
        green_loop = GreenUnixSelectorLoop()
        return green_loop
if trollius is not None:
    if trollius is not asyncio:
        # Both asyncio and trollius are importable: provide dedicated
        # trollius-based variants of the task, loop and policy.
        class GreenTrolliusTask(_GreenTaskMixin, trollius.Task):
            """``trollius.Task`` whose steps run in a greenlet."""

        class GreenTrolliusUnixSelectorLoop(_GreenLoopMixin,
                                            trollius.SelectorEventLoop):
            """trollius selector loop that runs in a greenlet."""

            def create_task(self, coro):
                """Return a ``GreenTrolliusTask`` for *coro* on this loop."""
                # Bind the task to this loop explicitly instead of to the
                # loop returned by get_event_loop().
                return GreenTrolliusTask(coro, loop=self)

        # The policy hands out trollius loops, so it derives from the
        # trollius default policy rather than from the asyncio one.
        class GreenTrolliusEventLoopPolicy(trollius.DefaultEventLoopPolicy):
            """Policy whose new loops are ``GreenTrolliusUnixSelectorLoop``."""

            def new_event_loop(self):
                """Create and return a ``GreenTrolliusUnixSelectorLoop``."""
                return GreenTrolliusUnixSelectorLoop()
    else:
        # The name "asyncio" is bound to trollius itself (asyncio could not
        # be imported), so the classes defined above are already built on
        # trollius and are simply aliased.
        GreenTrolliusTask = GreenTask
        GreenTrolliusUnixSelectorLoop = GreenUnixSelectorLoop
        GreenTrolliusEventLoopPolicy = GreenEventLoopPolicy
def yield_from(future, loop=None):
    """A function to use instead of the ``yield from`` statement.

    Suspends the calling task greenlet until *future* is done.  *future*
    may be a Future or a coroutine object; a coroutine object is wrapped
    into a task by ``_async`` using *loop*.

    Returns whatever the task's ``_step`` switches back into the greenlet
    with; an exception passed to ``_step`` is raised here instead.

    Raises ``RuntimeError`` if *future* is not a Future (after wrapping)
    and, unless Python runs with ``-O``, if the call is not made from a
    task greenlet whose parent is a ``_LoopGreenlet``.
    """
    gl = greenlet.getcurrent()

    # Validate the calling context before touching *future*, so that a
    # coroutine is not wrapped into a scheduled task by a call that is
    # about to fail.
    if __debug__:
        if not isinstance(gl.parent, _LoopGreenlet):
            # ...or something went horribly wrong
            raise RuntimeError(
                '"greenio.yield_from" requires GreenEventLoopPolicy '
                'or compatible')

        if not isinstance(gl, _TaskGreenlet):
            # ...ditto
            raise RuntimeError(
                '"greenio.yield_from" was supposed to be called from a '
                '"greenio.task" or a subsequent coroutine')

    task = gl.task

    future = _async(future, loop)

    if not isinstance(future, _FUTURE_CLASSES):
        raise RuntimeError(
            'greenio.yield_from was supposed to receive only Futures, '
            'got {!r} in task {!r}'.format(future, task))

    # "_wakeup" will call the "_step" method (which we overloaded in
    # GreenTask, and therefore wakeup the awaiting greenlet)
    future.add_done_callback(task._wakeup)
    task._fut_waiter = future

    # task cancellation has been delayed.
    if task._must_cancel:
        if task._fut_waiter.cancel():
            task._must_cancel = False

    # Jump out of the current task greenlet (we'll return to GreenTask._step)
    return gl.parent.switch(_YIELDED)
def task(func, loop=None):
    """A decorator, allows use of ``yield_from`` in the decorated or
    subsequent coroutines.

    *func* is turned into a coroutine function with ``asyncio.coroutine``.
    Each call of the decorated function creates the coroutine object and
    returns the task that ``_create_task`` builds for it on *loop* (the
    loop from ``asyncio.get_event_loop()`` when *loop* is None).
    """
    import functools  # local import: only needed at decoration time

    coro_func = asyncio.coroutine(func)

    # Keep the name and docstring of *func* on the wrapper.  "updated=()"
    # leaves func.__dict__ uncopied, because the wrapper returns a task and
    # should not inherit attributes that describe *func* itself.
    @functools.wraps(func, updated=())
    def task_wrapper(*args, **kwds):
        coro_obj = coro_func(*args, **kwds)
        return _create_task(coro_obj, loop)

    return task_wrapper
class _YIELDED(object):
    """Internal marker, don't use it.

    ``yield_from`` switches to the parent greenlet with this class object,
    and ``_GreenTaskMixin._step`` compares the switch result against it by
    identity to tell a suspended task from one that left its greenlet.
    """
|