Skip to content

第 14 章 自治 Agent —— 从被动到主动

Agent 自治的核心跃迁不是"执行力更强",而是主动发现工作:Teammate 自己扫描看板、认领就绪任务、完成后继续寻找下一个,无需 Lead 逐个分配。


14.1 被动 vs 主动

被动模型:Lead 驱动

前面章节中,多 Agent 协作遵循严格的指令链:

text
用户 → Lead Agent → spawn teammate(name, role, prompt) → Teammate 执行 prompt 指定的任务 → 结束

10 个未认领的任务在看板上?Lead 必须逐一执行 spawn_teammate,为每个 Teammate 指定具体的 prompt。Lead 本身成为瓶颈——它的上下文窗口不断膨胀(每次 spawn 都是一轮完整的推理 + tool_use),决策质量随上下文膨胀而下降。

被动模型的症结是任务分配的中心化。每一条"谁做什么"的决策都必须经过 Lead,而 Lead 本身是一个受限于上下文窗口和推理准确度的 LLM。任务数量一旦超过 Lead 的有效管理范围,系统就开始出错——遗漏任务、重复分配、分配给不合适的 Teammate。

主动模型:Teammate 自驱

自治模式下,任务分配去中心化。Teammate 的行为模式变为:

text
Teammate 完成当前工作 → 进入 IDLE → 主动扫描看板 → 发现 pending + unblocked 任务 → 自动认领 → 开始工作

Lead 的角色从"任务分配器"退化为"任务创建者"——Lead 只需要把任务写入看板,不需要关心由谁执行。这个转变看似微小,但本质上是从主从架构到去中心化自组织的范式转移

两种模型的关键差异:

维度被动模型主动模型
任务分配Lead 逐一 spawnTeammate 自主认领
Lead 负载随任务数线性增长恒定(只写任务)
Teammate 生命周期单任务后终止持续循环直到无工作
扩展性受限于 Lead 上下文窗口Teammate 数量独立扩展
容错Lead 忘记 spawn → 任务遗漏任务在看板上永远可见

14.2 自治循环

Teammate 生命周期:三阶段状态机

自治 Teammate 的核心是一个两阶段循环——WORK 和 IDLE——加上一个终止条件:

mermaid
stateDiagram-v2
    [*] --> WORK: spawn(name, role, prompt)
    WORK --> IDLE: stop_reason != tool_use<br>或调用 idle 工具
    IDLE --> WORK: 收到 inbox 消息
    IDLE --> WORK: 发现并认领 unclaimed 任务
    IDLE --> SHUTDOWN: 60s 超时无新任务
    WORK --> SHUTDOWN: 收到 shutdown_request
    SHUTDOWN --> [*]

WORK 阶段是标准的 Agent Loop——LLM 推理、调用工具、处理结果,循环直到模型停止发出 tool_use 或主动调用 idle 工具:

