Chapter 18 Concurrency with asyncio
并发是指一次处理多件事,而并行是指一次做多件事。真正的并行需要多个核心。并发用于制定方案,用来解决可能并行的问题。
18.1 线程与协程对比
线程控制
- 可以创建单独的信号量可变对象,将它传入线程中,以此从外部控制线程的继续和终止;
- 小技巧:
- 将光标从行末移动到行首:使用退格符
'\x08' * len(line); - 可以用
sys.stdout.write和sys.stdout.flush来控制标准控制台的输入输出:write(str)写入缓冲区,flush()刷新缓冲区到屏幕输出。
- 将光标从行末移动到行首:使用退格符
asyncio 包的基本使用
- 使用
yield from来交出控制权,由事件循环处理; - 在协程中使用
yield from asyncio.sleep(t)来实现协程内部的休眠:这样不会阻塞事件循环,而是把控制权交给主循环(主循环就是asyncio控制事件轮转的循环,在模块内部实现),等待休眠结束后恢复这个协程; - 使用
asyncio.async(coro_or_future)函数排定协程的运行时间,使用Task对象包装了该协程(作为参数传入),并立即返回。它相当于在主循环的事件队列中创建了一个事件,其操作就是执行该任务; - 在主函数中启动协程:先获取事件循环的引用
asyncio.get_event_loop(),然后调用其方法run_until_complete(coroutine)启动该协程并执行完毕,返回值即为调用的返回值。最后使用loop.close()关闭事件循环; - 可以使用
Task.cancel()方法取消任务:它会在协程当前暂停的yield处抛出asyncioCancelledError异常。需要在协程中捕获该异常,并正确处理取消; - 在协程前添加
@asyncio.coroutine装饰器不是强制的,但它可以标记出协程,且可以在调试中对过早的垃圾回收等操作发出警告; - 注意:协程间通信和控制的部分
supervisor也需要通过协程实现,在主函数中使用loop.run_until_complete方法执行。主函数会阻塞到返回结果; - 线程可能在任意执行过程被中断,因此需要使用保留锁去保护数据的完整性;协程则是安全的:它只可能在自行设置的
yield处交出控制权。
asyncio.Future:故意不阻塞
Task是Future的子类,用于包装协程。它可以通过BaseEventLoop.create_task()创建并排定时间。这与Executor.submit()创建concurrent.futures.Future实例类似;asyncio.Future类的.result()方法不能指定超时时间,但它不会阻塞到任务执行结束:如果还没运行完毕,则会抛出asyncio.InvalidStateError异常。不过一般使用yield from语句来获取运行的结果;yield from的作用是把控制权还给事件循环。它的作用等同于回调函数:延迟的操作结束后,操作的返回值会赋给yield from表达式,并恢复执行协程,完成后续工作;asyncio.Future对象由yield from驱动。
从 future、任务和协程中产出
BaseEventLoop.create_task(coro)排定协程的执行时间,创建一个Task对象并返回;asyncio.async(coro_or_future, *, loop=None)可以利用协程创建任务(使用上面的create_task方法),返回得到的Task对象。也可以接受Task/Future对象,直接返回。默认的事件循环loop为asyncio.get_event_loop()的结果,可以传入参数来另外指定;- 还有多个函数会自动把参数指定的协程包装在
Task对象中,如run_until_complete等。
18.2 使用 asyncio 和 aiohttp 包下载
当下使用
asyncio实现 HTTP 客户端和服务器时,使用的大多是aiohttp包。
asyncio.wait()接受由 future 或协程构成的可迭代对象,将它们分别包装成Task对象,最后返回一个协程对象,它会等待其中所有的协程运行完毕后结束。它接受timeout和return_when可选参数来控制提前终止。它运行结束后返回元组,分别包含结束的 future 和未结束的 future;aiohttp提供异步版的访问网络的函数,使用yield from可以将控制权交给事件循环;- 与之前使用的协程类似,使用
yield from可以编写协程链条,将职责逐层委托到下一层协程,最终委托给某个协程函数或方法。这相当于架起了管道,让asyncio事件循环来直接驱动执行底层的异步函数。
18.3 避免阻塞型调用
- 使用基于事件循环的协程并发,协程的
.send()方法和事件的回调函数效果相近,且协程比线程消耗的内存数量级小,协程还能避免“回调地狱”(见18.5 节)。
18.4 改进 asyncio 下载脚本
使用 asyncio.as_completed 函数
- 使用
asyncio包的程序中只有一个主线程,其中不能有阻塞型调用,事件循环也在其中运行。若要对具体协程引入自定义的等待与处理(如书中的进度条实例),可以新建一个协程,在其中控制各协程并阻塞处理(必须使用yield from获取各 future 的结果,这就使得函数变为协程。这个协程就相当于wait()创建的协程),并在主程序中启动该协程; - 可以使用
asyncio.Semaphore类(信号量)的实例来限制并发请求数量:使用with (yield from semaphore)上下文管理器来表示临界区,退出上下文时会还原信号量。还可用.acquire()和.release()方法来使用。如果超过了所允许的最大值,则会阻塞该协程; raise X from Y句法可以将新抛出的异常链接原来的异常;asyncio.as_completed()接受包含不同协程的可迭代对象,返回迭代器,迭代它可以阻塞直至产出最新完成的 future 实例。这与concurrent.futures库中的类似操作基本相同;- 注意:不可以用之前的字典映射方式来建立 future 到具体信息的映射,这是因为
asyncio包内部可能会根据我们提供的 future 生成新的对象。可以将错误信息存储在异常对象中。
使用 Executor 对象,防止阻塞事件循环
- 与线程不同,协程中的阻塞型函数会阻塞整个程序(只有一个运行线程);
asyncio的事件循环背后维护了ThreadPoolExecutor对象,可以对事件循环调用.run_in_executor方法,把可调用对象发给它在另一线程中执行,避免阻塞整个线程。
18.5 从回调到 future 和协程
回调地狱:如果一个操作需要依赖多个之前操作的结果,那就需要嵌套回调。
- 回调地狱的问题:由于需要多个分开定义的函数各做一部分工作,并通过函数调用串联起来,不同函数就会使用到不同的上下文,无法实现全部数据共享。同时如果需要处理错误情形,还需要注册两个回调函数,分别处理正确和错误情形;
- 可以使用协程式异步 API 来避免上面的问题:将所有的操作顺序写在同一定义体中,使用
yield交出控制权并执行异步操作,这样就不需要实现嵌套的回调函数,能共享上下文,处理异常时也更方便(try/except); - 不过协程必须使用事件循环来显式调度执行(如
loop.create_task()),不能直接调用。
18.6 使用 asyncio 包编写服务器
使用 asyncio 包编写 TCP 服务器
- 在使用
StreamWriter和StreamReader的 I/O 方法中,有的是协程,必须用yield from驱动,另一些则是普通的函数; asyncio.start_server()可以使用 handler、地址等信息创建服务器协程并返回。使用run_until_complete时会启动该服务器,并立刻返回服务器对象(此时还不能用)。之后用loop.run_forever()启动事件循环,此时服务器开始可以处理请求,主程序在此阻塞。- 按上述方法创建的服务器
server在结束时,使用server.close()关闭,再运行由server.wait_closed()得到的 future ,等待关闭; - 在事件循环运行期间,只要有新客户端连接服务器,就会启动一个 handler 协程实例。
使用 aiohttp 包编写服务器
aiohttp.web部分支持服务器端的 HTTP 操作,如web.Application(loop)可以创建 Web 应用,可以使用这一应用实例来进行路由配置,并将 handler 绑定到利用事件循环创建的基于协程的服务器上。
更好地支持并发的智能客户端
- 对于上面讨论的协程服务器来说,访问数据库时应当使用异步访问,否则在等待查询结果时,事件循环会阻塞;
- 高并发的系统需要把复杂的工作分成多步,如响应时间太长时可采用分页。
18.7 本章小结
- 异步系统能避免用户级线程的开销,这是它能比多线程系统管理更多并发连接的主要原因。应用中只需要确保没有阻塞的代码即可。