Skip to content

第 11 章 后台执行 —— Agent 不等待

Agent 的工具调用默认是阻塞的——模型发出一条 tool_use,harness 执行完毕、拿到结果后才进入下一轮推理。后台执行的核心命题是打破这个串行瓶颈:慢操作交给独立线程/协程,主循环继续思考,完成后通过通知队列将结果注入模型的下一轮上下文。


11.1 阻塞式执行的代价

串行瓶颈的本质

标准 Agent Loop 的执行模型是严格同步的:

text
LLM call → tool_use(A) → execute A → tool_result → LLM call → tool_use(B) → execute B → ...

每一步工具执行期间,模型处于空闲等待状态。这在快速工具(读文件、字符串操作)上不是问题,但在以下场景中代价巨大:

操作典型耗时阻塞影响
npm install30-120s模型空等,用户无交互
pytest (大型项目)60-300s一轮工具调用消耗整个超时窗口
docker build60-600s任务可能直接超时失败
git clone (大仓库)30-180s后续工作全部排队

并行需求的场景

用户的自然指令常常隐含并行意图:

  • "安装依赖,同时帮我创建配置文件"
  • "跑一下测试,然后开始写文档"(测试和写文档之间无依赖)
  • "lint 代码和跑 type check,有报错再一起修"

阻塞式执行下,Agent 只能串行处理这些请求:先等 npm install 跑完 120 秒,再创建配置文件——即使这两件事之间零依赖。用户体验从"助手在帮我干活"退化为"我在等助手干活"。

超时风险叠加

更隐蔽的问题是超时叠加。假设 Agent 每次工具调用的超时是 120 秒,一个任务需要依次执行 3 个慢命令(每个 90 秒),串行执行总耗时 270 秒。如果系统对整个 Agent turn 有超时限制(例如 300 秒),留给模型推理的时间只剩 30 秒。而并行执行这 3 个命令,工具执行阶段只需 90 秒。


11.2 后台任务架构

核心设计:daemon 线程 + 通知队列

后台执行的架构只有两个组件:一个管理后台任务生命周期的 BackgroundManager,和一个线程安全的通知队列(notification queue)。

python
Main thread                     Background thread(s)
+-----------------------+       +-------------------+
| Agent Loop            |       | Task A (daemon)   |
|   LLM call            |       |   subprocess.run  |
|   tool dispatch       |       |   ...             |
|   ...                 |  ←──  |   enqueue(result) |
|   [drain queue]       |       +-------------------+
|   LLM call (with      |       +-------------------+
|     injected results) |       | Task B (daemon)   |
+-----------------------+       |   subprocess.run  |
                                |   enqueue(result) |
                                +-------------------+

关键设计决策:

  1. 主循环保持单线程——不引入并发到 Agent 推理路径中,只有 I/O 密集的子进程执行被并行化。
  2. daemon 线程——后台线程设为 daemon,主进程退出时自动终止,无需显式清理。
  3. 通知队列而非回调——后台线程不直接修改 messages 列表,而是将结果推入队列,主循环在安全时机(下一次 LLM 调用前)统一消费。

BackgroundManager 实现

核心数据结构:

python
class BackgroundManager:
    def __init__(self):
        self.tasks = {}                # task_id -> {status, result, command}
        self._notification_queue = []  # 已完成任务的结果
        self._lock = threading.Lock()  # 保护通知队列

三个字段各司其职:tasks 存储所有任务的全生命周期状态(用于主动查询),_notification_queue 存储尚未被主循环消费的完成通知(用于被动注入),_lock 保证队列的线程安全。

工具设计:三件套

后台执行只需要 3 个工具:

background_run——启动后台任务,立即返回 task_id:

python
def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]
    self.tasks[task_id] = {
        "status": "running",
        "result": None,
        "command": command
    }
    thread = threading.Thread(
        target=self._execute,
        args=(task_id, command),
        daemon=True
    )
    thread.start()
    return f"Background task {task_id} started: {command[:80]}"

这是 fire-and-forget 模式:模型得到 task_id 后立即进入下一步推理,不等待执行结果。task_id 使用 UUID 前 8 位,足够在一个会话内唯一标识。

check_background——查询任务状态:

python
def check(self, task_id: str = None) -> str:
    if task_id:
        t = self.tasks.get(task_id)
        if not t:
            return f"Error: Unknown task {task_id}"
        return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
    # 省略 task_id 时列出所有任务
    lines = []
    for tid, t in self.tasks.items():
        lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
    return "\n".join(lines) if lines else "No background tasks."

模型可以主动调用此工具检查某个任务是否完成,但这不是获取结果的主要路径——通知注入机制(11.3 节)会自动将结果送到模型面前。

