Chapter 17 Concurrency with Futures
futures模块封装了多线程并发的模式,易于使用。
17.1 示例:网络下载的三种风格
在公网中测试 HTTP 并发客户端可能不小心变成拒绝服务(DoS)攻击。大规模测试 HTTP 服务器时,应当自己架设测试服务器。
依序下载
- 使用
requests库下载图片等二进制文件的方法:使用requests.get(url)获取响应,使用响应的content属性得到字节序列,最后使用fp.write(data)写入二进制文件; - Python 中遇到换行才会刷新
stdout缓冲区。可以通过sys.stdout.flush()强制刷新。
使用 concurrent.futures 模块下载
futures.ThreadPoolExecutor和futures.ProcessPoolExecutor类在内部维护了工作线程或进程池以及任务队列,可以用来调度各可调用的对象;- 借助
with使用futures.ThreadPoolExecutor(num)作为上下文管理器,参数为创建的线程数。其__enter__方法返回的executor可以用来操作各线程(executor.map),关闭的__exit__方法会调用executor.shutdown(wait=True),阻塞至各线程执行完毕:
with futures.ThreadPoolExecutor(n) as executor:
res = executor.map(func, iterable) # similar to map()
executor.map返回一个生成器,可以使用它迭代得到各任务的返回值。
future 在哪里
Future类的实例都表示可能已经完成或尚未完成的延迟计算,封装了待完成的操作;- 通常情况下不应该自己创建、修改 future,而只能由并发框架实例化、控制:只有通过并发框架来排定任务的执行时间,并控制计算何时结束;
- future 支持的方法:
.done()返回链接的可调用对象是否已经执行;.add_done_callback()添加回调函数(可调用对象);.result()返回可调用对象的结果或重新抛出异常(Executor.map返回的生成器中使用的就是该方法,得到各 future 的运行结果);
- 可以利用
futures.as_completed函数手动控制 future:它接受 future 列表,返回迭代器。它不会阻塞,而是在调用时检查在 futures 运行结束之后产出 future; - 上述实现的并发脚本并不能并行下载:GIL 会限制同一时刻只执行一个线程;
- 使用协程实现的异步本质上也是在单线程中运行,具体见18.3 节。
17.2 阻塞型 I/O 和 GIL
- 全局解释器锁(GIL) 会限制一次只允许使用一个线程执行 Python 字节码(这是 CPython 解释器限制的,而非 Python 语言本身);
- 标准库中所有执行阻塞型 I/O 操作的函数,在等待返回结果时都会释放 GIL。线程在等待 I/O 时,Python 调度程序会切换到另一线程。因此可以利用这一点对 I/O 密集型程序实现高效的多线程。
17.3 使用 concurrent.futures 模块启动进程
- 使用
ProcessPoolExecutor可以将工作分配给多个进程处理,绕开 GIL 的限制,利用所有可用的 CPU 内核,适用于 CPU 密集型处理任务; ProcessPoolExecutor的可选参数大多数情况下选默认值——CPU 数量;- CPU 密集型工作可以尝试使用 PyPy 运行脚本。
17.4 实验 Executor.map 方法
- 调用
executor.map方法分配任务后,各线程开始执行,即刻返回用于生成结果的生成器,此时不会阻塞; - 当迭代结果生成器(隐式调用
next()函数)时,它会对各个 future 实例调用.result()方法。如果该示例运行结束则会返回结果,如果没有则会阻塞直到运行结束; - 该生成器产出结果的顺序与调用开始的顺序一致,因此任务次序会影响阻塞时间;
- 使用
Executor.sumbit方法和futures.as_completed函数结合更灵活,可以忽略提交的顺序,随时产出执行结束的任务,且能够指定不同内容(可调用对象)的任务; futures.as_completed也可以支持来自多个Executor实例的 future 实例,也就是支持线程、进程混合。
17.5 显示下载进度并处理错误
显示进度——TQDM 包
tqdm()函数能处理任何可迭代对象,生成一个迭代器。利用该迭代器迭代时,会显示进度条以及完成迭代预计的剩余时间。
处理错误
- 使用
request模块发起请求时,得到的响应会保存在Reponse对象中。其status_code表示状态码,raise_for_status()方法可以抛出HTTPError异常,异常对象的response属性包括了响应的信息; - 可以使用
Enum()创建枚举类,用来表示特定的数值。可传入类名和可迭代对象/由空格来分割各字段的字符串来创建; - 在
except的捕获语句块中使用单独的raise(无参数)可以再次将捕获的异常向上抛出;在正常运行的语句块中使用单独的raise会默认抛出RunTimeError; - 使用
Executor.sumbit方法和futures.as_completed函数来单独处理 future 时,迭代的对象是后面的函数返回的迭代器:它会产出最新完成的 future 实例,因此不能保证顺序。可以使用字典建立 future 实例到具体信息的映射。
17.6 线程和多进程的替代方案
- CPU 密集型工作更适合使用多进程,规避 GIL;
- 多线程可以使用
threading模块中的组件自行组织,如Thread、Lock、Semaphore等,也需要借助线程安全的队列queue模块等; - 多进程可以使用
multiprocessing模块,与threading类似。但进程间通信较难解决。