异步库:asyncio.subprocess

下面我给你一份完整、系统、带中文注释的 asyncio.subprocess 说明
定位是:你已经懂 subprocess,现在要写 async executor / agent / Web 服务



一、asyncio 中的 subprocess 是什么?

asyncio.subprocess 是 真正异步(非阻塞) 的子进程 API👉 可以 await,不会阻塞事件循环

它解决的是:

  • FastAPI / aiohttp 中不能用 subprocess.run
  • 同时跑多个外部命令
  • 实时异步读取 stdout / stderr


二、核心 API 总览(只需要记住这两个)

asyncio.create_subprocess_exec()   # ⭐ 推荐(无 shell)
asyncio.create_subprocess_shell()  # ⚠️ shell 方式


三、create_subprocess_exec(最重要 ⭐⭐⭐⭐⭐)

1️⃣ 函数签名(官方等价形式)

asyncio.create_subprocess_exec(
    *args,
    stdin=None,
    stdout=None,
    stderr=None,
    limit=asyncio.streams._DEFAULT_LIMIT,
    **kwds
) -> asyncio.subprocess.Process


2️⃣ 参数详解(逐项注释)

proc = await asyncio.create_subprocess_exec(
    "ping", "baidu.com", "-c", "3",     # *args:命令及参数(强烈推荐列表)
    
    stdin=None,                         # 标准输入(一般不用)
    stdout=asyncio.subprocess.PIPE,    # 捕获标准输出(异步)
    stderr=asyncio.subprocess.PIPE,    # 捕获错误输出(异步)
    
    limit=2**16                         # StreamReader 缓冲区大小
)

stdin / stdout / stderr 可选值

含义
None继承父进程
PIPE建立 asyncio Stream
DEVNULL丢弃


3️⃣ 返回值:asyncio.subprocess.Process

proc  # Process 对象

这是 async 版的 Popen



四、Process 对象详解(非常重要)

1️⃣ 核心属性(带注释)

proc.pid           # 子进程 PID
proc.returncode    # 返回码(None 表示未结束)

proc.stdin         # asyncio StreamWriter
proc.stdout        # asyncio StreamReader
proc.stderr        # asyncio StreamReader


2️⃣ 核心方法(带注释)

await proc.wait()

code = await proc.wait()
# 等待进程结束(非阻塞)
# 返回 returncode


proc.kill()

proc.kill()
# 立即强制杀死进程(SIGKILL)


proc.terminate()

proc.terminate()
# 优雅终止进程(SIGTERM)


proc.send_signal(sig)

import signal
proc.send_signal(signal.SIGINT)


五、⭐ 异步读取 stdout / stderr(最常见用法)

1️⃣ async for 实时读取 stdout

import asyncio

async def run():
    proc = await asyncio.create_subprocess_exec(
        "ping", "baidu.com", "-c", "3",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # 异步逐行读取 stdout
    async for line in proc.stdout:
        print("OUT:", line.decode().rstrip())

    await proc.wait()
    print("exit code:", proc.returncode)

asyncio.run(run())

✅ 不阻塞事件循环
✅ 可同时跑多个进程



2️⃣ 同时读取 stdout + stderr(推荐写法)

async def read_stream(prefix, stream):
    async for line in stream:
        print(prefix, line.decode().rstrip())

async def run():
    proc = await asyncio.create_subprocess_exec(
        "ping", "baidu.com", "-c", "3",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    await asyncio.gather(
        read_stream("OUT:", proc.stdout),
        read_stream("ERR:", proc.stderr),
    )

    await proc.wait()


六、create_subprocess_shell(⚠️ 谨慎)

1️⃣ 函数签名

asyncio.create_subprocess_shell(
    cmd,
    stdin=None,
    stdout=None,
    stderr=None,
    limit=...
) -> Process


2️⃣ 示例(支持管道)

proc = await asyncio.create_subprocess_shell(
    "ls | grep py",
    stdout=asyncio.subprocess.PIPE
)

⚠️ 风险与 shell=True 一样:

  • 命令注入
  • PID 是 shell 的
  • 进程管理困难


七、asyncio subprocess vs subprocess 对比

维度subprocessasyncio.subprocess
是否 await
是否阻塞
实时输出阻塞式异步
并发能力
Web / Agent


八、典型 executor / agent 模板(带注释)

async def exec_cmd(cmd: list[str]):
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    async for line in proc.stdout:
        handle_log(line)

    await proc.wait()

    return proc.returncode


九、常见坑(非常重要)

❌ 1. 忘了 await

proc = asyncio.create_subprocess_exec(...)  # ❌

✅ 必须:

proc = await asyncio.create_subprocess_exec(...)


❌ 2. 用 text=True(不存在)

# ❌ asyncio subprocess 没有 text=True

你必须手动:

line.decode()


❌ 3. stdout 不读导致死锁

  • 子进程 stdout 写满
  • 你不读
  • 子进程阻塞

👉 一定要消费 stdout/stderr



十、一句话总结 🧠

asyncio.create_subprocess_exec
    → 真 async
    → 可 await
    → 不阻塞事件循环
    → async agent / executor 标配


如果你愿意,我可以 直接帮你写一个「生产级 async executor 类」,包含:

  • 并发限制(Semaphore)
  • 超时 / 强杀
  • stdout/stderr 实时回调
  • PID / 进程组管理

这一步已经是「框架级」了。

文章标签:

评论(0)