bg_cancel(可选)——取消任务。在 Python threading 模型下,无法直接杀死线程,实际实现通常是设置一个 cancelled 标志位,让 _execute 方法定期检查:

python
def cancel(self, task_id: str) -> str:
    t = self.tasks.get(task_id)
    if not t:
        return f"Error: Unknown task {task_id}"
    if t["status"] != "running":
        return f"Task {task_id} already {t['status']}"
    t["cancelled"] = True
    return f"Cancellation requested for {task_id}"

这揭示了线程模型的固有限制:Python 线程不支持强制终止,取消只能是 cooperative 的。下文 11.4 节会讨论 async 模型如何解决这个问题。

后台执行的线程安全

_execute 方法是后台线程的入口,运行在独立线程中:

python
def _execute(self, task_id: str, command: str):
    try:
        r = subprocess.run(
            command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=300
        )
        output = (r.stdout + r.stderr).strip()[:50000]
        status = "completed"
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
        status = "timeout"
    except Exception as e:
        output = f"Error: {e}"
        status = "error"

    self.tasks[task_id]["status"] = status
    self.tasks[task_id]["result"] = output or "(no output)"

    with self._lock:
        self._notification_queue.append({
            "task_id": task_id,
            "status": status,
            "command": command[:80],
            "result": (output or "(no output)")[:500],
        })

两处写操作的线程安全性值得分析:

  1. self.tasks[task_id] 的写入——没有加锁。这在 CPython 中是安全的,因为 dict 的单键赋值是 GIL 保护的原子操作。但严格来说这依赖 CPython 实现细节,生产代码应该使用锁或 concurrent.futures
  2. self._notification_queue.append——显式加锁。必须加锁,因为 drain_notifications 在主线程中会 clear() 队列,append 和 clear 的竞争必须互斥。

结果截断到 500 字符推入通知队列,但 tasks 字典中保留完整输出(最多 50000 字符)。这是因为通知注入会占用 context window 空间,应该只推送摘要;模型如果需要完整输出,可以主动调用 check_background 获取。


11.3 通知注入机制

核心问题:后台结果如何进入模型视野

后台任务完成时,主循环可能正在做其他事(等待 LLM 响应、执行其他工具)。问题是:如何在不打断当前流程的前提下,让模型在"下一个自然时机"感知到后台结果?

两种方案:

方案机制优缺点
模型主动轮询模型定期调用 check_background浪费 LLM 调用次数,模型可能忘记轮询
系统注入通知harness 在每轮 LLM 调用前自动注入零额外 LLM 调用,模型被动感知

正确答案是后者——模型不需要"记住"有后台任务在跑,harness 负责在合适的时机将结果送到模型面前。

drain-and-inject 模式

实现只需要在 Agent Loop 的 LLM 调用前插入一段 drain 逻辑:

python
def agent_loop(messages: list):
    while True:
        # === 关键:在每次 LLM 调用前,排空通知队列 ===
        notifs = BG.drain_notifications()
        if notifs and messages:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['status']}: {n['result']}"
                for n in notifs
            )
            messages.append({
                "role": "user",
                "content": f"<background-results>\n{notif_text}\n</background-results>"
            })
            messages.append({
                "role": "assistant",
                "content": "Noted background results."
            })

        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        # ... 后续 tool dispatch 逻辑

drain_notifications 是原子操作——取出所有待处理通知并清空队列:

python
def drain_notifications(self) -> list:
    with self._lock:
        notifs = list(self._notification_queue)
        self._notification_queue.clear()
    return notifs

注入格式的设计细节

注入的消息使用 <background-results> XML 标签包裹,这不是随意选择:

  1. XML 标签作为语义边界——模型能清晰区分"用户对话"和"系统通知",不会将后台结果误解为用户的新输入。
  2. 伪造 assistant 回复——在注入 user 消息后,紧跟一条 "Noted background results." 的 assistant 消息。这是因为 API 要求消息交替出现(user → assistant → user → ...)。这条伪造回复让模型"确认"收到了通知,同时不影响后续的正常消息流。
  3. 结果截断——通知中的 result 被截断到 500 字符。这是 context window 经济学:后台任务可能产生数万字符的输出(例如 pytest 的完整报告),但模型通常只需要知道"通过/失败"即可决定下一步。完整输出通过 check_background 按需获取。

时序图

