concurrent.future模块
已于 2025年05月10日 14:36 修改
访问次数:0
1)核心模块:concurrent.futures
线程池在 Python 里主要来自:
- concurrent.futures.Executor(抽象基类)
- concurrent.futures.ThreadPoolExecutor(线程池实现)
- concurrent.futures.Future(任务句柄)
- 一些辅助函数:as_completed、wait
你现在用的就是这一套。
2)ThreadPoolExecutor:线程池 API
2.1 构造函数
ThreadPoolExecutor(
max_workers=None,
thread_name_prefix="",
initializer=None,
initargs=()
)
参数解释
max_workers
- 最大线程数
- 默认值:min(32, os.cpu_count() + 4)(大概规则是:CPU核数+4,上限32)
- 用途:控制并发度
注意:线程池适合 IO 密集型,不适合 CPU 密集型(因为 GIL)。
thread_name_prefix
- 线程名的前缀
- 便于 debug 和日志定位
- 你用的 "test" 就很好
initializer / initargs
- 每个线程启动时会执行一次的初始化函数
- 比如:设置 thread-local、初始化数据库连接、设置某些 SDK 的线程环境
示例:
def init():
print("thread init")
pool = ThreadPoolExecutor(initializer=init)
2.2 提交任务:submit
future = pool.submit(fn, *args, **kwargs)
- 立即把任务放进队列
- 返回 Future
- 任务什么时候跑不保证(取决于空闲线程)
你现在就是这么用的:
pool.submit(func1, TaskId(i))
2.3 批量提交:map
pool.map(fn, iterable, timeout=None, chunksize=1)
特点
- 返回的是一个迭代器(你遍历它才会取结果)
- 结果顺序与输入顺序一致(这点跟 as_completed 不同)
- 某个任务抛异常,会在你迭代取结果时抛出
例子:
for r in pool.map(func1, range(20)):
print(r)
timeout
- 是“取结果时的总超时”
- 不是单任务超时
2.4 关闭线程池:shutdown
pool.shutdown(wait=True, cancel_futures=False)
参数解释
wait=True
- 等所有任务完成再返回
cancel_futures=False
- Python 3.9+ 才有
- 如果 True:取消所有“还没开始执行”的任务
- 已经在跑的任务取消不了(线程没法强杀)
2.5 上下文管理(with)
with ThreadPoolExecutor() as pool:
...
等价于:
pool = ThreadPoolExecutor()
try:
...
finally:
pool.shutdown(wait=True)
你现在用的就是最推荐的方式。
3)Future:任务句柄 API(非常重要)
Future 是你拿到任务状态/结果/异常的关键。
3.1 future.result(timeout=None)
- 获取任务返回值
- 如果任务抛异常,这里会重新抛出
- 如果任务没完成且 timeout 到了,抛 TimeoutError
示例:
try:
r = future.result(timeout=1)
except TimeoutError:
...
⚠️ 注意:timeout 是“等多久”,不是“任务最多执行多久”。
超时后任务仍然会继续跑。
3.2 future.exception(timeout=None)
- 返回任务抛出的异常对象
- 没异常返回 None
- timeout 行为同 result()
3.3 future.done()
future.done() # True/False
任务是否完成(成功/失败都算完成)。
3.4 future.running()
任务是否正在运行(已被线程取走执行)。
3.5 future.cancel()
尝试取消任务。
返回值:
- True:取消成功(一般意味着任务还在队列里没开始)
- False:取消失败(任务已经开始执行,或已经完成)
⚠️ 线程池任务开始跑以后基本取消不了。
3.6 future.cancelled()
任务是否已被取消。
3.7 future.add_done_callback(fn)
给 future 加回调:任务完成时调用。
def cb(fut: Future):
print("done", fut.result())
future.add_done_callback(cb)
⚠️ 注意:回调是在执行任务的那个工作线程里调用的,不是主线程。
3.8 future.set_running_or_notify_cancel() / set_result / set_exception
这些是 Future 内部机制用的,一般你不用(除非你自己实现 Executor)。
4)辅助函数:as_completed 与 wait
这俩非常常用。
4.1 as_completed(futures, timeout=None)
for fut in as_completed(futures, timeout=3):
...
特点
- 谁先完成先 yield 谁(不保证顺序)
- 如果 timeout 到了但还有没完成的,抛 TimeoutError
- 超时后你仍然可以继续处理剩余 futures(比如你现在那样标记 TIMEOUT_TOTAL)
你现在这段用法是很标准的。
4.2 wait(futures, timeout=None, return_when=ALL_COMPLETED)
done, not_done = wait(futures, timeout=3)
返回两个集合:
- done:已完成的 futures
- not_done:未完成的 futures
return_when 可选值
- FIRST_COMPLETED:任意一个完成就返回
- FIRST_EXCEPTION:任意一个抛异常就返回
- ALL_COMPLETED:全部完成才返回(默认)
这适合你想“等一波”然后一次性处理。
5)异常体系(你一定会遇到)
5.1 TimeoutError
来源:
- future.result(timeout=...)
- as_completed(..., timeout=...)
- wait(..., timeout=...)
注意:concurrent.futures.TimeoutError 和 asyncio.TimeoutError 很像,但不是同一个东西。
5.2 任务内部异常
任务里抛异常不会立刻炸线程池,异常会被 future 捕获。
只有当你:
- future.result()
- 或迭代 pool.map()
才会把异常重新抛出来。
6)线程池最常见的坑(你刚好踩过)
6.1 线程池无法真正“超时杀任务”
你能做的只有:
- “超时后不等它”
- 或者尝试 cancel(但通常 cancel 不掉正在跑的)
要强制终止任务通常要换:
- ProcessPoolExecutor
- 或者任务自己支持中断(检查 Event)
6.2 as_completed(timeout=...) 是总超时
不是单任务超时。
你现在的方案(总超时 + 事后判断单任务耗时)是合理的。
6.3 Future 作为 dict key 是常规用法
你用:
task_map = {future: task_id}
这是非常标准的模式。
7)一张“线程池 API 全景图”(你可以当速查表)
ThreadPoolExecutor
- submit(fn, *args, **kwargs) -> Future
- map(fn, iterable, timeout=None, chunksize=1) -> iterator
- shutdown(wait=True, cancel_futures=False)
- __enter__ / __exit__(with)
Future
- result(timeout=None)
- exception(timeout=None)
- done()
- running()
- cancel()
- cancelled()
- add_done_callback(fn)
工具函数
- as_completed(futures, timeout=None)
- wait(futures, timeout=None, return_when=...)
评论(0)