并发是指一次处理多件事,而并行是指一次多件事。真正的并行需要多个核心。并发用于制定方案,用来解决可能并行的问题。

18.1 线程与协程对比

线程控制

  • 可以创建单独的信号量可变对象,将它传入线程中,以此从外部控制线程的继续和终止
  • 小技巧:
    • 光标从行末移动到行首:使用退格符 '\x08' * len(line)
    • 可以用 sys.stdout.writesys.stdout.flush 来控制标准控制台的输入输出:write(str) 写入缓冲区,flush() 刷新缓冲区到屏幕输出。

asyncio 包的基本使用

  • 使用 yield from 来交出控制权,由事件循环处理;
  • 在协程中使用 yield from asyncio.sleep(t) 来实现协程内部的休眠:这样不会阻塞事件循环,而是把控制权交给主循环(主循环就是 asyncio 控制事件轮转的循环,在模块内部实现),等待休眠结束后恢复这个协程;
  • 使用 asyncio.async(coro_or_future) 函数排定协程的运行时间,使用 Task 对象包装了该协程(作为参数传入),并立即返回。它相当于在主循环的事件队列中创建了一个事件,其操作就是执行该任务;
  • 在主函数中启动协程:先获取事件循环的引用 asyncio.get_event_loop(),然后调用其方法 run_until_complete(coroutine) 启动该协程并执行完毕,返回值即为调用的返回值。最后使用 loop.close() 关闭事件循环
  • 可以使用 Task.cancel() 方法取消任务:它会在协程当前暂停的 yield抛出 asyncioCancelledError 异常。需要在协程中捕获该异常,并正确处理取消;
  • 在协程前添加 @asyncio.coroutine 装饰器不是强制的,但它可以标记出协程,且可以在调试中对过早的垃圾回收等操作发出警告;
  • 注意:协程间通信和控制的部分 supervisor 也需要通过协程实现,在主函数中使用 loop.run_until_complete 方法执行。主函数会阻塞到返回结果;
  • 线程可能在任意执行过程被中断,因此需要使用保留锁去保护数据的完整性;协程则是安全的:它只可能在自行设置的 yield 处交出控制权

asyncio.Future:故意不阻塞

  • TaskFuture 的子类,用于包装协程。它可以通过 BaseEventLoop.create_task() 创建并排定时间。这与 Executor.submit() 创建 concurrent.futures.Future 实例类似;
  • asyncio.Future 类的 .result() 方法不能指定超时时间,但它不会阻塞到任务执行结束:如果还没运行完毕,则会抛出 asyncio.InvalidStateError 异常。不过一般使用 yield from 语句来获取运行的结果
  • yield from 的作用是把控制权还给事件循环。它的作用等同于回调函数:延迟的操作结束后,操作的返回值会赋给 yield from 表达式,并恢复执行协程,完成后续工作;
  • asyncio.Future 对象yield from 驱动

从 future、任务和协程中产出

  • BaseEventLoop.create_task(coro) 排定协程的执行时间,创建一个 Task 对象并返回;
  • asyncio.async(coro_or_future, *, loop=None) 可以利用协程创建任务(使用上面的 create_task 方法),返回得到的 Task 对象。也可以接受 Task/Future 对象,直接返回。默认的事件循环 loopasyncio.get_event_loop() 的结果,可以传入参数来另外指定;
  • 还有多个函数会自动把参数指定的协程包装在 Task 对象中,如 run_until_complete 等。

18.2 使用 asyncioaiohttp 包下载

当下使用 asyncio 实现 HTTP 客户端和服务器时,使用的大多是 aiohttp 包。

  • asyncio.wait() 接受由 future 或协程构成的可迭代对象,将它们分别包装成 Task 对象,最后返回一个协程对象,它会等待其中所有的协程运行完毕后结束。它接受 timeoutreturn_when 可选参数来控制提前终止。它运行结束后返回元组,分别包含结束的 future 和未结束的 future;
  • aiohttp 提供异步版的访问网络的函数,使用 yield from 可以将控制权交给事件循环;
  • 与之前使用的协程类似,使用 yield from 可以编写协程链条,将职责逐层委托到下一层协程,最终委托给某个协程函数或方法。这相当于架起了管道,让 asyncio 事件循环来直接驱动执行底层的异步函数。