mermaid
sequenceDiagram
    participant User
    participant MainLoop as Main Loop
    participant BG as Background Thread
    participant Queue as Notification Queue
    participant LLM

    User->>MainLoop: "跑测试,同时创建配置"
    MainLoop->>LLM: messages
    LLM-->>MainLoop: tool_use(background_run "pytest")
    MainLoop->>BG: spawn daemon thread
    BG-->>MainLoop: task_id=a1b2c3d4
    MainLoop->>LLM: tool_result "task a1b2c3d4 started"
    LLM-->>MainLoop: tool_use(write_file "config.yaml")
    MainLoop->>MainLoop: execute write_file
    Note over BG: pytest running...
    MainLoop->>LLM: tool_result "wrote config.yaml"
    LLM-->>MainLoop: "配置文件已创建,测试在后台运行"

    Note over BG: pytest finishes
    BG->>Queue: enqueue({task_id, status, result})

    User->>MainLoop: "测试结果怎样?"
    MainLoop->>Queue: drain_notifications()
    Queue-->>MainLoop: [{task_id: a1b2c3d4, status: completed, ...}]
    MainLoop->>LLM: inject <background-results> + user query
    LLM-->>MainLoop: "42 tests passed, 0 failed"

批量通知的合并策略

当多个后台任务在同一个 drain 周期内完成时,所有结果被合并为一条 user 消息注入。这比每个结果单独注入更高效:

text
<background-results>
[bg:a1b2c3d4] completed: Tests: 42 passed, 0 failed
[bg:e5f6g7h8] completed: Lint: 0 errors, 2 warnings
[bg:i9j0k1l2] timeout: Error: Timeout (300s)
</background-results>

模型在一次推理中同时看到所有结果,可以做出综合判断("测试通过了,lint 有 2 个 warning,Docker build 超时了——先处理 warning"),而不是对每个结果分别做一次 LLM 调用。

通知 vs 轮询的 token 成本对比

假设 3 个后台任务,每个运行 60 秒,交错完成:

轮询模式(模型每 20 秒调用一次 check_background):

  • 9 次 LLM 调用(3 个任务 x 3 次轮询),每次都要重新发送完整 context
  • 大部分调用的结果是 "still running"——纯浪费

通知注入模式

  • 0 次额外 LLM 调用
  • 3 个结果在下一次自然 LLM 调用前一次性注入
  • 注入的文本约 200-500 token,远小于一次额外 LLM 调用的成本

Claude Code 的 run_in_background 参数正是这个模式的工程化实现——用户在 Bash 工具调用中设置 run_in_background: true,命令在后台执行,结果在完成后自动注入到 Agent 的下一轮上下文中。


11.4 异步执行模型对比

线程模型:简单但受限

上文的 Python 实现使用 threading.Thread + subprocess.run,这是最简单的后台执行模型:

python
thread = threading.Thread(
    target=self._execute, args=(task_id, command),
    daemon=True
)
thread.start()

优势

  • 实现简单,无需异步运行时
  • 与 subprocess 自然兼容(subprocess.run 本身是阻塞的,放在线程里就变成了"非阻塞")
  • CPython 的 GIL 对 I/O 密集任务影响不大(线程在等待子进程时释放 GIL)

局限

  • 取消困难——无法强制终止线程,只能用标志位做 cooperative cancellation
  • 资源浪费——每个后台任务占用一个 OS 线程(通常 8MB 栈空间),无法支撑大量并发任务
  • 错误传播复杂——线程中的异常不会传播到主线程,需要手动通过队列传递

事件驱动模型:tokio + async

Rust 生态的 Agent 系统(如 Codex CLI)使用 tokio 异步运行时,后台任务用 tokio::spawn 而非 OS 线程:

rust
tokio::spawn(async move {
    check_for_update(&version_file)
        .await
        .inspect_err(|e| tracing::error!("Failed: {e}"))
});

tokio::spawn 创建的是绿色线程(green thread),运行在少量 OS 线程组成的线程池上。一个线程池可以承载数万个并发 task,每个 task 的内存开销仅约几百字节(对比 OS 线程的 8MB)。

取消机制:CancellationToken

线程模型中取消是 cooperative 的,需要手动检查标志位。tokio 提供了结构化的取消机制——CancellationToken

rust
use tokio_util::sync::CancellationToken;

pub enum CancelErr {
    Cancelled,
}

pub trait OrCancelExt: Sized {
    type Output;
    async fn or_cancel(
        self,
        token: &CancellationToken
    ) -> Result<Self::Output, CancelErr>;
}

实现基于 tokio::select! 宏——同时等待任务完成和取消信号,谁先到就走谁的分支:

rust
async fn or_cancel(
    self,
    token: &CancellationToken
) -> Result<Self::Output, CancelErr> {
    tokio::select! {
        _ = token.cancelled() => Err(CancelErr::Cancelled),
        res = self => Ok(res),
    }
}

