第 30 章 扩展与产品化闭环
Coding Agent 从 CLI 工具走向可嵌入、可集成、可大规模部署的产品,需要解决五个层次的工程问题:API 服务化、代码智能、插件生态、质量保证、以及 SDK 化二次开发。本章逐一拆解。
30.1 HTTP API 与服务化
30.1.1 WebSocket + REST API 架构
Agent 的交互模式决定了它不能只用 REST —— 一个 turn 可能执行几分钟,中间产出数十个 streaming notification(工具调用进度、partial response、approval 请求等)。成熟的 Agent 服务同时暴露两种传输协议:
| 传输方式 | 适用场景 | 消息模型 |
|---|---|---|
| WebSocket | 长时会话、streaming event | 双向 JSON-RPC over WS frame |
| REST + SSE | 无状态查询、event 订阅 | 标准 HTTP + Server-Sent Events |
WebSocket 传输的典型实现使用 axum 做 HTTP 框架,每个 WebSocket 连接被分配一个 ConnectionId,升级握手后进入双向 JSON-RPC 通信:
// WebSocket 连接升级与连接管理(Rust / axum)
enum AppServerTransport {
Stdio,
WebSocket { bind_address: SocketAddr },
}
// 连接升级后,每个连接获得独立的 bounded channel
async fn websocket_upgrade_handler(
websocket: WebSocketUpgrade,
ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
State(state): State<WebSocketListenerState>,
) -> impl IntoResponse {
let connection_id = ConnectionId(
state.connection_counter.fetch_add(1, Ordering::Relaxed)
);
websocket.on_upgrade(move |stream| async move {
run_websocket_connection(connection_id, stream, state.transport_event_tx).await;
})
}REST + SSE 传输的另一种实现使用 Hono 框架构建完整的 REST API,同时通过 SSE 端点推送事件流:
// SSE 事件推送(TypeScript / Hono)
app.get("/event", async (c) => {
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
// 心跳防止代理超时
const heartbeat = setInterval(() => {
q.push(JSON.stringify({ type: "server.heartbeat", properties: {} }))
}, 10_000)
// 订阅全局事件总线
const unsub = Bus.subscribeAll((event) => {
q.push(JSON.stringify(event))
})
for await (const data of q) {
if (data === null) return
await stream.writeSSE({ data })
}
})
})30.1.2 从 CLI 到 API:Agent-as-a-Service 的架构模式
CLI 和 API 服务的核心差异在于连接生命周期管理。CLI 是 1:1 的 stdio 管道,进程退出即连接终止;API 服务是 N:1 的多连接模型,需要处理连接打开、关闭、以及消息路由。
graph TB
subgraph Clients
CLI[CLI / stdio]
IDE[IDE Extension / WebSocket]
SDK[SDK / in-process]
end
subgraph Transport["Transport Layer"]
STDIO[stdio transport]
WS[WebSocket transport]
INPROC[In-Process channel]
end
subgraph Core["Message Processing Core"]
MP[MessageProcessor]
TM[Thread Manager]
AUTH[Auth Manager]
end
subgraph Outbound["Outbound Router"]
OR[Connection Router]
CID1["Connection #0"]
CID2["Connection #1"]
CIDN["Connection #N"]
end
CLI --> STDIO
IDE --> WS
SDK --> INPROC
STDIO --> MP
WS --> MP
INPROC --> MP
MP --> TM
MP --> AUTH
MP --> OR
OR --> CID1
OR --> CID2
OR --> CIDN关键设计模式 — 传输层抽象:MessageProcessor 完全不关心消息来自 stdio、WebSocket 还是 in-process channel。所有传输方式都通过相同的 TransportEvent 枚举进入统一处理流程:
// 传输事件统一抽象
enum TransportEvent {
ConnectionOpened {
connection_id: ConnectionId,
writer: mpsc::Sender<OutgoingMessage>,
allow_legacy_notifications: bool,
disconnect_sender: Option<CancellationToken>,
},
ConnectionClosed { connection_id: ConnectionId },
IncomingMessage {
connection_id: ConnectionId,
message: JSONRPCMessage,
},
}出站消息的路由同样统一 —— OutgoingEnvelope 携带目标 ConnectionId(或 broadcast 标记),由独立的 outbound router 任务负责分发。这种读写分离的双 loop 架构避免了慢速连接(如网络抖动的 WebSocket 客户端)阻塞消息处理主循环:
// 双 loop 架构:processor loop + outbound loop
// Processor loop: 处理入站 JSON-RPC,dispatch 请求
// Outbound loop: 路由出站消息到各连接的 writer channel
enum OutboundControlEvent {
Opened {
connection_id: ConnectionId,
writer: mpsc::Sender<OutgoingMessage>,
// ...capability flags
},
Closed { connection_id: ConnectionId },
DisconnectAll,
}30.1.3 长连接管理、认证、多租户隔离
认证分层实现。REST 层使用 Basic Auth 中间件,WebSocket 层在升级前拒绝带 Origin 头的跨域请求(防止浏览器 CSRF 攻击),API Key 通过环境变量注入:
// REST 认证中间件
app.use((c, next) => {
if (c.req.method === "OPTIONS") return next() // CORS preflight bypass
const password = Flag.OPENCODE_SERVER_PASSWORD
if (!password) return next()
const username = Flag.OPENCODE_SERVER_USERNAME ?? "opencode"
return basicAuth({ username, password })(c, next)
})// WebSocket 层:拒绝 Origin 头(防浏览器 CSRF)
async fn reject_requests_with_origin_header(
request: Request<Body>,
next: Next,
) -> Result<Response, StatusCode> {
if request.headers().contains_key(ORIGIN) {
Err(StatusCode::FORBIDDEN)
} else {
Ok(next.run(request).await)
}
}多租户隔离通过 ConnectionId + ConnectionSessionState 实现。每个连接维护独立的会话状态,包括初始化标志、实验性 API 开关、notification opt-out 集合等:
// 每连接独立状态
struct ConnectionState {
session: ConnectionSessionState,
outbound_initialized: Arc<AtomicBool>,
outbound_experimental_api_enabled: Arc<AtomicBool>,
outbound_opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
}Graceful Shutdown在多连接场景下尤为重要。服务端收到 SIGTERM 后进入 drain 模式:停止接受新连接,等待所有活跃的 assistant turn 完成,然后断开所有 WebSocket 客户端:
impl ShutdownState {
fn update(&mut self, running_turn_count: usize, connection_count: usize) -> ShutdownAction {
if !self.requested { return ShutdownAction::Noop; }
// 第一次信号:等待 running turns 清零
if self.forced || running_turn_count == 0 {
return ShutdownAction::Finish;
}
// 第二次信号:强制关闭
ShutdownAction::Noop
}
}CORS 策略在 REST 服务中需要精细控制。典型做法是白名单 + 模式匹配:
cors({
origin(input) {
if (input.startsWith("http://localhost:")) return input
if (input.startsWith("http://127.0.0.1:")) return input
// Tauri 桌面客户端
if (input === "tauri://localhost") return input
// 云端控制面板
if (/^https:\/\/([a-z0-9-]+\.)*opencode\.ai$/.test(input)) return input
return undefined
},
})30.2 代码智能与 LSP
30.2.1 原生 LSP 支持:Hover / 跳转定义 / 引用查找
Coding Agent 相比普通 Chatbot 的核心优势之一是利用 Language Server Protocol (LSP) 获取编译器级别的代码理解。通过集成 LSP,Agent 可以精确地获取类型信息、跳转到定义、查找引用、获取诊断错误 —— 而非仅靠模型"猜测"代码语义。
典型的 LSP 集成架构是 Client-Server 分离:Agent 进程启动若干 LSP server 子进程(TypeScript、Python、Rust 等),通过 JSON-RPC over stdio 与它们通信。每个 language server 被包装为一个 LSPClient 实例:
// LSP Client 初始化
const connection = createMessageConnection(
new StreamMessageReader(server.process.stdout),
new StreamMessageWriter(server.process.stdin),
)
// 注册诊断通知监听
connection.onNotification("textDocument/publishDiagnostics", (params) => {
const filePath = fileURLToPath(params.uri)
diagnostics.set(filePath, params.diagnostics)
Bus.publish(Event.Diagnostics, { path: filePath, serverID })
})
// 发送 initialize 请求
await connection.sendRequest("initialize", {
rootUri: pathToFileURL(root).href,
capabilities: {
textDocument: {
synchronization: { didOpen: true, didChange: true },
publishDiagnostics: { versionSupport: true },
},
},
})
await connection.sendNotification("initialized", {})LSP 暴露的核心能力可以直接映射为 Agent 的 tool:
// Hover: 获取类型签名和文档
async function hover(input: { file: string; line: number; character: number }) {
return client.connection.sendRequest("textDocument/hover", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
}
// Go-to-definition: 跳转到符号定义
async function definition(input: { file: string; line: number; character: number }) {
return client.connection.sendRequest("textDocument/definition", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
}
// Find references: 查找所有引用
async function references(input: { file: string; line: number; character: number }) {
return client.connection.sendRequest("textDocument/references", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
context: { includeDeclaration: true },
})
}
// Call hierarchy: 查找调用关系
async function incomingCalls(input: { file: string; line: number; character: number }) {
const items = await client.connection.sendRequest(
"textDocument/prepareCallHierarchy", { ... }
)
return client.connection.sendRequest(
"callHierarchy/incomingCalls", { item: items[0] }
)
}30.2.2 代码索引与上下文选择:从百万行代码库中精准提取
Agent 面对大型代码库时的核心挑战是上下文窗口有限。即使模型支持 200K token,全量灌入一个中型项目的所有源码也不现实。代码索引系统负责在海量文件中快速定位相关上下文。
LSP Server 的按需启动与复用是关键优化。每个 LSP server 绑定到一个项目根目录,按文件扩展名匹配。首次访问某语言的文件时懒启动对应 server,后续复用:
// 按文件扩展名和项目根匹配 LSP client
async function getClients(file: string) {
const extension = path.parse(file).ext
for (const server of Object.values(servers)) {
if (server.extensions.length && !server.extensions.includes(extension))
continue
const root = await server.root(file) // 向上查找 package.json / Cargo.toml 等
if (!root) continue
// 查找已有 client 或懒启动
const match = clients.find(x => x.root === root && x.serverID === server.id)
if (match) { result.push(match); continue }
// 异步启动,避免阻塞
const client = await schedule(server, root, key)
if (client) result.push(client)
}
}Workspace Symbol 搜索让 Agent 能快速定位符号而不需遍历文件树:
// 全局符号搜索,限制返回 10 个最相关结果
async function workspaceSymbol(query: string) {
return runAll((client) =>
client.connection
.sendRequest("workspace/symbol", { query })
.then((result) =>
result
.filter((x) => relevantKinds.includes(x.kind)) // Class/Function/Interface...
.slice(0, 10)
)
).then((result) => result.flat())
}30.2.3 file-search 能力
除了 LSP 提供的语义级搜索,Agent 还需要文件名级别的模糊搜索 —— 当用户说"找那个 config 文件"时,Agent 需要快速定位。
文件模糊搜索的核心实现使用多线程并行扫描 + 评分排序:
// 多线程模糊文件搜索
async fn run_fuzzy_file_search(
query: String,
roots: Vec<String>,
cancellation_flag: Arc<AtomicBool>, // 支持取消
) -> Vec<FuzzyFileSearchResult> {
let cores = std::thread::available_parallelism().unwrap_or(1);
let threads = cores.min(MAX_THREADS); // 最多 12 线程
file_search::run(
query.as_str(),
search_dirs,
FileSearchOptions {
limit: NonZero::new(50), // 最多 50 结果
threads,
compute_indices: true, // 返回匹配位置用于高亮
..Default::default()
},
Some(cancellation_flag),
)
}搜索结果返回文件路径、匹配类型(文件/目录)、分数和匹配位置索引:
struct FuzzyFileSearchResult {
root: String, // 项目根目录
path: String, // 相对路径
match_type: FuzzyFileSearchMatchType, // File | Directory
file_name: String, // 文件名
score: f64, // 匹配分数
indices: Vec<usize>, // 匹配字符的索引位置
}搜索支持流式更新 —— 随着扫描进展,通过 notification 实时推送中间结果给前端,用户无需等待完整结果:
fuzzyFileSearch/session/updated → 发送当前已发现的匹配
fuzzyFileSearch/session/completed → 搜索结束30.3 插件与扩展生态
30.3.1 插件架构设计
Agent 的插件系统需要解决一个核心矛盾:开放性与安全性。插件可以修改 Agent 的行为(如注入 auth provider、拦截事件、修改 prompt),但不应该破坏核心稳定性。
成熟的插件架构采用 Hook-based 模式 —— 插件通过注册 hook 函数参与 Agent 生命周期的关键节点:
// 插件接口定义
type Plugin = (input: PluginInput) => Promise<Hooks>
interface PluginInput {
client: OpencodeClient // API 客户端,访问全部 Agent 功能
project: string // 项目标识
worktree: string // worktree 根目录
directory: string // 当前工作目录
serverUrl: URL // 服务器地址
$: BunShell // Shell 执行器
}
// Hook 点 —— 插件可以拦截的生命周期事件
interface Hooks {
config?: (config: Config) => void // 配置变更
event?: (input: { event: BusEvent }) => void // 事件监听
// ... 其他 hook 点(auth、prompt 修改等)
}插件加载流程分三层:
- 内置插件:直接编译进 Agent,如 Codex auth、Copilot auth
- npm 包插件:通过
bun install动态安装指定版本 - 本地文件插件:通过
file://协议加载本地 JS 模块
// 插件加载顺序
// 1. 内置插件
for (const plugin of INTERNAL_PLUGINS) {
const init = await plugin(input)
if (init) hooks.push(init)
}
// 2. npm/本地插件
for (let plugin of config.plugin ?? []) {
if (DEPRECATED_PLUGIN_PACKAGES.includes(plugin)) continue // 跳过废弃包
if (!plugin.startsWith("file://")) {
// 从 npm 安装
const pkg = extractPackageName(plugin)
const version = extractVersion(plugin) ?? "latest"
plugin = await BunProc.install(pkg, version)
}
// 动态 import 并去重
const mod = await import(plugin)
const seen = new Set<PluginInstance>()
for (const [, fn] of Object.entries(mod)) {
if (seen.has(fn)) continue
seen.add(fn)
hooks.push(await fn(input))
}
}触发机制 —— Hook 的执行采用串行 waterfall 模式,每个插件可以修改 output 对象,传递给下一个:
async function trigger<Name extends TriggerName>(
name: Name, input: Input, output: Output
): Promise<Output> {
for (const hook of state.hooks) {
const fn = hook[name]
if (!fn) continue
await fn(input, output) // 串行执行,共享 output 引用
}
return output
}30.3.2 扩展点设计:Hook vs Plugin vs MCP Server 的边界
Agent 生态中存在三种扩展机制,各有适用边界:
| 扩展机制 | 作用层次 | 生命周期 | 典型用途 |
|---|---|---|---|
| Hook | 进程内同步/异步回调 | 跟随 Agent 进程 | Auth 注入、事件监听、prompt 修改 |
| Plugin | 进程内模块化扩展 | 跟随 Agent 实例 | 完整功能模块(auth provider、自定义 agent) |
| MCP Server | 跨进程标准协议 | 独立进程生命周期 | 工具集扩展、外部数据源接入 |
选择原则:
- 需要修改 Agent 内部行为(auth、routing、prompt)→ Plugin + Hook
- 需要添加新的工具能力(数据库查询、API 调用)→ MCP Server
- 需要跨多个 Agent 产品复用 → MCP Server(协议标准化)
- 需要严格隔离(不信任第三方代码)→ MCP Server(进程级隔离)
Skill 是另一种轻量级扩展,介于 Hook 和 MCP Server 之间。Skill 本质是结构化的 Markdown 文件(SKILL.md),包含名称、描述和指令内容,被注入到 Agent 的 system prompt 中:
// Skill 发现 —— 从多个目录扫描 SKILL.md
const EXTERNAL_DIRS = [".claude", ".agents"]
const SKILL_PATTERN = "**/SKILL.md"
async function scanSkills(root: string) {
const matches = await Glob.scan(SKILL_PATTERN, {
cwd: root,
absolute: true,
symlink: true,
})
for (const match of matches) {
const md = await ConfigMarkdown.parse(match)
// frontmatter 解析 name + description
skills[md.data.name] = {
name: md.data.name,
description: md.data.description,
location: match,
content: md.content,
}
}
}30.4 质量保证闭环
30.4.1 完整的评测流水线
产品化 Agent 的质量保证不能靠人工测试。完整的评测流水线包括四个阶段:
graph LR
A["离线 Replay<br/>确定性回放历史 session"] --> B["回归测试集<br/>已知 good/bad cases"]
B --> C["LLM-Judge<br/>模型自评分 + 交叉评审"]
C --> D["数据回流<br/>失败 case 进入训练/prompt 优化"]
D --> A阶段 1:离线 Replay
从生产日志中提取用户 session,在沙箱环境中重放。关键是控制变量 —— 相同的输入、相同的工具状态、相同的文件系统快照:
# Replay 测试伪代码
def replay_session(session_log: SessionLog, sandbox: Sandbox):
sandbox.restore_snapshot(session_log.initial_state)
for turn in session_log.turns:
response = agent.run(turn.user_input, sandbox=sandbox)
assert_no_regression(response, turn.expected_behavior)阶段 2:回归测试集
维护一个 case 库,每个 case 定义:初始状态、用户输入、期望行为(可以是精确匹配,也可以是语义判断)。CI 每次提交运行全部 cases。
阶段 3:LLM-Judge
对于无法精确匹配的行为(如"代码质量"、"回答准确性"),使用另一个模型做评分。关键是定义清晰的评分 rubric:
JUDGE_PROMPT = """
Rate the agent's response on these dimensions (1-5):
1. Correctness: Does the code compile and produce correct output?
2. Completeness: Are all requirements addressed?
3. Code quality: Is the code clean, idiomatic, and well-structured?
Response to evaluate:
{agent_response}
Expected behavior:
{expected}
"""阶段 4:数据回流
失败 case 被标注后进入反馈循环:如果是 prompt 问题,调整 system prompt;如果是工具问题,修复工具实现;如果是模型能力问题,标记为 training signal。
30.4.2 测试策略和 CI/CD
Agent 的测试金字塔与传统软件不同 —— 集成测试的比重远大于单元测试,因为 Agent 的价值在于各组件协同工作的 emergent behavior。
| 测试层次 | 占比 | 验证什么 | 工具 |
|---|---|---|---|
| 单元测试 | ~20% | 配置解析、消息序列化、工具参数验证 | 标准 test framework |
| 集成测试 | ~50% | 传输层握手、JSON-RPC round-trip、auth 流程 | In-process runtime |
| E2E 测试 | ~20% | 完整 session 流程、sandbox 隔离、文件系统变更 | 真实 CLI 进程 |
| LLM-Judge | ~10% | 代码质量、回答准确性、行为符合预期 | 模型评审 |
In-process runtime 是集成测试的核心基础设施。它用 bounded in-memory channel 替代 socket/stdio,保留完整的 app-server 语义但避免进程边界开销:
// In-process 集成测试
#[tokio::test]
async fn in_process_start_initializes_and_handles_typed_v2_request() {
let client = start_test_client(SessionSource::Cli).await;
let response = client
.request(ClientRequest::ConfigRequirementsRead {
request_id: RequestId::Integer(1),
params: None,
})
.await
.expect("request transport should work")
.expect("request should succeed");
assert!(response.is_object());
client.shutdown().await.expect("should shutdown cleanly");
}CI/CD 流水线的关键是分层执行:
# CI 流水线分层
stages:
- lint # 静态检查,< 1 min
- unit # 单元测试,< 3 min
- integrate # 集成测试(in-process),< 10 min
- e2e # E2E 测试(需要 API key),< 30 min
- eval # LLM-Judge 评测,按需执行30.5 SDK 与二次开发
30.5.1 Python + TypeScript SDK
成熟的 Agent 产品提供多语言 SDK,让开发者将 Agent 能力嵌入自己的应用。SDK 设计的核心原则是镜像 API 语义但简化交互模式。
TypeScript SDK 采用 Codex → Thread → Turn 的三层抽象:
// TypeScript SDK 使用示例
import { Codex } from "@openai/codex"
const codex = new Codex({
apiKey: process.env.API_KEY,
baseUrl: "https://api.example.com",
})
// 创建 thread 并执行 turn
const thread = codex.startThread({
model: "o4-mini",
sandboxMode: "write",
workingDirectory: "/path/to/project",
})
// 同步执行(等待完成)
const result = await thread.run("Fix the type errors in auth.ts")
console.log(result.finalResponse)
console.log(result.items) // 工具调用记录、文件变更等
// 流式执行
const { events } = await thread.runStreamed("Refactor the database layer")
for await (const event of events) {
if (event.type === "item.completed") {
console.log(event.item)
}
}SDK 内部通过 spawn 一个 CLI 子进程并解析其 JSON 输出流来工作:
// SDK 内部:spawn CLI 进程
class CodexExec {
async *run(args: CodexExecArgs): AsyncGenerator<string> {
const commandArgs = ["exec", "--experimental-json"]
if (args.model) commandArgs.push("--model", args.model)
if (args.sandboxMode) commandArgs.push("--sandbox", args.sandboxMode)
// ... 其他参数
const child = spawn(this.executablePath, commandArgs, {
env,
signal: args.signal, // AbortSignal 支持
})
child.stdin.write(args.input)
child.stdin.end()
const rl = readline.createInterface({ input: child.stdout })
for await (const line of rl) {
yield line // 每行一个 JSON event
}
}
}Python SDK 提供同步和异步两套 API,通过 JSON-RPC over stdio 与 app-server 通信:
# Python SDK 同步 API
from codex import Codex
with Codex() as codex:
thread = codex.thread_start(
model="o4-mini",
sandbox=SandboxMode.WRITE,
)
result = thread.run("Implement the missing test cases")
print(result.output_text)
# Python SDK 异步 API
from codex import AsyncCodex
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="o4-mini")
turn = await thread.turn("Fix the failing tests")
async for event in turn.stream():
if event.method == "item/agentMessage/delta":
print(event.payload.delta, end="")Python SDK 的底层 AppServerClient 是一个完整的 JSON-RPC 客户端,管理请求 ID、notification 队列和 approval 处理:
class AppServerClient:
"""Synchronous JSON-RPC client for app-server over stdio."""
def request(self, method: str, params: dict, *, response_model: type[T]) -> T:
request_id = str(uuid.uuid4())
self._write_message({"id": request_id, "method": method, "params": params})
while True:
msg = self._read_message()
# 处理 server request(如 approval 请求)
if "method" in msg and "id" in msg:
response = self._handle_server_request(msg)
self._write_message({"id": msg["id"], "result": response})
continue
# 缓存 notification
if "method" in msg and "id" not in msg:
self._pending_notifications.append(
self._coerce_notification(msg["method"], msg.get("params"))
)
continue
# 匹配 response
if msg.get("id") == request_id:
if "error" in msg:
raise map_jsonrpc_error(msg["error"]["code"], msg["error"]["message"])
return response_model.model_validate(msg["result"])30.5.2 嵌入式 Agent:从独立 CLI 到可集成的库
SDK 有两种集成模式:进程外(spawn 子进程 + stdio/WebSocket 通信)和进程内(直接调用库 API,共享地址空间)。
进程外模式(TypeScript SDK 的做法):
[你的应用] --spawn--> [codex CLI 进程] --stdio--> [JSON event 流]优点:完全隔离,崩溃不影响宿主;缺点:启动开销、序列化开销、跨进程调试困难。
进程内模式(Rust in-process runtime 的做法):
[你的应用] --mpsc channel--> [MessageProcessor 任务] --mpsc channel--> [事件流]// 进程内嵌入
let handle = in_process::start(InProcessStartArgs {
config: Arc::new(config),
cli_overrides: vec![],
initialize: InitializeParams {
client_info: ClientInfo {
name: "my-app".to_string(),
version: "1.0.0".to_string(),
title: None,
},
capabilities: None,
},
channel_capacity: 128,
// ... 其他配置
}).await?;
// 发送请求
let response = handle.request(ClientRequest::ThreadStart {
request_id: RequestId::Integer(1),
params: ThreadStartParams { ephemeral: Some(true), ..default() },
}).await?;
// 消费事件
while let Some(event) = handle.next_event().await {
match event {
InProcessServerEvent::ServerNotification(n) => { /* 处理 turn 进度 */ }
InProcessServerEvent::ServerRequest(r) => {
// 处理 approval 请求
handle.respond_to_server_request(r.id().clone(), result)?;
}
InProcessServerEvent::Lagged { skipped } => {
log::warn!("dropped {skipped} events due to backpressure");
}
_ => {}
}
}
// 关闭
handle.shutdown().await?;30.5.3 进程级隔离 vs 库级嵌入的架构取舍
| 维度 | 进程级隔离 | 库级嵌入 |
|---|---|---|
| 启动延迟 | 100-500ms(spawn + initialize) | <10ms(channel 创建) |
| 内存开销 | 独立地址空间,重复加载 | 共享运行时,最小化重复 |
| 崩溃隔离 | 子进程崩溃不影响宿主 | 共享地址空间,panic 可能传播 |
| 调试难度 | 需要 attach 到子进程 | 单进程,正常断点/日志 |
| 资源共享 | 不可能(AuthManager 等需要各自初始化) | 可以共享 Config、AuthManager、ThreadManager |
| 背压处理 | 通过 pipe buffer / TCP backpressure | 通过 bounded channel try_send |
| 适用场景 | SDK 分发、不信任宿主环境 | IDE 扩展、CLI 内嵌、同团队集成 |
推荐策略:
- 面向外部开发者的 SDK → 进程级隔离(TypeScript/Python SDK 的做法),降低集成复杂度
- 面向内部组件的嵌入(如 IDE 扩展、CLI TUI)→ 库级嵌入(in-process runtime),最小化延迟
- 两层封装:底层提供
InProcessClientHandle(raw channel 接口),上层提供AppServerClient(带 worker task buffering、request/response 封装、surface-specific startup policy)
进程内模式的一个关键设计考量是背压(backpressure)策略。当事件消费者处理不过来时,不同类型的消息有不同的丢弃策略:
// 关键消息(如 TurnCompleted):永远不丢,用 send().await 阻塞等待
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
matches!(notification, ServerNotification::TurnCompleted(_))
}
// 非关键消息(如 item delta):满了就丢,用 try_send + warn
if let Err(TrySendError::Full(_)) = event_tx.try_send(event) {
warn!("dropping in-process server notification (queue full)");
}
// Server request(如 approval):满了不能丢,要返回 error
// 否则 approval 流程会永远 hang
if let Err(send_error) = event_tx.try_send(InProcessServerEvent::ServerRequest(request)) {
outgoing_message_sender
.notify_client_error(request_id, JSONRPCErrorError {
code: OVERLOADED_ERROR_CODE,
message: "queue is full".to_string(),
data: None,
})
.await;
}