Skip to content

第 10 章 Task System —— 从 Flat List 到 DAG

TodoWrite 是草稿纸,Task System 是项目管理工具——前者是内存中的扁平清单,后者是磁盘上的有向无环图(DAG),能表达依赖、并行、阻塞三种关系,并在上下文压缩和进程重启后依然存活。

10.1 任务图(DAG)的数据模型

为什么扁平清单不够用

第 6 章的 TodoWrite 解决了"Agent 没有计划"的问题,但它的数据模型是一个扁平的条目列表,每个条目只有 descriptiondone/not-done 两种状态。这在单会话、线性任务中足够,但面对真实的多步工程任务时,三个根本性缺陷暴露无遗:

  1. 无法表达依赖:任务 B 必须等任务 A 完成才能开始,扁平清单无法表达这种前后约束。Agent 可能在 A 还没做完时就跳去做 B,导致错误级联。
  2. 无法表达并行:任务 C 和 D 互不依赖、可以同时执行,但扁平列表把它们排成线性顺序,Agent 只能逐个执行,浪费并发机会。
  3. 不持久:TodoWrite 的状态活在对话上下文里,一旦触发上下文压缩(第 8 章),整个计划连同进度一起被摘要掉,Agent 丢失精确的任务状态。

Task System 的核心升级是把清单替换为 DAG(Directed Acyclic Graph,有向无环图)。每个任务是图中的一个节点,依赖关系是有向边。DAG 天然支持拓扑排序、就绪检测和并行识别。

每个 Task = 一个 JSON 文件

Task System 的存储设计极其简单——每个任务独立存储为一个 JSON 文件,放在 .tasks/ 目录下:

text
.tasks/
  task_1.json
  task_2.json
  task_3.json
  task_4.json

每个文件的 Schema:

json
{
  "id": 1,
  "subject": "Setup project structure",
  "description": "Initialize package.json, create src/ and test/ directories",
  "status": "pending",
  "blockedBy": [],
  "blocks": [2, 3],
  "owner": ""
}

六个字段,每个都有明确的语义:

字段类型含义
idint全局唯一标识,自增生成
subjectstring一句话任务标题
descriptionstring详细描述,可为空
statusenum状态机当前状态
blockedByint[]前置依赖——本任务被哪些任务阻塞
blocksint[]后置依赖——本任务完成后解锁哪些任务
ownerstring执行者标识(为多 Agent 协作预留)

blockedByblocks 是同一条依赖边的双向记录。任务 1 的 blocks: [2, 3] 意味着任务 2 和 3 的 blockedBy 中都包含 1。这种双向冗余牺牲了一点存储一致性,换来了查询效率——不需要扫描所有文件就能知道某个任务阻塞了谁。

为什么用文件而不用数据库?

单文件存储的设计看起来原始,但对 Agent 系统而言恰恰是最优解:

  • 可读性:Agent 可以直接 read_file 查看任意任务的完整状态,不需要 SQL 查询能力。
  • 原子性:每次只写一个文件,不存在事务一致性问题。
  • Git 友好.tasks/ 目录可以直接纳入版本控制,每次任务状态变更都有完整的 diff 记录。
  • 压缩存活:文件在磁盘上,不受上下文压缩影响。压缩后 Agent 只需 task_list 即可恢复全貌。

状态机

任务的生命周期是一个三态状态机:

mermaid
stateDiagram-v2
    [*] --> pending : task_create
    pending --> in_progress : task_update(status)
    in_progress --> completed : task_update(status)
    pending --> completed : task_update(status)
    completed --> [*]

    note right of pending : blockedBy 非空时,\n逻辑上处于"blocked"状态
    note right of completed : 完成时触发\n_clear_dependency()

三个状态的语义:

  • pending:已创建,等待执行。如果 blockedBy 非空,则逻辑上处于"被阻塞"状态——虽然 status 字段仍为 pending,但 Agent 不应该执行它。
  • in_progress:某个 Agent(或人类)正在执行中。
  • completed:已完成。状态转入 completed 时触发副作用——自动清除下游任务的 blockedBy 引用。