python
while True:
    # -- WORK PHASE --
    for _ in range(50):  # 单次工作阶段的安全上限
        # 穿插检查 inbox,响应 shutdown_request
        inbox = bus.read_inbox(name)
        for msg in inbox:
            if msg.get("type") == "shutdown_request":
                set_status(name, "shutdown")
                return

        response = client.messages.create(
            model=MODEL, system=sys_prompt,
            messages=messages, tools=tools, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            break  # 模型认为工作完成,进入 IDLE

        # 执行工具调用
        idle_requested = False
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "idle":
                    idle_requested = True
                else:
                    output = execute_tool(name, block.name, block.input)
                # ... 收集 tool_result

        if idle_requested:
            break  # 模型主动请求进入 IDLE

注意 idle 工具的设计——它不执行任何副作用,仅仅作为一个信号量,让模型显式声明"我没有更多工作了"。这比依赖 stop_reason 更可靠,因为模型有时会在回复文本后自然停止(end_turn),但这并不意味着它认为工作完成。

Idle Cycle:定期扫描看板

IDLE 阶段是自治的核心。Teammate 进入一个轮询循环,交替检查两个工作来源:

python
set_status(name, "idle")
resume = False
polls = IDLE_TIMEOUT // POLL_INTERVAL  # 60s / 5s = 12 次

for _ in range(polls):
    time.sleep(POLL_INTERVAL)  # 5 秒间隔

    # 来源 1:inbox 消息(Lead 或其他 Teammate 发来的新指令)
    inbox = bus.read_inbox(name)
    if inbox:
        for msg in inbox:
            if msg.get("type") == "shutdown_request":
                set_status(name, "shutdown")
                return
            messages.append({"role": "user", "content": json.dumps(msg)})
        resume = True
        break

    # 来源 2:看板上的 unclaimed 任务
    unclaimed = scan_unclaimed_tasks()
    if unclaimed:
        task = unclaimed[0]
        claim_task(task["id"], name)
        messages.append({
            "role": "user",
            "content": f"<auto-claimed>Task #{task['id']}: {task['subject']}\n"
                       f"{task.get('description', '')}</auto-claimed>"
        })
        resume = True
        break

if not resume:
    set_status(name, "shutdown")  # 60s 内无新工作 → 自动关闭
    return
set_status(name, "working")

两个关键设计决策:

1. 检查顺序:inbox 优先于看板。 inbox 消息可能包含 shutdown_request 或来自 Lead 的紧急指令,必须先处理。如果看板优先,Teammate 可能在被要求关闭时反而去认领新任务。

2. 超时关闭(60s)。 没有无限轮询——如果 60 秒内没有新工作,Teammate 自动 shutdown。这防止了空转浪费资源(每次轮询虽然不调用 LLM,但线程和内存开销依然存在)。超时时间是一个需要调优的参数:太短导致 Teammate 频繁重启,太长导致资源浪费。

Auto-claim:从扫描到认领

任务看板是一组 JSON 文件,每个文件代表一个任务。扫描逻辑只关心三个条件:

python
def scan_unclaimed_tasks() -> list:
    unclaimed = []
    for f in sorted(tasks_dir.glob("task_*.json")):
        task = json.loads(f.read_text())
        if (task.get("status") == "pending"        # 状态为待处理
                and not task.get("owner")           # 未被认领
                and not task.get("blockedBy")):     # 无依赖阻塞
            unclaimed.append(task)
    return unclaimed

sorted() 确保按文件名排序(task_1.json < task_2.json),这意味着默认的认领策略是 FIFO(先进先出)——编号小的任务先被认领。

认领操作通过写入 owner 字段和修改 status 完成:

python
def claim_task(task_id: int, owner: str) -> str:
    with claim_lock:  # threading.Lock
        path = tasks_dir / f"task_{task_id}.json"
        task = json.loads(path.read_text())
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
    return f"Claimed task #{task_id} for {owner}"

工具数量的演进

从最小可用 Agent(第 2 章的 1 个 bash 工具)到自治 Agent,工具数量的增长反映了能力层的叠加:

阶段工具数新增工具新增能力
最小循环1bash基础执行
文件操作4read_file, write_file, edit_file持久化读写
多 Agent 通信8send_message, read_inbox, broadcast, spawn_teammate团队协作
协议层12list_teammates, shutdown_request/response, plan_approval协调控制
自治层14idle, claim_task自主工作发现

14 个工具是 Lead 的完整工具集。Teammate 侧有 10 个工具(不含 spawn_teammate、list_teammates、broadcast,外加 idle 和 claim_task)。注意 Teammate 不能 spawn 其他 Teammate——这防止了无限递归 spawn 的风险。


14.3 竞争条件与协调

多 Teammate 同时认领的问题

当多个 Teammate 同时处于 IDLE 阶段时,它们的轮询周期可能重叠。两个 Teammate 可能在同一个 5 秒窗口内都发现 task_3 是 unclaimed 的,然后都尝试认领。如果没有同步机制,结果是 last-write-wins——两个 Teammate 都认为自己认领成功,但只有最后写入文件的那个是真正的 owner,另一个在不知情的情况下重复执行相同任务。

这是经典的 TOCTOU(Time of Check to Time of Use)竞争条件:

text
Teammate A: scan() → 发现 task_3 pending, no owner ← check
Teammate B: scan() → 发现 task_3 pending, no owner ← check(A 还没写入)
Teammate A: claim_task(3, "alice") → 写入 owner="alice" ← use
Teammate B: claim_task(3, "bob")  → 覆盖 owner="bob"  ← use(覆盖了 A 的写入)

Owner 字段的原子更新

解决方案是让 claim 操作原子化。在单进程多线程场景下,最简单的方式是 threading.Lock

python
_claim_lock = threading.Lock()

def claim_task(task_id: int, owner: str) -> str:
    with _claim_lock:
        path = tasks_dir / f"task_{task_id}.json"
        if not path.exists():
            return f"Error: Task {task_id} not found"
        task = json.loads(path.read_text())
        if task.get("owner"):  # 已被认领 → 放弃
            return f"Task {task_id} already claimed by {task['owner']}"
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
    return f"Claimed task #{task_id} for {owner}"

关键是 _claim_lock 的临界区包含了读取 + 检查 + 写入的完整过程。任何时刻只有一个线程能进入临界区,消除了 TOCTOU 窗口。

但当前实现有一个微妙的缺陷:claim_task 内部虽然加了锁,但 scan_unclaimed_tasks 在锁外面运行。也就是说,Teammate A 扫描到 task_3 之后、进入 claim_task 之前的窗口里,Teammate B 可能已经认领了 task_3。修复方式是在 claim_task 中增加二次检查——如果进入临界区后发现 task 已有 owner,返回失败而非覆盖。上面的代码已经展示了这个修复(if task.get("owner"))。

三种并发控制策略对比

对于更复杂的部署场景(多进程、分布式),需要不同的同步机制:

策略适用场景实现方式代价
threading.Lock单进程多线程Python 内存锁最简单;进程崩溃锁自动释放
文件锁 (fcntl.flock)多进程单机OS 级锁定 task 文件需处理 stale lock(进程异常退出)
乐观锁 (CAS)分布式 / 数据库版本号 + compare-and-swap高并发下重试率高

乐观锁的典型实现:

python
def claim_task_optimistic(task_id: int, owner: str) -> bool:
    task = load_task(task_id)
    if task["owner"] or task["status"] != "pending":
        return False
    old_version = task["version"]
    task["owner"] = owner
    task["status"] = "in_progress"
    task["version"] = old_version + 1
    # 写入时检查版本号是否被其他进程修改
    return compare_and_swap(task_id, old_version, task)

乐观锁不阻塞——所有 Teammate 都可以同时读取,但只有第一个成功写入的 Teammate 拿到任务。失败的 Teammate 需要重新扫描。

任务"抢占"策略

除了先到先得(FIFO + Lock),还有更智能的任务分配策略:

先到先得(First Come First Served)

当前实现的策略。优点是简单、无需额外元数据。缺点是可能出现"能力错配"——一个擅长前端的 Teammate 可能认领了后端任务,而擅长后端的 Teammate 随后只能拿到前端任务。

能力匹配(Capability Matching)

每个 Teammate 的 role 字段可以用于匹配任务标签:

python
def scan_tasks_for_role(role: str) -> list:
    unclaimed = scan_unclaimed_tasks()
    # 优先返回 tags 匹配 role 的任务
    matched = [t for t in unclaimed if role in t.get("tags", [])]
    return matched if matched else unclaimed  # 无匹配时退化为 FIFO

这需要任务创建时带上 tags 字段(如 ["backend", "database"]),Teammate 的 role(如 "backend")作为匹配条件。无匹配时退化为 FIFO,确保任务不会因为没有精确匹配的 Teammate 而饿死。

优先级调度

任务带 priority 字段,Teammate 总是认领最高优先级的任务:

python
unclaimed = scan_unclaimed_tasks()
unclaimed.sort(key=lambda t: t.get("priority", 0), reverse=True)
task = unclaimed[0]

三种策略可以组合使用——先按优先级排序,同优先级内按能力匹配,都相同时 FIFO。


14.4 从 always-on session 到 always-on assistant

当前限制:会话级存活

前面描述的自治循环有一个本质限制——它绑定在一个运行中的进程上。Python 进程退出,所有 Teammate 线程随之消亡。60 秒的 idle 超时进一步限制了 Agent 的存活时间。

这意味着 Agent 是"会话级"的——用户启动会话、Agent 工作、用户关闭会话、Agent 消失。真正的 always-on assistant 需要超越会话生命周期,在没有用户交互的情况下持续存在和工作。

Heartbeat 机制:定期自动巡检

Heartbeat 是最简单的 always-on 模式。Agent 不再在 60 秒后 shutdown,而是以更长的间隔(如每 5 分钟)执行一次巡检:

python
HEARTBEAT_INTERVAL = 300  # 5 分钟

def heartbeat_loop(agent_config):
    while True:
        time.sleep(HEARTBEAT_INTERVAL)
        # 1. 检查看板是否有新任务
        unclaimed = scan_unclaimed_tasks()
        if unclaimed:
            process_tasks(unclaimed, agent_config)

        # 2. 检查 inbox 是否有新消息
        inbox = bus.read_inbox(agent_config["name"])
        if inbox:
            handle_messages(inbox, agent_config)

        # 3. 检查已认领任务的状态(是否卡住/超时)
        check_owned_tasks(agent_config["name"])

Heartbeat 和 Idle Cycle 的关键区别:

维度Idle CycleHeartbeat
超时行为无新工作 → shutdown永不 shutdown
轮询间隔5 秒(短)5 分钟(长)
资源消耗轻量线程长驻进程/守护进程
LLM 调用仅在发现工作时仅在发现工作时
适用场景一次性任务批次持续监控

Heartbeat 模式下,Agent 的开销主要是进程保活和定期的文件系统扫描,不涉及 LLM 调用——只有在发现新工作时才触发完整的 Agent Loop。

Cron 调度:Agent 为自己安排未来任务

更进一步,Agent 可以创建定时任务——在看板上写入带有 scheduled_at 字段的任务,由调度器在指定时间激活:

python
# Agent 创建一个定时任务
task = {
    "id": next_id(),
    "subject": "每日代码质量检查",
    "description": "运行 lint 和测试套件,报告新增问题",
    "status": "scheduled",        # 尚未 pending
    "scheduled_at": "2025-01-15T09:00:00Z",
    "recurrence": "daily",        # 每天重复
    "owner": None,
}
write_task(task)

调度器(独立进程或 cron job)定期检查 scheduled 任务,到时间后将 status 改为 pending

python
def activate_scheduled_tasks():
    now = datetime.utcnow()
    for task in load_all_tasks():
        if task["status"] == "scheduled":
            scheduled = datetime.fromisoformat(task["scheduled_at"])
            if now >= scheduled:
                task["status"] = "pending"
                if task.get("recurrence") == "daily":
                    task["scheduled_at"] = (scheduled + timedelta(days=1)).isoformat()
                    # status 保持 scheduled,同时创建一个新的 pending 副本
                    create_pending_copy(task)
                save_task(task)

这使 Agent 具备了自我规划能力——不仅响应当前的任务,还能为未来安排工作。典型用例:

  • 每日凌晨运行测试套件,发现回归立即创建修复任务
  • 每周生成代码覆盖率报告
  • 在 PR 合并后 10 分钟自动执行部署后检查

多频道 IM 路由

always-on assistant 的另一个维度是多入口。用户可能从不同的通讯渠道发来指令——Slack 消息、Telegram 命令、WhatsApp 文本、Discord bot。Agent 需要一个统一的入口将这些消息路由到 inbox:

mermaid
flowchart LR
    S[Slack Webhook] --> R[Message Router]
    T[Telegram Bot API] --> R
    W[WhatsApp Business API] --> R
    D[Discord Gateway] --> R
    R --> I[统一 Inbox<br>JSONL 格式]
    I --> A[Agent<br>Heartbeat Loop]
    A --> I2[统一 Outbox]
    I2 --> S2[Slack]
    I2 --> T2[Telegram]
    I2 --> W2[WhatsApp]
    I2 --> D2[Discord]

路由层的核心职责:

  1. 协议归一化:不同平台的消息格式各异,路由层将它们统一为 Agent 的 inbox 格式(JSON 对象,包含 fromcontentchanneltimestamp
  2. 回复路由:Agent 的回复需要发送回正确的频道。outbox 消息携带 channel 字段,路由层根据该字段选择对应的 API 发送回复
  3. 频道隔离/合并:可以为每个频道维护独立的对话上下文(隔离),也可以将所有频道的消息汇入同一个 Agent 上下文(合并)

频道合并模式下的一个实际问题是上下文混淆——来自 Slack 的技术讨论和来自 Telegram 的日常闲聊混在同一个 messages[] 中,可能导致 Agent 在技术回复中使用过于随意的语气,或在闲聊中引入不相关的技术细节。解决方案是在每条消息前注入频道标签:

python
message = {
    "role": "user",
    "content": f"<channel:{channel_name}>{content}</channel:{channel_name}>"
}

14.5 自治 Agent 的失败模式

自治带来灵活性的同时,也引入了被动模型中不存在的失败模式。这些问题在测试环境中往往不明显,但在长时间运行或多 Teammate 场景下必然出现。

无限循环:同一任务反复认领/放弃

症状:Teammate 认领 task_5,工作一段时间后遇到错误,放弃并将 status 改回 pending。下一次 idle scan 时再次发现 task_5,再次认领,再次失败,如此循环。

根因:任务失败后没有累积失败信息。Teammate 每次认领时看到的都是一个"全新"的 pending 任务,没有任何先前失败的记忆。

解决方案:在任务上记录尝试次数和失败原因。

python
def release_task(task_id: int, owner: str, reason: str):
    task = load_task(task_id)
    task["status"] = "pending"
    task["owner"] = None
    task.setdefault("attempts", []).append({
        "by": owner,
        "reason": reason,
        "timestamp": time.time()
    })
    if len(task["attempts"]) >= MAX_RETRIES:
        task["status"] = "failed"  # 不再被扫描到
    save_task(task)

设定 MAX_RETRIES(例如 3 次),超过重试上限后将任务标记为 failed,从扫描结果中排除。同时,scan 阶段可以读取 attempts 字段,避免同一个 Teammate 重复认领自己已经失败过的任务:

python
def scan_tasks_for(name: str) -> list:
    unclaimed = scan_unclaimed_tasks()
    return [t for t in unclaimed
            if name not in [a["by"] for a in t.get("attempts", [])]]

任务饥饿:某些 Teammate 永远抢不到任务

症状:3 个 Teammate 同时运行,但 alice 完成了 80% 的任务,bob 和 charlie 几乎没有认领到任何任务。

根因:轮询周期不同步。alice 的 WORK 阶段恰好更短(任务简单或执行更快),导致她更频繁地进入 IDLE 并扫描看板。每次新任务出现时,alice 总是第一个发现。

分析:这本质上是调度公平性问题。FIFO + 先到先得策略下,扫描频率高的 Teammate 天然占优。

缓解方案

  1. 轮询抖动(Jitter):为每个 Teammate 的 POLL_INTERVAL 加入随机偏移,打散同步轮询:
python
import random
jittered_interval = POLL_INTERVAL + random.uniform(-1, 1)  # 4-6 秒
time.sleep(jittered_interval)
  1. 认领冷却期:Teammate 刚完成一个任务后,强制等待一段时间再扫描看板,给其他 Teammate 机会:
python
if just_completed_task:
    time.sleep(COOLDOWN)  # 例如 10 秒
  1. 负载均衡:scan 时考虑每个 Teammate 当前已认领的任务数量,已有活跃任务的 Teammate 优先级降低。这需要在 scan 阶段查询全局状态,增加了一定的复杂度。

上下文耗尽:长时间运行导致压缩质量下降

症状:Teammate 连续工作数小时后,行为开始退化——回复与 role 不相关、忘记正在做什么、重复执行已完成的步骤。

根因:messages[] 是 append-only 的。长时间工作意味着上下文窗口被填满,触发压缩(第 6 章)。压缩过程中,早期的关键信息(包括 Teammate 的身份和角色)可能被丢弃。

Identity re-injection:当检测到上下文被压缩(messages 数量异常减少),在消息历史头部重新注入身份信息:

python
def maybe_reinject_identity(messages, name, role, team_name):
    if len(messages) <= 3:  # 压缩后消息很少,身份信息可能丢失
        identity = {
            "role": "user",
            "content": f"<identity>You are '{name}', role: {role}, "
                       f"team: {team_name}. Continue your work.</identity>"
        }
        ack = {
            "role": "assistant",
            "content": f"I am {name}. Continuing."
        }
        messages.insert(0, identity)
        messages.insert(1, ack)

为什么同时插入 user 和 assistant 两条消息?因为 API 要求消息交替出现(user → assistant → user → ...)。如果只插入一条 user 消息,后面紧跟的下一条如果也是 user 消息,API 会报错。

len(messages) <= 3 是一个启发式阈值——正常的工作对话至少有初始 prompt + 若干轮 tool_use 交互,消息数远超 3。如果压缩后只剩 3 条以下,说明大量历史被丢弃,身份信息大概率已经丢失。

更根本的解决方案是定期重启 Teammate。与其让上下文无限膨胀后压缩,不如在每完成 N 个任务后主动 shutdown 并重新 spawn:

python
TASKS_PER_LIFECYCLE = 5

if completed_task_count >= TASKS_PER_LIFECYCLE:
    set_status(name, "shutdown")
    # 由外部调度器重新 spawn 一个全新的 Teammate
    return

这在本质上是用进程级隔离替代上下文级管理——每个 Teammate 实例是短命的,上下文永远保持清洁。代价是 spawn 的启动成本(一次完整的 LLM 调用来建立上下文),但对于长时间运行的系统,这个代价远低于上下文退化带来的错误成本。

失败模式总结

失败模式检测信号防御机制
无限循环同一任务的 attempts 持续增长MAX_RETRIES + failed 状态
任务饥饿Teammate 间完成任务数差异极大轮询抖动 + 认领冷却
上下文耗尽messages 数量骤降;回复与 role 不符Identity re-injection + 定期重启
死锁两个任务互相 blockedBy依赖图环检测
幽灵认领owner 指向已 shutdown 的 TeammateHeartbeat 超时释放 + 任务归还