异步库:asyncio.subprocess
已于 2025年12月29日 13:47 修改
访问次数:0
下面我给你一份完整、系统、带中文注释的 asyncio.subprocess 说明。
定位是:你已经懂 subprocess,现在要写 async executor / agent / Web 服务。
一、asyncio 中的 subprocess 是什么?
asyncio.subprocess 是 真正异步(非阻塞) 的子进程 API👉 可以 await,不会阻塞事件循环
它解决的是:
- FastAPI / aiohttp 中不能用 subprocess.run
- 同时跑多个外部命令
- 实时异步读取 stdout / stderr
二、核心 API 总览(只需要记住这两个)
asyncio.create_subprocess_exec() # ⭐ 推荐(无 shell)
asyncio.create_subprocess_shell() # ⚠️ shell 方式
三、create_subprocess_exec(最重要 ⭐⭐⭐⭐⭐)
1️⃣ 函数签名(官方等价形式)
asyncio.create_subprocess_exec(
*args,
stdin=None,
stdout=None,
stderr=None,
limit=asyncio.streams._DEFAULT_LIMIT,
**kwds
) -> asyncio.subprocess.Process
2️⃣ 参数详解(逐项注释)
proc = await asyncio.create_subprocess_exec(
"ping", "baidu.com", "-c", "3", # *args:命令及参数(强烈推荐列表)
stdin=None, # 标准输入(一般不用)
stdout=asyncio.subprocess.PIPE, # 捕获标准输出(异步)
stderr=asyncio.subprocess.PIPE, # 捕获错误输出(异步)
limit=2**16 # StreamReader 缓冲区大小
)
stdin / stdout / stderr 可选值
| 值 | 含义 |
|---|---|
None | 继承父进程 |
PIPE | 建立 asyncio Stream |
DEVNULL | 丢弃 |
3️⃣ 返回值:asyncio.subprocess.Process
proc # Process 对象
这是 async 版的 Popen
四、Process 对象详解(非常重要)
1️⃣ 核心属性(带注释)
proc.pid # 子进程 PID
proc.returncode # 返回码(None 表示未结束)
proc.stdin # asyncio StreamWriter
proc.stdout # asyncio StreamReader
proc.stderr # asyncio StreamReader
2️⃣ 核心方法(带注释)
await proc.wait()
code = await proc.wait()
# 等待进程结束(非阻塞)
# 返回 returncode
proc.kill()
proc.kill()
# 立即强制杀死进程(SIGKILL)
proc.terminate()
proc.terminate()
# 优雅终止进程(SIGTERM)
proc.send_signal(sig)
import signal
proc.send_signal(signal.SIGINT)
五、⭐ 异步读取 stdout / stderr(最常见用法)
1️⃣ async for 实时读取 stdout
import asyncio
async def run():
proc = await asyncio.create_subprocess_exec(
"ping", "baidu.com", "-c", "3",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# 异步逐行读取 stdout
async for line in proc.stdout:
print("OUT:", line.decode().rstrip())
await proc.wait()
print("exit code:", proc.returncode)
asyncio.run(run())
✅ 不阻塞事件循环
✅ 可同时跑多个进程
2️⃣ 同时读取 stdout + stderr(推荐写法)
async def read_stream(prefix, stream):
async for line in stream:
print(prefix, line.decode().rstrip())
async def run():
proc = await asyncio.create_subprocess_exec(
"ping", "baidu.com", "-c", "3",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
await asyncio.gather(
read_stream("OUT:", proc.stdout),
read_stream("ERR:", proc.stderr),
)
await proc.wait()
六、create_subprocess_shell(⚠️ 谨慎)
1️⃣ 函数签名
asyncio.create_subprocess_shell(
cmd,
stdin=None,
stdout=None,
stderr=None,
limit=...
) -> Process
2️⃣ 示例(支持管道)
proc = await asyncio.create_subprocess_shell(
"ls | grep py",
stdout=asyncio.subprocess.PIPE
)
⚠️ 风险与 shell=True 一样:
- 命令注入
- PID 是 shell 的
- 进程管理困难
七、asyncio subprocess vs subprocess 对比
| 维度 | subprocess | asyncio.subprocess |
|---|---|---|
| 是否 await | ❌ | ✅ |
| 是否阻塞 | ✅ | ❌ |
| 实时输出 | 阻塞式 | 异步 |
| 并发能力 | 差 | 强 |
| Web / Agent | ❌ | ✅ |
八、典型 executor / agent 模板(带注释)
async def exec_cmd(cmd: list[str]):
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
async for line in proc.stdout:
handle_log(line)
await proc.wait()
return proc.returncode
九、常见坑(非常重要)
❌ 1. 忘了 await
proc = asyncio.create_subprocess_exec(...) # ❌
✅ 必须:
proc = await asyncio.create_subprocess_exec(...)
❌ 2. 用 text=True(不存在)
# ❌ asyncio subprocess 没有 text=True
你必须手动:
line.decode()
❌ 3. stdout 不读导致死锁
- 子进程 stdout 写满
- 你不读
- 子进程阻塞
👉 一定要消费 stdout/stderr
十、一句话总结 🧠
asyncio.create_subprocess_exec
→ 真 async
→ 可 await
→ 不阻塞事件循环
→ async agent / executor 标配
如果你愿意,我可以 直接帮你写一个「生产级 async executor 类」,包含:
- 并发限制(Semaphore)
- 超时 / 强杀
- stdout/stderr 实时回调
- PID / 进程组管理
这一步已经是「框架级」了。
评论(0)