Tornado中同步方法的异步化

缘由

上次在公司用 Tornado 做了路由平台管理后台的API, 但是由于没有用到tornado的异步操作, 全部采用同步执行, 所以效率很差, 公司要求改进.

资料

抽空找了下python下的异步微型库, 发现了 gevent.
但是gevent似乎不能直接用在tornado上, 看了看gevent的文档, 发现它用了一个叫 greenlet 的协程库, 看了下它的文档, 也很简单, 就是一个切换, 貌似用 C 语言封装了线程, 重写了python库的线程库.

代码

Tornado中的异步也是基于协程的, 所以自己找了找资料, 改了一个基于greenlet的装饰器(文件就保存为greenlet), 代码如下:

# coding: utf-8
from functools import wraps
import sys
import types
import greenlet
from tornado import concurrent, gen
from tornado.ioloop import IOLoop
class TimeoutError(Exception):
"""Exception raised by ``greenletyield`` in timeout."""
def greenletsleep(timeout):
gr = greenlet.getcurrent()
assert gr.parent is not None, \
"greenletsleep() can only be called from \
functions that have the @greenado.groutine \
decorator in the call stack."
if timeout <= 0:
raise ValueError("Invalid timeout value '%s'" % timeout)
io_loop = IOLoop.current()
done = [False]
def on_timeout():
done[0] = True
gr.switch()
io_loop.add_timeout(io_loop.time() + timeout, on_timeout)
while not done[0]:
gr.parent.switch()
def generator(f):
@wraps(f)
def wrapper(*args, **kwargs):
assert greenlet.getcurrent(
).parent is not None, "Functionscan only be \
called within @run_on_greenlet decorator in the call stack."
try:
result = f(*args, **kwargs)
except (gen.Return, StopIteration) as e:
result = getattr(e, 'value', None)
else:
if isinstance(result, types.GeneratorType):
try:
future = next(result)
while True:
try:
value = greenletyield(future)
except Exception:
result.throw(*sys.exc_info())
else:
future = result.send(value)
except (gen.Return, StopIteration) as e:
return getattr(e, 'value', None)
return result
return wrapper
def run_on_greenlet(f):
@wraps(f)
def wrapper(*args, **kwargs):
future = concurrent.TracebackFuture()
def greenlet_base():
try:
future.set_result(f(*args, **kwargs))
except:
future.set_exc_info(sys.exc_info())
gr = greenlet.greenlet(greenlet_base)
gr.switch()
return future
return wrapper
def greenletyield(future, timeout=None):
gr = greenlet.getcurrent()
assert gr.parent is not None, "greenletyield() can only \
be called from functions that have the @run_on_greenlet \
decorator in the call stack."
if not future.done():
io_loop = IOLoop.current()
timeout_handle = None
if timeout is not None and timeout > 0:
timeout_handle = io_loop.add_timeout(
io_loop.time() + timeout,
lambda: future.set_exception(
TimeoutError("Timeout after %s seconds" % timeout))
)
def on_complete(result):
if timeout_handle is not None:
io_loop.remove_timeout(timeout_handle)
gr.switch()
io_loop.add_future(future, on_complete)
gr.parent.switch()
while not future.done():
gr.parent.switch()
return future.result()

测试

下面的代码可以测试以上的代码

from greenlet import run_on_greenlet
import time
from tornado import gen
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options
define("port", default=8888, help="run on the given port", type=int)
class MainHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
r = yield self.test()
self.write("Blocked %s" % r)
self.finish()
@run_on_greenlet
def test(self):
print("Sleeping 10")
time.sleep(2)
return 10
def main():
tornado.options.parse_command_line()
application = tornado.web.Application([
(r"/", MainHandler),
])
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(options.port)
tornado.ioloop.IOLoop.current().start()
if __name__ == "__main__":
main()

结论

同步方法经过 run_on_greenlet 装饰后, 已经实现了异步操作, 但是性能如何, 还需要测试. 另外大并发下 数据是否准确也需要验证.

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器