这段代码的精髓在于:取消不需要"杀死"任务,而是让任务和取消信号"竞赛"——如果取消信号先到达,select! 会 drop 掉 future,Rust 的所有权系统保证资源被自动清理。

使用方式极其优雅:

rust
let token = CancellationToken::new();

// 在某个地方触发取消
token.cancel();

// 任务执行时自动响应取消
let result = some_long_running_task()
    .or_cancel(&token)
    .await;

match result {
    Ok(value) => handle_success(value),
    Err(CancelErr::Cancelled) => handle_cancellation(),
}

Readiness Flag:异步就绪通知

生产级系统还需要解决"如何知道后台组件已就绪"的问题。一种常见模式是 ReadinessFlag——一个带订阅/通知机制的原子标志:

rust
pub struct ReadinessFlag {
    ready: AtomicBool,              // 原子布尔,快速读路径
    next_id: AtomicI32,             // token ID 生成器
    tokens: Mutex<HashSet<Token>>,  // 活跃订阅集合
    tx: watch::Sender<bool>,        // 广播就绪信号
}

工作流程:

  1. 需要等待就绪的组件调用 subscribe() 获取一个 Token
  2. 后台初始化完成后调用 mark_ready(token) 标记就绪
  3. 等待方调用 wait_ready().await 异步等待

这比简单的 AtomicBool 多了两个关键特性:授权机制(只有持有合法 Token 的组件才能标记就绪)和异步等待(通过 watch channel 广播,等待方不需要 busy-loop)。

当没有任何订阅者时,is_ready() 直接返回 true——这避免了"所有组件都没有订阅但系统永远等待就绪"的死锁。

两种模型的决策矩阵

维度线程模型 (Python threading)事件驱动 (tokio async)
实现复杂度低(30 行核心代码)中(需要 async runtime 知识)
并发上限~100 线程(受 OS 限制)~100K task(绿色线程)
内存/task~8MB(OS 线程栈)~几百字节(Future 状态机)
取消机制Cooperative(标志位)Structured(CancellationToken + select!)
资源清理手动(daemon 标志兜底)自动(Rust 所有权 + Drop)
错误传播通过队列手动传递Result 类型 + JoinHandle
适用场景原型、少量后台任务(<10)生产系统、大量并发 I/O

各 Agent 系统的异步策略

Claude Code——TypeScript 实现,基于 Node.js 的事件循环。后台任务通过 run_in_background 参数声明式触发,底层是 child_process.spawn + EventEmitter 通知。Node.js 的单线程事件循环天然适合 I/O 密集的后台任务管理,不需要显式的线程同步。

Codex CLI——Rust 实现,tokio 异步运行时。上文分析的 OrCancelExt trait 和 ReadinessFlag 就来自其 codex-async-utilsreadiness 模块。命令执行通过 PTY(伪终端)+ tokio 异步读写,支持实时流式输出和结构化取消。版本检查等网络 I/O 也通过 tokio::spawn 放入后台,不阻塞 TUI 启动。

OpenCode——Go 实现,goroutine + channel。Go 的 goroutine 是语言内置的绿色线程,go func() 一行就能启动后台任务,chan 作为类型安全的通知队列。context.Context 提供链式取消(父 context 取消时所有子 goroutine 自动取消)。在概念上最接近"语言原生支持后台执行"。

Python Agent 框架——asyncio + subprocess.create_subprocess_exec。asyncio 是 Python 的协程框架,用 await 代替线程切换。但 asyncio 在 Python 生态中的采用率远低于 threading,原因是大量库(包括 subprocess.run)是同步的,需要用 loop.run_in_executor 桥接。实际的 Python Agent 框架(如上文的实现)更倾向于直接用 threading,因为 subprocess 交互本身就是 I/O 密集的,线程模型足够胜任。

后台执行的边界条件

幂等性——后台任务的结果注入是"尽力而为"的。如果任务完成时主循环已经退出(用户 Ctrl+C),结果会丢失。对于幂等操作(如测试、lint)这无所谓;对于有副作用的操作(如部署),需要额外的持久化机制(写日志文件)。

输出竞争——多个后台任务同时写 stdout/stderr 时,输出可能交错。capture_output=True 避免了这个问题——每个子进程的输出被独立捕获,不会混入主进程的 stdout。

超时传播——后台任务的超时(300 秒)独立于主循环的超时。这意味着一个后台任务可能在主循环已经完成并退出后才超时。daemon 线程的特性保证了这种情况下进程仍然能正常退出。

Context window 压力——如果大量后台任务同时完成,一次 drain 注入的文本可能很长。实现中将每条通知的 result 截断到 500 字符,但 10 个任务同时完成仍会注入约 5000 token。生产系统可能需要更激进的截断策略,或者将超过阈值的通知分批注入。