18.3 避免阻塞型调用

  • 使用基于事件循环的协程并发,协程的 .send() 方法和事件的回调函数效果相近,且协程比线程消耗的内存数量级小,协程还能避免“回调地狱”(见18.5 节)。

18.4 改进 asyncio 下载脚本

使用 asyncio.as_completed 函数

  • 使用 asyncio 包的程序中只有一个主线程,其中不能有阻塞型调用,事件循环也在其中运行。若要对具体协程引入自定义的等待与处理(如书中的进度条实例),可以新建一个协程,在其中控制各协程并阻塞处理(必须使用 yield from 获取各 future 的结果,这就使得函数变为协程。这个协程就相当于 wait() 创建的协程),并在主程序中启动该协程;
  • 可以使用 asyncio.Semaphore 类(信号量)的实例来限制并发请求数量:使用 with (yield from semaphore) 上下文管理器来表示临界区,退出上下文时会还原信号量。还可用 .acquire().release() 方法来使用。如果超过了所允许的最大值,则会阻塞该协程
  • raise X from Y 句法可以将新抛出的异常链接原来的异常
  • asyncio.as_completed() 接受包含不同协程的可迭代对象,返回迭代器,迭代它可以阻塞直至产出最新完成的 future 实例。这与 concurrent.futures 库中的类似操作基本相同;
  • 注意:不可以用之前的字典映射方式来建立 future 到具体信息的映射,这是因为 asyncio 包内部可能会根据我们提供的 future 生成新的对象。可以将错误信息存储在异常对象中

使用 Executor 对象,防止阻塞事件循环

  • 与线程不同,协程中的阻塞型函数会阻塞整个程序(只有一个运行线程);
  • asyncio事件循环背后维护了 ThreadPoolExecutor 对象,可以对事件循环调用 .run_in_executor 方法,把可调用对象发给它在另一线程中执行,避免阻塞整个线程。

18.5 从回调到 future 和协程

回调地狱:如果一个操作需要依赖多个之前操作的结果,那就需要嵌套回调。

  • 回调地狱的问题:由于需要多个分开定义的函数各做一部分工作,并通过函数调用串联起来,不同函数就会使用到不同的上下文,无法实现全部数据共享。同时如果需要处理错误情形,还需要注册两个回调函数,分别处理正确和错误情形;
  • 可以使用协程式异步 API 来避免上面的问题:将所有的操作顺序写在同一定义体中,使用 yield 交出控制权并执行异步操作,这样就不需要实现嵌套的回调函数,能共享上下文处理异常时也更方便(try/except);
  • 不过协程必须使用事件循环来显式调度执行(如 loop.create_task()),不能直接调用。

18.6 使用 asyncio 包编写服务器

使用 asyncio 包编写 TCP 服务器

  • 在使用 StreamWriterStreamReader 的 I/O 方法中,有的是协程,必须用 yield from 驱动,另一些则是普通的函数;
  • asyncio.start_server() 可以使用 handler、地址等信息创建服务器协程并返回。使用 run_until_complete 时会启动该服务器,并立刻返回服务器对象(此时还不能用)。之后用 loop.run_forever() 启动事件循环此时服务器开始可以处理请求,主程序在此阻塞。
  • 按上述方法创建的服务器 server 在结束时,使用 server.close() 关闭,再运行由 server.wait_closed() 得到的 future ,等待关闭;
  • 在事件循环运行期间,只要有新客户端连接服务器,就会启动一个 handler 协程实例。

使用 aiohttp 包编写服务器

  • aiohttp.web 部分支持服务器端的 HTTP 操作,如 web.Application(loop) 可以创建 Web 应用,可以使用这一应用实例来进行路由配置,并将 handler 绑定到利用事件循环创建的基于协程的服务器上。

更好地支持并发的智能客户端

  • 对于上面讨论的协程服务器来说,访问数据库时应当使用异步访问,否则在等待查询结果时,事件循环会阻塞;
  • 高并发的系统需要把复杂的工作分成多步,如响应时间太长时可采用分页

18.7 本章小结

  • 异步系统能避免用户级线程的开销,这是它能比多线程系统管理更多并发连接的主要原因。应用中只需要确保没有阻塞的代码即可。