注意没有 failedcancelled 状态。这是刻意的简化——Task System 的设计哲学是最小可用。失败的任务可以重置为 pending,取消的任务可以直接删除文件。不引入不需要的复杂度。

依赖解析:完成即解锁

依赖解析是 Task System 最核心的自动化逻辑。当一个任务被标记为 completed 时,系统遍历 .tasks/ 目录下的所有任务文件,将完成任务的 ID 从每个任务的 blockedBy 列表中移除:

python
def _clear_dependency(self, completed_id):
    """Remove completed_id from all other tasks' blockedBy lists."""
    for f in self.dir.glob("task_*.json"):
        task = json.loads(f.read_text())
        if completed_id in task.get("blockedBy", []):
            task["blockedBy"].remove(completed_id)
            self._save(task)

这个看似简单的操作实现了 DAG 的拓扑推进:

mermaid
graph LR
    subgraph "完成 task 1 之前"
        A1["task 1<br/>completed"] --> B1["task 2<br/>blockedBy: [1]"]
        A1 --> C1["task 3<br/>blockedBy: [1]"]
        B1 --> D1["task 4<br/>blockedBy: [2, 3]"]
        C1 --> D1
    end
mermaid
graph LR
    subgraph "完成 task 1 之后"
        A2["task 1<br/>completed"] -.-> B2["task 2<br/>blockedBy: []  ready"]
        A2 -.-> C2["task 3<br/>blockedBy: []  ready"]
        B2 --> D2["task 4<br/>blockedBy: [2, 3]"]
        C2 --> D2
    end

task 1 完成后,task 2 和 task 3 的 blockedBy 被清空,变为就绪状态,可以并行执行。task 4 仍被阻塞,直到 2 和 3 都完成。

就绪判定的规则极其简单:status == "pending"blockedBy == []。Agent 只需调用 task_list,状态标记为 [ ] 且没有 (blocked by: ...) 后缀的任务就是可以立即执行的。

DAG 的完整示例

一个典型的代码重构任务拆解:

bash
parse  ─────→  transform  ──→  test

   └──→  emit  ──────────────────┘

transformemit 都依赖 parse,但互不依赖,可以并行。test 依赖 transformemit 都完成。

对应的 JSON 文件:

json
// task_1.json — parse
{"id": 1, "subject": "Parse AST", "status": "pending",
 "blockedBy": [], "blocks": [2, 3]}

// task_2.json — transform
{"id": 2, "subject": "Transform AST", "status": "pending",
 "blockedBy": [1], "blocks": [4]}

// task_3.json — emit
{"id": 3, "subject": "Emit output", "status": "pending",
 "blockedBy": [1], "blocks": [4]}

// task_4.json — test
{"id": 4, "subject": "Run tests", "status": "pending",
 "blockedBy": [2, 3], "blocks": []}

task_list 的输出:

bash
[ ] #1: Parse AST
[ ] #2: Transform AST (blocked by: [1])
[ ] #3: Emit output (blocked by: [1])
[ ] #4: Run tests (blocked by: [2, 3])

只有 task 1 是就绪的。完成 task 1 后,task 2 和 task 3 同时就绪。


10.2 任务生命周期管理

TaskManager:CRUD 核心

TaskManager 是 Task System 的引擎,负责任务的创建、查询、更新和依赖维护。整个实现不到 80 行 Python,但覆盖了完整的 DAG 生命周期。

初始化:扫描已有任务

python
class TaskManager:
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True)
        self._next_id = self._max_id() + 1

    def _max_id(self) -> int:
        ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
        return max(ids) if ids else 0

初始化时扫描 .tasks/ 目录,找到已有任务的最大 ID,从 max_id + 1 开始分配新 ID。这保证了即使进程重启、上下文压缩、甚至换一个 Agent 实例接管,ID 序列也不会冲突。

四个 CRUD 工具

Task System 暴露四个工具给 Agent 调用:

task_create —— 创建任务

