第 10 章 Task System —— 从 Flat List 到 DAG
TodoWrite 是草稿纸,Task System 是项目管理工具——前者是内存中的扁平清单,后者是磁盘上的有向无环图(DAG),能表达依赖、并行、阻塞三种关系,并在上下文压缩和进程重启后依然存活。
10.1 任务图(DAG)的数据模型
为什么扁平清单不够用
第 6 章的 TodoWrite 解决了"Agent 没有计划"的问题,但它的数据模型是一个扁平的条目列表,每个条目只有 description 和 done/not-done 两种状态。这在单会话、线性任务中足够,但面对真实的多步工程任务时,三个根本性缺陷暴露无遗:
- 无法表达依赖:任务 B 必须等任务 A 完成才能开始,扁平清单无法表达这种前后约束。Agent 可能在 A 还没做完时就跳去做 B,导致错误级联。
- 无法表达并行:任务 C 和 D 互不依赖、可以同时执行,但扁平列表把它们排成线性顺序,Agent 只能逐个执行,浪费并发机会。
- 不持久:TodoWrite 的状态活在对话上下文里,一旦触发上下文压缩(第 8 章),整个计划连同进度一起被摘要掉,Agent 丢失精确的任务状态。
Task System 的核心升级是把清单替换为 DAG(Directed Acyclic Graph,有向无环图)。每个任务是图中的一个节点,依赖关系是有向边。DAG 天然支持拓扑排序、就绪检测和并行识别。
每个 Task = 一个 JSON 文件
Task System 的存储设计极其简单——每个任务独立存储为一个 JSON 文件,放在 .tasks/ 目录下:
.tasks/
task_1.json
task_2.json
task_3.json
task_4.json每个文件的 Schema:
{
"id": 1,
"subject": "Setup project structure",
"description": "Initialize package.json, create src/ and test/ directories",
"status": "pending",
"blockedBy": [],
"blocks": [2, 3],
"owner": ""
}六个字段,每个都有明确的语义:
| 字段 | 类型 | 含义 |
|---|---|---|
id | int | 全局唯一标识,自增生成 |
subject | string | 一句话任务标题 |
description | string | 详细描述,可为空 |
status | enum | 状态机当前状态 |
blockedBy | int[] | 前置依赖——本任务被哪些任务阻塞 |
blocks | int[] | 后置依赖——本任务完成后解锁哪些任务 |
owner | string | 执行者标识(为多 Agent 协作预留) |
blockedBy 和 blocks 是同一条依赖边的双向记录。任务 1 的 blocks: [2, 3] 意味着任务 2 和 3 的 blockedBy 中都包含 1。这种双向冗余牺牲了一点存储一致性,换来了查询效率——不需要扫描所有文件就能知道某个任务阻塞了谁。
为什么用文件而不用数据库?
单文件存储的设计看起来原始,但对 Agent 系统而言恰恰是最优解:
- 可读性:Agent 可以直接
read_file查看任意任务的完整状态,不需要 SQL 查询能力。 - 原子性:每次只写一个文件,不存在事务一致性问题。
- Git 友好:
.tasks/目录可以直接纳入版本控制,每次任务状态变更都有完整的 diff 记录。 - 压缩存活:文件在磁盘上,不受上下文压缩影响。压缩后 Agent 只需
task_list即可恢复全貌。
状态机
任务的生命周期是一个三态状态机:
stateDiagram-v2
[*] --> pending : task_create
pending --> in_progress : task_update(status)
in_progress --> completed : task_update(status)
pending --> completed : task_update(status)
completed --> [*]
note right of pending : blockedBy 非空时,\n逻辑上处于"blocked"状态
note right of completed : 完成时触发\n_clear_dependency()三个状态的语义:
pending:已创建,等待执行。如果blockedBy非空,则逻辑上处于"被阻塞"状态——虽然status字段仍为pending,但 Agent 不应该执行它。in_progress:某个 Agent(或人类)正在执行中。completed:已完成。状态转入completed时触发副作用——自动清除下游任务的blockedBy引用。
注意没有 failed 或 cancelled 状态。这是刻意的简化——Task System 的设计哲学是最小可用。失败的任务可以重置为 pending,取消的任务可以直接删除文件。不引入不需要的复杂度。
依赖解析:完成即解锁
依赖解析是 Task System 最核心的自动化逻辑。当一个任务被标记为 completed 时,系统遍历 .tasks/ 目录下的所有任务文件,将完成任务的 ID 从每个任务的 blockedBy 列表中移除:
def _clear_dependency(self, completed_id):
"""Remove completed_id from all other tasks' blockedBy lists."""
for f in self.dir.glob("task_*.json"):
task = json.loads(f.read_text())
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
self._save(task)这个看似简单的操作实现了 DAG 的拓扑推进:
graph LR
subgraph "完成 task 1 之前"
A1["task 1<br/>completed"] --> B1["task 2<br/>blockedBy: [1]"]
A1 --> C1["task 3<br/>blockedBy: [1]"]
B1 --> D1["task 4<br/>blockedBy: [2, 3]"]
C1 --> D1
endgraph LR
subgraph "完成 task 1 之后"
A2["task 1<br/>completed"] -.-> B2["task 2<br/>blockedBy: [] ready"]
A2 -.-> C2["task 3<br/>blockedBy: [] ready"]
B2 --> D2["task 4<br/>blockedBy: [2, 3]"]
C2 --> D2
endtask 1 完成后,task 2 和 task 3 的 blockedBy 被清空,变为就绪状态,可以并行执行。task 4 仍被阻塞,直到 2 和 3 都完成。
就绪判定的规则极其简单:status == "pending" 且 blockedBy == []。Agent 只需调用 task_list,状态标记为 [ ] 且没有 (blocked by: ...) 后缀的任务就是可以立即执行的。
DAG 的完整示例
一个典型的代码重构任务拆解:
parse ─────→ transform ──→ test
│ ↑
└──→ emit ──────────────────┘transform 和 emit 都依赖 parse,但互不依赖,可以并行。test 依赖 transform 和 emit 都完成。
对应的 JSON 文件:
// task_1.json — parse
{"id": 1, "subject": "Parse AST", "status": "pending",
"blockedBy": [], "blocks": [2, 3]}
// task_2.json — transform
{"id": 2, "subject": "Transform AST", "status": "pending",
"blockedBy": [1], "blocks": [4]}
// task_3.json — emit
{"id": 3, "subject": "Emit output", "status": "pending",
"blockedBy": [1], "blocks": [4]}
// task_4.json — test
{"id": 4, "subject": "Run tests", "status": "pending",
"blockedBy": [2, 3], "blocks": []}task_list 的输出:
[ ] #1: Parse AST
[ ] #2: Transform AST (blocked by: [1])
[ ] #3: Emit output (blocked by: [1])
[ ] #4: Run tests (blocked by: [2, 3])只有 task 1 是就绪的。完成 task 1 后,task 2 和 task 3 同时就绪。
10.2 任务生命周期管理
TaskManager:CRUD 核心
TaskManager 是 Task System 的引擎,负责任务的创建、查询、更新和依赖维护。整个实现不到 80 行 Python,但覆盖了完整的 DAG 生命周期。
初始化:扫描已有任务
class TaskManager:
def __init__(self, tasks_dir: Path):
self.dir = tasks_dir
self.dir.mkdir(exist_ok=True)
self._next_id = self._max_id() + 1
def _max_id(self) -> int:
ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
return max(ids) if ids else 0初始化时扫描 .tasks/ 目录,找到已有任务的最大 ID,从 max_id + 1 开始分配新 ID。这保证了即使进程重启、上下文压缩、甚至换一个 Agent 实例接管,ID 序列也不会冲突。
四个 CRUD 工具
Task System 暴露四个工具给 Agent 调用:
task_create —— 创建任务
def create(self, subject: str, description: str = "") -> str:
task = {
"id": self._next_id, "subject": subject, "description": description,
"status": "pending", "blockedBy": [], "blocks": [], "owner": "",
}
self._save(task)
self._next_id += 1
return json.dumps(task, indent=2)新任务初始状态为 pending,无依赖、无阻塞。创建后返回完整的 JSON,让 Agent 知道分配到的 ID,后续可以用这个 ID 添加依赖关系。
task_update —— 更新状态与依赖
def update(self, task_id: int, status: str = None,
add_blocked_by: list = None, add_blocks: list = None) -> str:
task = self._load(task_id)
if status:
if status not in ("pending", "in_progress", "completed"):
raise ValueError(f"Invalid status: {status}")
task["status"] = status
if status == "completed":
self._clear_dependency(task_id)
if add_blocked_by:
task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
if add_blocks:
task["blocks"] = list(set(task["blocks"] + add_blocks))
for blocked_id in add_blocks:
try:
blocked = self._load(blocked_id)
if task_id not in blocked["blockedBy"]:
blocked["blockedBy"].append(task_id)
self._save(blocked)
except ValueError:
pass
self._save(task)
return json.dumps(task, indent=2)task_update 是功能最密集的工具,承担三种操作:
- 状态变更:修改
status,如果目标状态是completed则触发依赖解除。 - 添加前置依赖:
addBlockedBy将指定 ID 加入本任务的blockedBy。 - 添加后置依赖:
addBlocks不仅更新本任务的blocks,还双向同步——自动将本任务 ID 加入目标任务的blockedBy。这个双向写入保证了依赖图的一致性,Agent 不需要手动维护两端。
task_list —— 列出全部任务
def list_all(self) -> str:
tasks = []
for f in sorted(self.dir.glob("task_*.json")):
tasks.append(json.loads(f.read_text()))
lines = []
for t in tasks:
marker = {"pending": "[ ]", "in_progress": "[>]",
"completed": "[x]"}.get(t["status"], "[?]")
blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
return "\n".join(lines)输出格式设计为人类和 LLM 都容易解析的文本:
[x] #1: Parse AST
[>] #2: Transform AST
[ ] #3: Emit output
[ ] #4: Run tests (blocked by: [2, 3])三种标记符号:[ ] = pending(就绪或被阻塞),[>] = in_progress,[x] = completed。被阻塞的任务在末尾显示 (blocked by: [...]),让 Agent 一眼看出哪些任务可以立即执行。
task_get —— 查看单个任务详情
def get(self, task_id: int) -> str:
return json.dumps(self._load(task_id), indent=2)返回完整的 JSON,包含 description、owner 等在 task_list 摘要中省略的字段。Agent 在开始执行某个任务前,通常先 task_get 获取完整描述。
工具注册与调度
四个任务工具和基础文件/Shell 工具一起注册到 TOOL_HANDLERS dispatch map:
TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
"task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
"task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"),
kw.get("addBlockedBy"), kw.get("addBlocks")),
"task_list": lambda **kw: TASKS.list_all(),
"task_get": lambda **kw: TASKS.get(kw["task_id"]),
}从 4 个工具(bash, read, write, edit)扩展到 8 个。新增的 4 个任务工具没有修改任何已有工具的定义,完全是增量添加——这与第 8 章强调的"前缀稳定性"原则一致。
典型的 Agent 交互流程
一个完整的任务驱动工作流:
User: "Refactor the parser module: extract tokenizer, rewrite grammar rules, update tests."
Agent 思考: 这个任务可以拆成 DAG...
→ task_create("Extract tokenizer from parser") → id=1
→ task_create("Rewrite grammar rules") → id=2
→ task_create("Update test suite") → id=3
→ task_update(2, addBlockedBy=[1]) → task 2 blocked by 1
→ task_update(3, addBlockedBy=[1, 2]) → task 3 blocked by 1, 2
→ task_list
[ ] #1: Extract tokenizer from parser
[ ] #2: Rewrite grammar rules (blocked by: [1])
[ ] #3: Update test suite (blocked by: [1, 2])
Agent: "Task 1 is ready. Starting extraction..."
→ task_update(1, status="in_progress")
→ ... 执行实际代码修改 ...
→ task_update(1, status="completed")
→ 自动: task 2 的 blockedBy 从 [1] 变为 []
→ 自动: task 3 的 blockedBy 从 [1, 2] 变为 [2]
→ task_list
[x] #1: Extract tokenizer from parser
[ ] #2: Rewrite grammar rules
[ ] #3: Update test suite (blocked by: [2])
Agent: "Task 2 is now ready. Proceeding..."关键观察:Agent 不需要记住依赖关系。每次调用 task_list 都能从磁盘重建完整视图。即使中间发生了上下文压缩,Agent 只需一次 task_list 就能回到正轨。
10.3 Task System 作为协作基础设施
从单 Agent 到多 Agent 的桥梁
Task System 的设计在单 Agent 场景下已经足够实用,但它的真正价值在于为后续的多 Agent 协作奠定了基础设施。.tasks/ 目录本质上是一个共享看板(Shared Kanban Board)——任何能读写文件系统的进程都可以参与协作。
graph TD
subgraph ".tasks/ 共享看板"
T1["task_1.json<br/>status: completed<br/>owner: agent-main"]
T2["task_2.json<br/>status: in_progress<br/>owner: agent-worker-1"]
T3["task_3.json<br/>status: in_progress<br/>owner: agent-worker-2"]
T4["task_4.json<br/>status: pending<br/>blockedBy: [2, 3]"]
end
A["Main Agent"] -->|"task_create<br/>task_update"| T1
B["Worker Agent 1"] -->|"task_get → 执行 → task_update"| T2
C["Worker Agent 2"] -->|"task_get → 执行 → task_update"| T3
B -.->|"完成后解锁"| T4
C -.->|"完成后解锁"| T4三个协作模式的共同基座
Task System 是后续三种高级机制的统一基座:
后台执行(Background Execution)
Agent 可以将耗时任务标记为 in_progress,交给后台进程执行。后台进程完成后将状态更新为 completed,前台 Agent 通过 task_list 发现下游任务已解锁。整个过程不依赖对话上下文——因为状态在文件系统里。
团队认领(Team Claiming)
多个 Agent 实例共享同一个 .tasks/ 目录时,owner 字段实现了简单的任务认领机制。Agent A 将 task 2 的 owner 设为自己的标识后开始执行,Agent B 看到 task 2 已有 owner,就跳过它去认领其他就绪任务。这是一个最简化的分布式任务调度——没有锁、没有消息队列,只有文件系统的 read-write。
Worktree 绑定
Git worktree 允许在同一仓库下创建多个独立的工作目录。每个 Agent 可以在自己的 worktree 中工作(避免文件冲突),但所有 Agent 共享同一个 .tasks/ 目录来协调进度。任务完成后,每个 worktree 的修改通过 Git 合并回主分支。
.tasks/ 目录 = 共享看板
把 .tasks/ 类比为项目管理工具中的看板:
| 看板概念 | Task System 对应 |
|---|---|
| 卡片 | 单个 task_N.json 文件 |
| 列(To Do / In Progress / Done) | status 字段 |
| 卡片之间的依赖箭头 | blockedBy + blocks 字段 |
| 人员分配 | owner 字段 |
| 看板面板 | task_list 命令的输出 |
与 Jira/Trello 等工具的本质区别:Agent 既是看板的使用者,也是看板的操作者。 Agent 自主拆分任务、建立依赖、认领并执行、更新状态——全程无需人类参与看板操作。
文件系统作为协调协议
Task System 刻意避免引入进程间通信(IPC)机制。多个 Agent 之间的协调完全通过文件系统的 read/write 实现。这看起来"原始",但有三个关键优势:
- 零基础设施:不需要 Redis、RabbitMQ 或任何消息中间件。文件系统是最普遍的共享存储。
- 可观测:人类可以直接
ls .tasks/查看所有任务,cat task_3.json查看详情,甚至手动编辑 JSON 修正错误。 - 崩溃恢复:Agent 崩溃后,任务状态不会丢失。新 Agent 启动时扫描
.tasks/即可恢复全部上下文。
当然,文件系统协调也有明显局限:没有锁机制,两个 Agent 同时写同一个文件会产生竞争条件(race condition)。在实际生产中,这通常通过以下方式缓解:
- 每个 Agent 只写自己
owner的任务 - 使用
_clear_dependency的全局扫描作为最终一致性保障 - 如果需要强一致性,升级为数据库存储
10.4 与其他系统对比
TodoWrite vs Task System
| 维度 | TodoWrite(第 6 章) | Task System |
|---|---|---|
| 数据结构 | 扁平列表 | DAG(有向无环图) |
| 存储位置 | 对话上下文内 | 文件系统(.tasks/) |
| 状态模型 | done / not-done | pending → in_progress → completed |
| 依赖关系 | 无 | blockedBy + blocks |
| 并行能力 | 无(线性执行) | 支持(无依赖的任务可并行) |
| 压缩存活 | 否(压缩后丢失精确状态) | 是(文件系统独立于上下文) |
| 多 Agent | 不支持 | 支持(共享 .tasks/ 目录) |
| 适用场景 | 单会话、<5 步的简单任务 | 多步骤、有依赖、需持久化的工程任务 |
两者不是替代关系,而是互补。TodoWrite 适合"读这三个文件然后改一下配置"这种线性小任务——启动快、无开销。Task System 适合"重构整个模块:拆分 → 重写 → 测试 → 集成"这种有结构的复合任务。
云端任务系统
Jira、Linear、Notion 等云端项目管理工具也能管理任务依赖,但它们与 Agent 的集成存在根本性阻抗:
API 延迟:每次读写任务状态都要经过网络请求。Agent 在一次 ReAct 循环中可能需要多次 task_list 和 task_update,网络延迟会显著拖慢 Agent 的决策速度。
认证复杂度:OAuth、API Key、权限配置——每多一层抽象就多一个故障点。文件系统操作零认证。
Schema 不匹配:云端工具的数据模型是为人类设计的(Sprint、Epic、Story Point),Agent 不需要这些概念。Task System 的 Schema 是为 Agent 的工具调用优化的——字段少、类型简单、可直接序列化为 tool input。
不可离线:网络断开时云端工具完全不可用。文件系统始终可用。
结论:云端工具适合人类团队协作,Task System 适合 Agent 自主执行。如果需要两者桥接,可以在任务完成后将状态同步到云端(而非反过来让 Agent 实时读写云端 API)。
基于会话的任务管理
部分 Agent 框架将任务状态存储在对话历史中,而非独立的持久化存储。这种方案的问题在第 8 章已有详细分析:
- 上下文压缩冲突:压缩器无法区分"任务状态信息"和"普通对话历史",摘要时可能丢失关键的依赖关系。
- 跨会话不可见:新开一个会话,之前的任务进度完全不可见。
- 多 Agent 不可共享:会话是私有的,另一个 Agent 无法读取。
Task System 通过将状态外化到文件系统,彻底解决了这三个问题。代价是多了 4 个工具定义(约 400 token),但这个代价相比获得的能力完全值得。
设计取舍总结
Task System 的设计选择始终遵循一个原则:在 Agent 能力边界内的最简方案。
- 用文件系统而非数据库 → Agent 已有
read_file/write_file能力,不需要新增 SQL 工具 - 用自增 ID 而非 UUID → 简短,LLM 更容易在对话中引用
task 3而非task a7b3c... - 用双向冗余而非单向引用 → 查询快,不需要反向索引
- 不设
failed状态 → 简化状态机,失败可以重置为pending - 不做并发控制 → 单 Agent 场景无需锁,多 Agent 场景通过
owner字段做软隔离
每一个"没有做"的决定都是有意为之——它们是 Task System 在后续章节中能被平滑扩展为多 Agent 协作基座的前提条件。过早引入复杂机制会锁死架构演进空间。