from functools import wraps
import sys
import types
import greenlet
from tornado import concurrent, gen
from tornado.ioloop import IOLoop
class TimeoutError(Exception):
    """Signals that a ``greenletyield`` wait ran out of time.

    ``greenletyield`` sets an instance of this class on the awaited future
    when the requested timeout elapses, so it surfaces to the caller through
    ``future.result()``.

    NOTE: inside this module the name shadows Python 3's built-in
    ``TimeoutError``; this class derives directly from ``Exception``.
    """
def greenletsleep(timeout):
    """Pause the calling greenlet for ``timeout`` without blocking the IOLoop.

    A callback is scheduled on the current IOLoop at
    ``io_loop.time() + timeout``; control is then handed to the parent
    greenlet until that callback has run and switched back here.

    :param timeout: positive delay, added to ``io_loop.time()``.
    :raises AssertionError: if the current greenlet has no parent, i.e. the
        call is not running inside a greenlet started by ``@run_on_greenlet``.
    :raises ValueError: if ``timeout`` is zero or negative.
    """
    gr = greenlet.getcurrent()
    assert gr.parent is not None, (
        "greenletsleep() can only be called from "
        "functions that have the @run_on_greenlet "
        "decorator in the call stack.")
    if timeout <= 0:
        raise ValueError("Invalid timeout value '%s'" % timeout)

    io_loop = IOLoop.current()
    # One-element list so the nested callback can flip the flag in place.
    done = [False]

    def on_timeout():
        done[0] = True
        gr.switch()

    handle = io_loop.add_timeout(io_loop.time() + timeout, on_timeout)
    try:
        # Loop rather than switch once: something other than our timeout may
        # resume this greenlet, in which case we simply go back to waiting.
        while not done[0]:
            gr.parent.switch()
    finally:
        # If we leave early (an exception was raised into this greenlet),
        # cancel the pending callback so it does not later switch into a
        # greenlet that is no longer sleeping.
        if not done[0]:
            io_loop.remove_timeout(handle)
def generator(f):
    """Decorator that runs a yield-based coroutine function on a greenlet.

    The wrapped callable is invoked normally. If it returns a generator, the
    generator is driven to completion: every object it yields is passed to
    ``greenletyield`` and the outcome is sent (or thrown) back into it. A
    non-generator return value is passed through unchanged.

    :param f: the function to wrap.
    :returns: a wrapper with ``f``'s metadata. The wrapper returns the
        ``value`` attribute of the terminating ``gen.Return`` /
        ``StopIteration`` (``None`` when absent), or ``f``'s plain result.
    :raises AssertionError: (from the wrapper) if the current greenlet has
        no parent.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        assert greenlet.getcurrent().parent is not None, (
            "Functions can only be "
            "called within @run_on_greenlet decorator in the call stack.")

        try:
            result = f(*args, **kwargs)
        except (gen.Return, StopIteration) as e:
            # A non-generator function may signal its result by raising.
            return getattr(e, 'value', None)

        if not isinstance(result, types.GeneratorType):
            return result

        try:
            future = next(result)
            while True:
                try:
                    value = greenletyield(future)
                except Exception:
                    # throw() returns the next object the generator yields
                    # when it handles the exception; that object is what must
                    # be waited on next. Dropping it would make the loop wait
                    # on the already-failed future again and re-throw the
                    # same error into the generator.
                    future = result.throw(*sys.exc_info())
                else:
                    future = result.send(value)
        except (gen.Return, StopIteration) as e:
            # The generator finished; its return value rides on the exception.
            return getattr(e, 'value', None)

    return wrapper
def run_on_greenlet(f):
    """Decorator that executes ``f`` in a fresh greenlet.

    Calling the decorated function creates a ``TracebackFuture``, starts a
    new greenlet that runs ``f(*args, **kwargs)``, and returns the future as
    soon as that greenlet finishes or switches back. The future receives
    ``f``'s return value, or the exception info if anything was raised.

    :param f: the function to run on its own greenlet.
    :returns: a wrapper with ``f``'s metadata that returns the future.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        outcome = concurrent.TracebackFuture()

        def entry_point():
            try:
                value = f(*args, **kwargs)
                outcome.set_result(value)
            except:  # noqa: E722
                # Deliberately catch-all: every failure is delivered through
                # the future rather than escaping from the greenlet.
                outcome.set_exc_info(sys.exc_info())

        child = greenlet.greenlet(entry_point)
        child.switch()
        return outcome

    return wrapper
def greenletyield(future, timeout=None):
    """Wait for ``future`` from inside a greenlet and return its result.

    If the future is already done, its result is returned immediately.
    Otherwise a completion callback is registered on the current IOLoop and
    control is handed to the parent greenlet until the future is done.

    :param future: the future to wait on.
    :param timeout: optional number of seconds; when given and positive, a
        ``TimeoutError`` is set on ``future`` if it has not completed by
        then. ``None``, zero or a negative value means no timeout.
    :returns: whatever ``future.result()`` returns (it re-raises the
        future's exception, including the ``TimeoutError`` set here).
    :raises AssertionError: if the current greenlet has no parent.
    """
    gr = greenlet.getcurrent()
    assert gr.parent is not None, (
        "greenletyield() can only "
        "be called from functions that have the @run_on_greenlet "
        "decorator in the call stack.")

    # Nothing to wait for: do not touch the IOLoop or switch greenlets.
    if future.done():
        return future.result()

    io_loop = IOLoop.current()
    timeout_handle = None

    def on_timeout():
        # The future may have completed in the same IOLoop iteration in which
        # this timeout fell due, before on_complete had a chance to cancel
        # it. Never overwrite a finished future's outcome.
        if not future.done():
            future.set_exception(
                TimeoutError("Timeout after %s seconds" % timeout))

    if timeout is not None and timeout > 0:
        timeout_handle = io_loop.add_timeout(
            io_loop.time() + timeout, on_timeout)

    def on_complete(result):
        if timeout_handle is not None:
            io_loop.remove_timeout(timeout_handle)
        gr.switch()

    io_loop.add_future(future, on_complete)
    gr.parent.switch()
    # Something other than on_complete may have resumed us; keep yielding to
    # the parent until the future really is done.
    while not future.done():
        gr.parent.switch()
    return future.result()