python
def create(self, subject: str, description: str = "") -> str:
    task = {
        "id": self._next_id, "subject": subject, "description": description,
        "status": "pending", "blockedBy": [], "blocks": [], "owner": "",
    }
    self._save(task)
    self._next_id += 1
    return json.dumps(task, indent=2)

新任务初始状态为 pending,无依赖、无阻塞。创建后返回完整的 JSON,让 Agent 知道分配到的 ID,后续可以用这个 ID 添加依赖关系。

task_update —— 更新状态与依赖

python
def update(self, task_id: int, status: str = None,
           add_blocked_by: list = None, add_blocks: list = None) -> str:
    task = self._load(task_id)
    if status:
        if status not in ("pending", "in_progress", "completed"):
            raise ValueError(f"Invalid status: {status}")
        task["status"] = status
        if status == "completed":
            self._clear_dependency(task_id)
    if add_blocked_by:
        task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
    if add_blocks:
        task["blocks"] = list(set(task["blocks"] + add_blocks))
        for blocked_id in add_blocks:
            try:
                blocked = self._load(blocked_id)
                if task_id not in blocked["blockedBy"]:
                    blocked["blockedBy"].append(task_id)
                    self._save(blocked)
            except ValueError:
                pass
    self._save(task)
    return json.dumps(task, indent=2)

task_update 是功能最密集的工具,承担三种操作:

  1. 状态变更:修改 status,如果目标状态是 completed 则触发依赖解除。
  2. 添加前置依赖addBlockedBy 将指定 ID 加入本任务的 blockedBy
  3. 添加后置依赖addBlocks 不仅更新本任务的 blocks,还双向同步——自动将本任务 ID 加入目标任务的 blockedBy。这个双向写入保证了依赖图的一致性,Agent 不需要手动维护两端。

task_list —— 列出全部任务

python
def list_all(self) -> str:
    tasks = []
    for f in sorted(self.dir.glob("task_*.json")):
        tasks.append(json.loads(f.read_text()))
    lines = []
    for t in tasks:
        marker = {"pending": "[ ]", "in_progress": "[>]",
                  "completed": "[x]"}.get(t["status"], "[?]")
        blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
        lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
    return "\n".join(lines)

输出格式设计为人类和 LLM 都容易解析的文本:

bash
[x] #1: Parse AST
[>] #2: Transform AST
[ ] #3: Emit output
[ ] #4: Run tests (blocked by: [2, 3])

三种标记符号:[ ] = pending(就绪或被阻塞),[>] = in_progress,[x] = completed。被阻塞的任务在末尾显示 (blocked by: [...]),让 Agent 一眼看出哪些任务可以立即执行。

task_get —— 查看单个任务详情

python
def get(self, task_id: int) -> str:
    return json.dumps(self._load(task_id), indent=2)

返回完整的 JSON,包含 descriptionowner 等在 task_list 摘要中省略的字段。Agent 在开始执行某个任务前,通常先 task_get 获取完整描述。

工具注册与调度

四个任务工具和基础文件/Shell 工具一起注册到 TOOL_HANDLERS dispatch map:

python
TOOL_HANDLERS = {
    "bash":        lambda **kw: run_bash(kw["command"]),
    "read_file":   lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file":  lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":   lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
    "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"),
                                              kw.get("addBlockedBy"), kw.get("addBlocks")),
    "task_list":   lambda **kw: TASKS.list_all(),
    "task_get":    lambda **kw: TASKS.get(kw["task_id"]),
}

从 4 个工具(bash, read, write, edit)扩展到 8 个。新增的 4 个任务工具没有修改任何已有工具的定义,完全是增量添加——这与第 8 章强调的"前缀稳定性"原则一致。

典型的 Agent 交互流程

一个完整的任务驱动工作流:

python
User: "Refactor the parser module: extract tokenizer, rewrite grammar rules, update tests."

Agent 思考: 这个任务可以拆成 DAG...

