concurrent.future模块


1)核心模块:concurrent.futures

线程池在 Python 里主要来自:

  • concurrent.futures.Executor(抽象基类)
  • concurrent.futures.ThreadPoolExecutor(线程池实现)
  • concurrent.futures.Future(任务句柄)
  • 一些辅助函数:as_completed、wait

你现在用的就是这一套。



2)ThreadPoolExecutor:线程池 API

2.1 构造函数

ThreadPoolExecutor(
    max_workers=None,
    thread_name_prefix="",
    initializer=None,
    initargs=()
)

参数解释

max_workers

  • 最大线程数
  • 默认值:min(32, os.cpu_count() + 4)(大概规则是:CPU核数+4,上限32)
  • 用途:控制并发度
注意:线程池适合 IO 密集型,不适合 CPU 密集型(因为 GIL)。


thread_name_prefix

  • 线程名的前缀
  • 便于 debug 和日志定位
  • 你用的 "test" 就很好


initializer / initargs

  • 每个线程启动时会执行一次的初始化函数
  • 比如:设置 thread-local、初始化数据库连接、设置某些 SDK 的线程环境

示例:

def init():
    print("thread init")

pool = ThreadPoolExecutor(initializer=init)


2.2 提交任务:submit

future = pool.submit(fn, *args, **kwargs)
  • 立即把任务放进队列
  • 返回 Future
  • 任务什么时候跑不保证(取决于空闲线程)

你现在就是这么用的:

pool.submit(func1, TaskId(i))


2.3 批量提交:map

pool.map(fn, iterable, timeout=None, chunksize=1)

特点

  • 返回的是一个迭代器(你遍历它才会取结果)
  • 结果顺序与输入顺序一致(这点跟 as_completed 不同)
  • 某个任务抛异常,会在你迭代取结果时抛出

例子:

for r in pool.map(func1, range(20)):
    print(r)

timeout

  • 是“取结果时的总超时”
  • 不是单任务超时


2.4 关闭线程池:shutdown

pool.shutdown(wait=True, cancel_futures=False)

参数解释

wait=True

  • 等所有任务完成再返回

cancel_futures=False

  • Python 3.9+ 才有
  • 如果 True:取消所有“还没开始执行”的任务
  • 已经在跑的任务取消不了(线程没法强杀)


2.5 上下文管理(with)

with ThreadPoolExecutor() as pool:
    ...

等价于:

pool = ThreadPoolExecutor()
try:
    ...
finally:
    pool.shutdown(wait=True)

你现在用的就是最推荐的方式。



3)Future:任务句柄 API(非常重要)

Future 是你拿到任务状态/结果/异常的关键。



3.1 future.result(timeout=None)

  • 获取任务返回值
  • 如果任务抛异常,这里会重新抛出
  • 如果任务没完成且 timeout 到了,抛 TimeoutError

示例:

try:
    r = future.result(timeout=1)
except TimeoutError:
    ...

⚠️ 注意:
timeout 是“等多久”,不是“任务最多执行多久”。
超时后任务仍然会继续跑。



3.2 future.exception(timeout=None)

  • 返回任务抛出的异常对象
  • 没异常返回 None
  • timeout 行为同 result()


3.3 future.done()

future.done()  # True/False

任务是否完成(成功/失败都算完成)。



3.4 future.running()

任务是否正在运行(已被线程取走执行)。



3.5 future.cancel()

尝试取消任务。

返回值:

  • True:取消成功(一般意味着任务还在队列里没开始)
  • False:取消失败(任务已经开始执行,或已经完成)

⚠️ 线程池任务开始跑以后基本取消不了。



3.6 future.cancelled()

任务是否已被取消。



3.7 future.add_done_callback(fn)

给 future 加回调:任务完成时调用。

def cb(fut: Future):
    print("done", fut.result())

future.add_done_callback(cb)

⚠️ 注意:回调是在执行任务的那个工作线程里调用的,不是主线程。



3.8 future.set_running_or_notify_cancel() / set_result / set_exception

这些是 Future 内部机制用的,一般你不用(除非你自己实现 Executor)。



4)辅助函数:as_completedwait

这俩非常常用。



4.1 as_completed(futures, timeout=None)

for fut in as_completed(futures, timeout=3):
    ...

特点

  • 谁先完成先 yield 谁(不保证顺序)
  • 如果 timeout 到了但还有没完成的,抛 TimeoutError
  • 超时后你仍然可以继续处理剩余 futures(比如你现在那样标记 TIMEOUT_TOTAL)

你现在这段用法是很标准的。



4.2 wait(futures, timeout=None, return_when=ALL_COMPLETED)

done, not_done = wait(futures, timeout=3)

返回两个集合:

  • done:已完成的 futures
  • not_done:未完成的 futures

return_when 可选值

  • FIRST_COMPLETED:任意一个完成就返回
  • FIRST_EXCEPTION:任意一个抛异常就返回
  • ALL_COMPLETED:全部完成才返回(默认)

这适合你想“等一波”然后一次性处理。



5)异常体系(你一定会遇到)

5.1 TimeoutError

来源:

  • future.result(timeout=...)
  • as_completed(..., timeout=...)
  • wait(..., timeout=...)

注意:
concurrent.futures.TimeoutErrorasyncio.TimeoutError 很像,但不是同一个东西。



5.2 任务内部异常

任务里抛异常不会立刻炸线程池,异常会被 future 捕获。

只有当你:

  • future.result()
  • 或迭代 pool.map()

才会把异常重新抛出来。



6)线程池最常见的坑(你刚好踩过)

6.1 线程池无法真正“超时杀任务”

你能做的只有:

  • “超时后不等它”
  • 或者尝试 cancel(但通常 cancel 不掉正在跑的)

要强制终止任务通常要换:

  • ProcessPoolExecutor
  • 或者任务自己支持中断(检查 Event)


6.2 as_completed(timeout=...) 是总超时

不是单任务超时。

你现在的方案(总超时 + 事后判断单任务耗时)是合理的。



6.3 Future 作为 dict key 是常规用法

你用:

task_map = {future: task_id}

这是非常标准的模式。



7)一张“线程池 API 全景图”(你可以当速查表)

ThreadPoolExecutor

  • submit(fn, *args, **kwargs) -> Future
  • map(fn, iterable, timeout=None, chunksize=1) -> iterator
  • shutdown(wait=True, cancel_futures=False)
  • __enter__ / __exit__(with)

Future

  • result(timeout=None)
  • exception(timeout=None)
  • done()
  • running()
  • cancel()
  • cancelled()
  • add_done_callback(fn)

工具函数

  • as_completed(futures, timeout=None)
  • wait(futures, timeout=None, return_when=...)
文章标签:

评论(0)