第 11 章 后台执行 —— Agent 不等待
Agent 的工具调用默认是阻塞的——模型发出一条 tool_use,harness 执行完毕、拿到结果后才进入下一轮推理。后台执行的核心命题是打破这个串行瓶颈:慢操作交给独立线程/协程,主循环继续思考,完成后通过通知队列将结果注入模型的下一轮上下文。
11.1 阻塞式执行的代价
串行瓶颈的本质
标准 Agent Loop 的执行模型是严格同步的:
LLM call → tool_use(A) → execute A → tool_result → LLM call → tool_use(B) → execute B → ...每一步工具执行期间,模型处于空闲等待状态。这在快速工具(读文件、字符串操作)上不是问题,但在以下场景中代价巨大:
| 操作 | 典型耗时 | 阻塞影响 |
|---|---|---|
npm install | 30-120s | 模型空等,用户无交互 |
pytest (大型项目) | 60-300s | 一轮工具调用消耗整个超时窗口 |
docker build | 60-600s | 任务可能直接超时失败 |
git clone (大仓库) | 30-180s | 后续工作全部排队 |
并行需求的场景
用户的自然指令常常隐含并行意图:
- "安装依赖,同时帮我创建配置文件"
- "跑一下测试,然后开始写文档"(测试和写文档之间无依赖)
- "lint 代码和跑 type check,有报错再一起修"
阻塞式执行下,Agent 只能串行处理这些请求:先等 npm install 跑完 120 秒,再创建配置文件——即使这两件事之间零依赖。用户体验从"助手在帮我干活"退化为"我在等助手干活"。
超时风险叠加
更隐蔽的问题是超时叠加。假设 Agent 每次工具调用的超时是 120 秒,一个任务需要依次执行 3 个慢命令(每个 90 秒),串行执行总耗时 270 秒。如果系统对整个 Agent turn 有超时限制(例如 300 秒),留给模型推理的时间只剩 30 秒。而并行执行这 3 个命令,工具执行阶段只需 90 秒。
11.2 后台任务架构
核心设计:daemon 线程 + 通知队列
后台执行的架构只有两个组件:一个管理后台任务生命周期的 BackgroundManager,和一个线程安全的通知队列(notification queue)。
Main thread Background thread(s)
+-----------------------+ +-------------------+
| Agent Loop | | Task A (daemon) |
| LLM call | | subprocess.run |
| tool dispatch | | ... |
| ... | ←── | enqueue(result) |
| [drain queue] | +-------------------+
| LLM call (with | +-------------------+
| injected results) | | Task B (daemon) |
+-----------------------+ | subprocess.run |
| enqueue(result) |
+-------------------+关键设计决策:
- 主循环保持单线程——不引入并发到 Agent 推理路径中,只有 I/O 密集的子进程执行被并行化。
- daemon 线程——后台线程设为 daemon,主进程退出时自动终止,无需显式清理。
- 通知队列而非回调——后台线程不直接修改 messages 列表,而是将结果推入队列,主循环在安全时机(下一次 LLM 调用前)统一消费。
BackgroundManager 实现
核心数据结构:
class BackgroundManager:
def __init__(self):
self.tasks = {} # task_id -> {status, result, command}
self._notification_queue = [] # 已完成任务的结果
self._lock = threading.Lock() # 保护通知队列三个字段各司其职:tasks 存储所有任务的全生命周期状态(用于主动查询),_notification_queue 存储尚未被主循环消费的完成通知(用于被动注入),_lock 保证队列的线程安全。
工具设计:三件套
后台执行只需要 3 个工具:
background_run——启动后台任务,立即返回 task_id:
def run(self, command: str) -> str:
task_id = str(uuid.uuid4())[:8]
self.tasks[task_id] = {
"status": "running",
"result": None,
"command": command
}
thread = threading.Thread(
target=self._execute,
args=(task_id, command),
daemon=True
)
thread.start()
return f"Background task {task_id} started: {command[:80]}"这是 fire-and-forget 模式:模型得到 task_id 后立即进入下一步推理,不等待执行结果。task_id 使用 UUID 前 8 位,足够在一个会话内唯一标识。
check_background——查询任务状态:
def check(self, task_id: str = None) -> str:
if task_id:
t = self.tasks.get(task_id)
if not t:
return f"Error: Unknown task {task_id}"
return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
# 省略 task_id 时列出所有任务
lines = []
for tid, t in self.tasks.items():
lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
return "\n".join(lines) if lines else "No background tasks."模型可以主动调用此工具检查某个任务是否完成,但这不是获取结果的主要路径——通知注入机制(11.3 节)会自动将结果送到模型面前。
bg_cancel(可选)——取消任务。在 Python threading 模型下,无法直接杀死线程,实际实现通常是设置一个 cancelled 标志位,让 _execute 方法定期检查:
def cancel(self, task_id: str) -> str:
t = self.tasks.get(task_id)
if not t:
return f"Error: Unknown task {task_id}"
if t["status"] != "running":
return f"Task {task_id} already {t['status']}"
t["cancelled"] = True
return f"Cancellation requested for {task_id}"这揭示了线程模型的固有限制:Python 线程不支持强制终止,取消只能是 cooperative 的。下文 11.4 节会讨论 async 模型如何解决这个问题。
后台执行的线程安全
_execute 方法是后台线程的入口,运行在独立线程中:
def _execute(self, task_id: str, command: str):
try:
r = subprocess.run(
command, shell=True, cwd=WORKDIR,
capture_output=True, text=True, timeout=300
)
output = (r.stdout + r.stderr).strip()[:50000]
status = "completed"
except subprocess.TimeoutExpired:
output = "Error: Timeout (300s)"
status = "timeout"
except Exception as e:
output = f"Error: {e}"
status = "error"
self.tasks[task_id]["status"] = status
self.tasks[task_id]["result"] = output or "(no output)"
with self._lock:
self._notification_queue.append({
"task_id": task_id,
"status": status,
"command": command[:80],
"result": (output or "(no output)")[:500],
})两处写操作的线程安全性值得分析:
self.tasks[task_id]的写入——没有加锁。这在 CPython 中是安全的,因为 dict 的单键赋值是 GIL 保护的原子操作。但严格来说这依赖 CPython 实现细节,生产代码应该使用锁或concurrent.futures。self._notification_queue.append——显式加锁。必须加锁,因为drain_notifications在主线程中会clear()队列,append 和 clear 的竞争必须互斥。
结果截断到 500 字符推入通知队列,但 tasks 字典中保留完整输出(最多 50000 字符)。这是因为通知注入会占用 context window 空间,应该只推送摘要;模型如果需要完整输出,可以主动调用 check_background 获取。
11.3 通知注入机制
核心问题:后台结果如何进入模型视野
后台任务完成时,主循环可能正在做其他事(等待 LLM 响应、执行其他工具)。问题是:如何在不打断当前流程的前提下,让模型在"下一个自然时机"感知到后台结果?
两种方案:
| 方案 | 机制 | 优缺点 |
|---|---|---|
| 模型主动轮询 | 模型定期调用 check_background | 浪费 LLM 调用次数,模型可能忘记轮询 |
| 系统注入通知 | harness 在每轮 LLM 调用前自动注入 | 零额外 LLM 调用,模型被动感知 |
正确答案是后者——模型不需要"记住"有后台任务在跑,harness 负责在合适的时机将结果送到模型面前。
drain-and-inject 模式
实现只需要在 Agent Loop 的 LLM 调用前插入一段 drain 逻辑:
def agent_loop(messages: list):
while True:
# === 关键:在每次 LLM 调用前,排空通知队列 ===
notifs = BG.drain_notifications()
if notifs and messages:
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: {n['result']}"
for n in notifs
)
messages.append({
"role": "user",
"content": f"<background-results>\n{notif_text}\n</background-results>"
})
messages.append({
"role": "assistant",
"content": "Noted background results."
})
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
)
# ... 后续 tool dispatch 逻辑drain_notifications 是原子操作——取出所有待处理通知并清空队列:
def drain_notifications(self) -> list:
with self._lock:
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs注入格式的设计细节
注入的消息使用 <background-results> XML 标签包裹,这不是随意选择:
- XML 标签作为语义边界——模型能清晰区分"用户对话"和"系统通知",不会将后台结果误解为用户的新输入。
- 伪造 assistant 回复——在注入 user 消息后,紧跟一条
"Noted background results."的 assistant 消息。这是因为 API 要求消息交替出现(user → assistant → user → ...)。这条伪造回复让模型"确认"收到了通知,同时不影响后续的正常消息流。 - 结果截断——通知中的 result 被截断到 500 字符。这是 context window 经济学:后台任务可能产生数万字符的输出(例如 pytest 的完整报告),但模型通常只需要知道"通过/失败"即可决定下一步。完整输出通过
check_background按需获取。
时序图
sequenceDiagram
participant User
participant MainLoop as Main Loop
participant BG as Background Thread
participant Queue as Notification Queue
participant LLM
User->>MainLoop: "跑测试,同时创建配置"
MainLoop->>LLM: messages
LLM-->>MainLoop: tool_use(background_run "pytest")
MainLoop->>BG: spawn daemon thread
BG-->>MainLoop: task_id=a1b2c3d4
MainLoop->>LLM: tool_result "task a1b2c3d4 started"
LLM-->>MainLoop: tool_use(write_file "config.yaml")
MainLoop->>MainLoop: execute write_file
Note over BG: pytest running...
MainLoop->>LLM: tool_result "wrote config.yaml"
LLM-->>MainLoop: "配置文件已创建,测试在后台运行"
Note over BG: pytest finishes
BG->>Queue: enqueue({task_id, status, result})
User->>MainLoop: "测试结果怎样?"
MainLoop->>Queue: drain_notifications()
Queue-->>MainLoop: [{task_id: a1b2c3d4, status: completed, ...}]
MainLoop->>LLM: inject <background-results> + user query
LLM-->>MainLoop: "42 tests passed, 0 failed"批量通知的合并策略
当多个后台任务在同一个 drain 周期内完成时,所有结果被合并为一条 user 消息注入。这比每个结果单独注入更高效:
<background-results>
[bg:a1b2c3d4] completed: Tests: 42 passed, 0 failed
[bg:e5f6g7h8] completed: Lint: 0 errors, 2 warnings
[bg:i9j0k1l2] timeout: Error: Timeout (300s)
</background-results>模型在一次推理中同时看到所有结果,可以做出综合判断("测试通过了,lint 有 2 个 warning,Docker build 超时了——先处理 warning"),而不是对每个结果分别做一次 LLM 调用。
通知 vs 轮询的 token 成本对比
假设 3 个后台任务,每个运行 60 秒,交错完成:
轮询模式(模型每 20 秒调用一次 check_background):
- 9 次 LLM 调用(3 个任务 x 3 次轮询),每次都要重新发送完整 context
- 大部分调用的结果是 "still running"——纯浪费
通知注入模式:
- 0 次额外 LLM 调用
- 3 个结果在下一次自然 LLM 调用前一次性注入
- 注入的文本约 200-500 token,远小于一次额外 LLM 调用的成本
Claude Code 的 run_in_background 参数正是这个模式的工程化实现——用户在 Bash 工具调用中设置 run_in_background: true,命令在后台执行,结果在完成后自动注入到 Agent 的下一轮上下文中。
11.4 异步执行模型对比
线程模型:简单但受限
上文的 Python 实现使用 threading.Thread + subprocess.run,这是最简单的后台执行模型:
thread = threading.Thread(
target=self._execute, args=(task_id, command),
daemon=True
)
thread.start()优势:
- 实现简单,无需异步运行时
- 与 subprocess 自然兼容(
subprocess.run本身是阻塞的,放在线程里就变成了"非阻塞") - CPython 的 GIL 对 I/O 密集任务影响不大(线程在等待子进程时释放 GIL)
局限:
- 取消困难——无法强制终止线程,只能用标志位做 cooperative cancellation
- 资源浪费——每个后台任务占用一个 OS 线程(通常 8MB 栈空间),无法支撑大量并发任务
- 错误传播复杂——线程中的异常不会传播到主线程,需要手动通过队列传递
事件驱动模型:tokio + async
Rust 生态的 Agent 系统(如 Codex CLI)使用 tokio 异步运行时,后台任务用 tokio::spawn 而非 OS 线程:
tokio::spawn(async move {
check_for_update(&version_file)
.await
.inspect_err(|e| tracing::error!("Failed: {e}"))
});tokio::spawn 创建的是绿色线程(green thread),运行在少量 OS 线程组成的线程池上。一个线程池可以承载数万个并发 task,每个 task 的内存开销仅约几百字节(对比 OS 线程的 8MB)。
取消机制:CancellationToken
线程模型中取消是 cooperative 的,需要手动检查标志位。tokio 提供了结构化的取消机制——CancellationToken:
use tokio_util::sync::CancellationToken;
pub enum CancelErr {
Cancelled,
}
pub trait OrCancelExt: Sized {
type Output;
async fn or_cancel(
self,
token: &CancellationToken
) -> Result<Self::Output, CancelErr>;
}实现基于 tokio::select! 宏——同时等待任务完成和取消信号,谁先到就走谁的分支:
async fn or_cancel(
self,
token: &CancellationToken
) -> Result<Self::Output, CancelErr> {
tokio::select! {
_ = token.cancelled() => Err(CancelErr::Cancelled),
res = self => Ok(res),
}
}这段代码的精髓在于:取消不需要"杀死"任务,而是让任务和取消信号"竞赛"——如果取消信号先到达,select! 会 drop 掉 future,Rust 的所有权系统保证资源被自动清理。
使用方式极其优雅:
let token = CancellationToken::new();
// 在某个地方触发取消
token.cancel();
// 任务执行时自动响应取消
let result = some_long_running_task()
.or_cancel(&token)
.await;
match result {
Ok(value) => handle_success(value),
Err(CancelErr::Cancelled) => handle_cancellation(),
}Readiness Flag:异步就绪通知
生产级系统还需要解决"如何知道后台组件已就绪"的问题。一种常见模式是 ReadinessFlag——一个带订阅/通知机制的原子标志:
pub struct ReadinessFlag {
ready: AtomicBool, // 原子布尔,快速读路径
next_id: AtomicI32, // token ID 生成器
tokens: Mutex<HashSet<Token>>, // 活跃订阅集合
tx: watch::Sender<bool>, // 广播就绪信号
}工作流程:
- 需要等待就绪的组件调用
subscribe()获取一个Token - 后台初始化完成后调用
mark_ready(token)标记就绪 - 等待方调用
wait_ready().await异步等待
这比简单的 AtomicBool 多了两个关键特性:授权机制(只有持有合法 Token 的组件才能标记就绪)和异步等待(通过 watch channel 广播,等待方不需要 busy-loop)。
当没有任何订阅者时,is_ready() 直接返回 true——这避免了"所有组件都没有订阅但系统永远等待就绪"的死锁。
两种模型的决策矩阵
| 维度 | 线程模型 (Python threading) | 事件驱动 (tokio async) |
|---|---|---|
| 实现复杂度 | 低(30 行核心代码) | 中(需要 async runtime 知识) |
| 并发上限 | ~100 线程(受 OS 限制) | ~100K task(绿色线程) |
| 内存/task | ~8MB(OS 线程栈) | ~几百字节(Future 状态机) |
| 取消机制 | Cooperative(标志位) | Structured(CancellationToken + select!) |
| 资源清理 | 手动(daemon 标志兜底) | 自动(Rust 所有权 + Drop) |
| 错误传播 | 通过队列手动传递 | Result 类型 + JoinHandle |
| 适用场景 | 原型、少量后台任务(<10) | 生产系统、大量并发 I/O |
各 Agent 系统的异步策略
Claude Code——TypeScript 实现,基于 Node.js 的事件循环。后台任务通过 run_in_background 参数声明式触发,底层是 child_process.spawn + EventEmitter 通知。Node.js 的单线程事件循环天然适合 I/O 密集的后台任务管理,不需要显式的线程同步。
Codex CLI——Rust 实现,tokio 异步运行时。上文分析的 OrCancelExt trait 和 ReadinessFlag 就来自其 codex-async-utils 和 readiness 模块。命令执行通过 PTY(伪终端)+ tokio 异步读写,支持实时流式输出和结构化取消。版本检查等网络 I/O 也通过 tokio::spawn 放入后台,不阻塞 TUI 启动。
OpenCode——Go 实现,goroutine + channel。Go 的 goroutine 是语言内置的绿色线程,go func() 一行就能启动后台任务,chan 作为类型安全的通知队列。context.Context 提供链式取消(父 context 取消时所有子 goroutine 自动取消)。在概念上最接近"语言原生支持后台执行"。
Python Agent 框架——asyncio + subprocess.create_subprocess_exec。asyncio 是 Python 的协程框架,用 await 代替线程切换。但 asyncio 在 Python 生态中的采用率远低于 threading,原因是大量库(包括 subprocess.run)是同步的,需要用 loop.run_in_executor 桥接。实际的 Python Agent 框架(如上文的实现)更倾向于直接用 threading,因为 subprocess 交互本身就是 I/O 密集的,线程模型足够胜任。
后台执行的边界条件
幂等性——后台任务的结果注入是"尽力而为"的。如果任务完成时主循环已经退出(用户 Ctrl+C),结果会丢失。对于幂等操作(如测试、lint)这无所谓;对于有副作用的操作(如部署),需要额外的持久化机制(写日志文件)。
输出竞争——多个后台任务同时写 stdout/stderr 时,输出可能交错。capture_output=True 避免了这个问题——每个子进程的输出被独立捕获,不会混入主进程的 stdout。
超时传播——后台任务的超时(300 秒)独立于主循环的超时。这意味着一个后台任务可能在主循环已经完成并退出后才超时。daemon 线程的特性保证了这种情况下进程仍然能正常退出。
Context window 压力——如果大量后台任务同时完成,一次 drain 注入的文本可能很长。实现中将每条通知的 result 截断到 500 字符,但 10 个任务同时完成仍会注入约 5000 token。生产系统可能需要更激进的截断策略,或者将超过阈值的通知分批注入。