→ task_create("Extract tokenizer from parser")          → id=1
→ task_create("Rewrite grammar rules")                  → id=2
→ task_create("Update test suite")                       → id=3
→ task_update(2, addBlockedBy=[1])                       → task 2 blocked by 1
→ task_update(3, addBlockedBy=[1, 2])                    → task 3 blocked by 1, 2
→ task_list
  [ ] #1: Extract tokenizer from parser
  [ ] #2: Rewrite grammar rules (blocked by: [1])
  [ ] #3: Update test suite (blocked by: [1, 2])

Agent: "Task 1 is ready. Starting extraction..."
→ task_update(1, status="in_progress")
... 执行实际代码修改 ...
→ task_update(1, status="completed")
  → 自动: task 2 的 blockedBy 从 [1] 变为 []
  → 自动: task 3 的 blockedBy 从 [1, 2] 变为 [2]

→ task_list
  [x] #1: Extract tokenizer from parser
  [ ] #2: Rewrite grammar rules
  [ ] #3: Update test suite (blocked by: [2])

Agent: "Task 2 is now ready. Proceeding..."

关键观察:Agent 不需要记住依赖关系。每次调用 task_list 都能从磁盘重建完整视图。即使中间发生了上下文压缩,Agent 只需一次 task_list 就能回到正轨。


10.3 Task System 作为协作基础设施

从单 Agent 到多 Agent 的桥梁

Task System 的设计在单 Agent 场景下已经足够实用,但它的真正价值在于为后续的多 Agent 协作奠定了基础设施。.tasks/ 目录本质上是一个共享看板(Shared Kanban Board)——任何能读写文件系统的进程都可以参与协作。

mermaid
graph TD
    subgraph ".tasks/ 共享看板"
        T1["task_1.json<br/>status: completed<br/>owner: agent-main"]
        T2["task_2.json<br/>status: in_progress<br/>owner: agent-worker-1"]
        T3["task_3.json<br/>status: in_progress<br/>owner: agent-worker-2"]
        T4["task_4.json<br/>status: pending<br/>blockedBy: [2, 3]"]
    end

    A["Main Agent"] -->|"task_create<br/>task_update"| T1
    B["Worker Agent 1"] -->|"task_get → 执行 → task_update"| T2
    C["Worker Agent 2"] -->|"task_get → 执行 → task_update"| T3
    B -.->|"完成后解锁"| T4
    C -.->|"完成后解锁"| T4

三个协作模式的共同基座

Task System 是后续三种高级机制的统一基座:

后台执行(Background Execution)

Agent 可以将耗时任务标记为 in_progress,交给后台进程执行。后台进程完成后将状态更新为 completed,前台 Agent 通过 task_list 发现下游任务已解锁。整个过程不依赖对话上下文——因为状态在文件系统里。

团队认领(Team Claiming)

多个 Agent 实例共享同一个 .tasks/ 目录时,owner 字段实现了简单的任务认领机制。Agent A 将 task 2 的 owner 设为自己的标识后开始执行,Agent B 看到 task 2 已有 owner,就跳过它去认领其他就绪任务。这是一个最简化的分布式任务调度——没有锁、没有消息队列,只有文件系统的 read-write。

Worktree 绑定

Git worktree 允许在同一仓库下创建多个独立的工作目录。每个 Agent 可以在自己的 worktree 中工作(避免文件冲突),但所有 Agent 共享同一个 .tasks/ 目录来协调进度。任务完成后,每个 worktree 的修改通过 Git 合并回主分支。

.tasks/ 目录 = 共享看板

.tasks/ 类比为项目管理工具中的看板:

看板概念Task System 对应
卡片单个 task_N.json 文件
列(To Do / In Progress / Done)status 字段
卡片之间的依赖箭头blockedBy + blocks 字段
人员分配owner 字段
看板面板task_list 命令的输出

与 Jira/Trello 等工具的本质区别:Agent 既是看板的使用者,也是看板的操作者。 Agent 自主拆分任务、建立依赖、认领并执行、更新状态——全程无需人类参与看板操作。

文件系统作为协调协议

