futures 模块封装了多线程并发的模式,易于使用。

17.1 示例:网络下载的三种风格

在公网中测试 HTTP 并发客户端可能不小心变成拒绝服务(DoS)攻击。大规模测试 HTTP 服务器时,应当自己架设测试服务器。

依序下载

  • 使用 requests下载图片等二进制文件的方法:使用 requests.get(url) 获取响应,使用响应的 content 属性得到字节序列,最后使用 fp.write(data) 写入二进制文件;
  • Python 中遇到换行才会刷新 stdout 缓冲区。可以通过 sys.stdout.flush() 强制刷新。

使用 concurrent.futures 模块下载

  • futures.ThreadPoolExecutorfutures.ProcessPoolExecutor 类在内部维护了工作线程或进程池以及任务队列,可以用来调度各可调用的对象;
  • 借助 with 使用 futures.ThreadPoolExecutor(num) 作为上下文管理器,参数为创建的线程数。其 __enter__ 方法返回的 executor 可以用来操作各线程(executor.map),关闭的 __exit__ 方法会调用 executor.shutdown(wait=True),阻塞至各线程执行完毕:
with futures.ThreadPoolExecutor(n) as executor:
    res = executor.map(func, iterable)      # similar to map()
  • executor.map 返回一个生成器,可以使用它迭代得到各任务的返回值。

future 在哪里

  • Future 类的实例都表示可能已经完成或尚未完成的延迟计算,封装了待完成的操作
  • 通常情况下不应该自己创建、修改 future,而只能由并发框架实例化、控制:只有通过并发框架来排定任务的执行时间,并控制计算何时结束;
  • future 支持的方法:
    • .done() 返回链接的可调用对象是否已经执行;
    • .add_done_callback() 添加回调函数(可调用对象);
    • .result() 返回可调用对象的结果或重新抛出异常(Executor.map 返回的生成器中使用的就是该方法,得到各 future 的运行结果);
  • 可以利用 futures.as_completed 函数手动控制 future:它接受 future 列表,返回迭代器。它不会阻塞,而是在调用时检查在 futures 运行结束之后产出 future
  • 上述实现的并发脚本并不能并行下载:GIL 会限制同一时刻只执行一个线程;
  • 使用协程实现的异步本质上也是在单线程中运行,具体见18.3 节

17.2 阻塞型 I/O 和 GIL

  • 全局解释器锁(GIL) 会限制一次只允许使用一个线程执行 Python 字节码(这是 CPython 解释器限制的,而非 Python 语言本身);
  • 标准库中所有执行阻塞型 I/O 操作的函数,在等待返回结果时都会释放 GIL。线程在等待 I/O 时,Python 调度程序会切换到另一线程。因此可以利用这一点对 I/O 密集型程序实现高效的多线程。

17.3 使用 concurrent.futures 模块启动进程

  • 使用 ProcessPoolExecutor 可以将工作分配给多个进程处理,绕开 GIL 的限制,利用所有可用的 CPU 内核,适用于 CPU 密集型处理任务;
  • ProcessPoolExecutor可选参数大多数情况下选默认值——CPU 数量;
  • CPU 密集型工作可以尝试使用 PyPy 运行脚本。

17.4 实验 Executor.map 方法

  • 调用 executor.map 方法分配任务后,各线程开始执行,即刻返回用于生成结果的生成器,此时不会阻塞
  • 当迭代结果生成器(隐式调用 next() 函数)时,它会对各个 future 实例调用 .result() 方法。如果该示例运行结束则会返回结果,如果没有则会阻塞直到运行结束
  • 该生成器产出结果的顺序与调用开始的顺序一致,因此任务次序会影响阻塞时间
  • 使用 Executor.sumbit 方法和 futures.as_completed 函数结合更灵活,可以忽略提交的顺序,随时产出执行结束的任务,且能够指定不同内容(可调用对象)的任务;
  • futures.as_completed 也可以支持来自多个 Executor 实例的 future 实例,也就是支持线程、进程混合。

17.5 显示下载进度并处理错误

显示进度——TQDM 包

  • tqdm() 函数能处理任何可迭代对象,生成一个迭代器。利用该迭代器迭代时,会显示进度条以及完成迭代预计的剩余时间。

处理错误

  • 使用 request 模块发起请求时,得到的响应会保存在 Reponse 对象中。其 status_code 表示状态码, raise_for_status() 方法可以抛出 HTTPError 异常,异常对象的 response 属性包括了响应的信息;
  • 可以使用 Enum() 创建枚举类,用来表示特定的数值。可传入类名和可迭代对象/由空格来分割各字段的字符串来创建;
  • except 的捕获语句块中使用单独的 raise (无参数)可以再次将捕获的异常向上抛出;在正常运行的语句块中使用单独的 raise 会默认抛出 RunTimeError
  • 使用 Executor.sumbit 方法和 futures.as_completed 函数来单独处理 future 时,迭代的对象是后面的函数返回的迭代器:它会产出最新完成的 future 实例,因此不能保证顺序。可以使用字典建立 future 实例到具体信息的映射。

17.6 线程和多进程的替代方案

  • CPU 密集型工作更适合使用多进程,规避 GIL;
  • 多线程可以使用 threading 模块中的组件自行组织,如 ThreadLockSemaphore 等,也需要借助线程安全的队列 queue 模块等;
  • 多进程可以使用 multiprocessing 模块,与 threading 类似。但进程间通信较难解决