Task System 刻意避免引入进程间通信(IPC)机制。多个 Agent 之间的协调完全通过文件系统的 read/write 实现。这看起来"原始",但有三个关键优势:

  1. 零基础设施:不需要 Redis、RabbitMQ 或任何消息中间件。文件系统是最普遍的共享存储。
  2. 可观测:人类可以直接 ls .tasks/ 查看所有任务,cat task_3.json 查看详情,甚至手动编辑 JSON 修正错误。
  3. 崩溃恢复:Agent 崩溃后,任务状态不会丢失。新 Agent 启动时扫描 .tasks/ 即可恢复全部上下文。

当然,文件系统协调也有明显局限:没有锁机制,两个 Agent 同时写同一个文件会产生竞争条件(race condition)。在实际生产中,这通常通过以下方式缓解:

  • 每个 Agent 只写自己 owner 的任务
  • 使用 _clear_dependency 的全局扫描作为最终一致性保障
  • 如果需要强一致性,升级为数据库存储

10.4 与其他系统对比

TodoWrite vs Task System

维度TodoWrite(第 6 章)Task System
数据结构扁平列表DAG(有向无环图)
存储位置对话上下文内文件系统(.tasks/
状态模型done / not-donepending → in_progress → completed
依赖关系blockedBy + blocks
并行能力无(线性执行)支持(无依赖的任务可并行)
压缩存活否(压缩后丢失精确状态)是(文件系统独立于上下文)
多 Agent不支持支持(共享 .tasks/ 目录)
适用场景单会话、<5 步的简单任务多步骤、有依赖、需持久化的工程任务

两者不是替代关系,而是互补。TodoWrite 适合"读这三个文件然后改一下配置"这种线性小任务——启动快、无开销。Task System 适合"重构整个模块:拆分 → 重写 → 测试 → 集成"这种有结构的复合任务。

云端任务系统

Jira、Linear、Notion 等云端项目管理工具也能管理任务依赖,但它们与 Agent 的集成存在根本性阻抗:

API 延迟:每次读写任务状态都要经过网络请求。Agent 在一次 ReAct 循环中可能需要多次 task_listtask_update,网络延迟会显著拖慢 Agent 的决策速度。

认证复杂度:OAuth、API Key、权限配置——每多一层抽象就多一个故障点。文件系统操作零认证。

Schema 不匹配:云端工具的数据模型是为人类设计的(Sprint、Epic、Story Point),Agent 不需要这些概念。Task System 的 Schema 是为 Agent 的工具调用优化的——字段少、类型简单、可直接序列化为 tool input。

不可离线:网络断开时云端工具完全不可用。文件系统始终可用。

结论:云端工具适合人类团队协作,Task System 适合 Agent 自主执行。如果需要两者桥接,可以在任务完成后将状态同步到云端(而非反过来让 Agent 实时读写云端 API)。

基于会话的任务管理

部分 Agent 框架将任务状态存储在对话历史中,而非独立的持久化存储。这种方案的问题在第 8 章已有详细分析:

  • 上下文压缩冲突:压缩器无法区分"任务状态信息"和"普通对话历史",摘要时可能丢失关键的依赖关系。
  • 跨会话不可见:新开一个会话,之前的任务进度完全不可见。
  • 多 Agent 不可共享:会话是私有的,另一个 Agent 无法读取。

Task System 通过将状态外化到文件系统,彻底解决了这三个问题。代价是多了 4 个工具定义(约 400 token),但这个代价相比获得的能力完全值得。

设计取舍总结

Task System 的设计选择始终遵循一个原则:在 Agent 能力边界内的最简方案

  • 用文件系统而非数据库 → Agent 已有 read_file/write_file 能力,不需要新增 SQL 工具
  • 用自增 ID 而非 UUID → 简短,LLM 更容易在对话中引用 task 3 而非 task a7b3c...
  • 用双向冗余而非单向引用 → 查询快,不需要反向索引
  • 不设 failed 状态 → 简化状态机,失败可以重置为 pending
  • 不做并发控制 → 单 Agent 场景无需锁,多 Agent 场景通过 owner 字段做软隔离

每一个"没有做"的决定都是有意为之——它们是 Task System 在后续章节中能被平滑扩展为多 Agent 协作基座的前提条件。过早引入复杂机制会锁死架构演进空间。