Skip to content

第 30 章 扩展与产品化闭环

Coding Agent 从 CLI 工具走向可嵌入、可集成、可大规模部署的产品,需要解决五个层次的工程问题:API 服务化、代码智能、插件生态、质量保证、以及 SDK 化二次开发。本章逐一拆解。


30.1 HTTP API 与服务化

30.1.1 WebSocket + REST API 架构

Agent 的交互模式决定了它不能只用 REST —— 一个 turn 可能执行几分钟,中间产出数十个 streaming notification(工具调用进度、partial response、approval 请求等)。成熟的 Agent 服务同时暴露两种传输协议:

传输方式适用场景消息模型
WebSocket长时会话、streaming event双向 JSON-RPC over WS frame
REST + SSE无状态查询、event 订阅标准 HTTP + Server-Sent Events

WebSocket 传输的典型实现使用 axum 做 HTTP 框架,每个 WebSocket 连接被分配一个 ConnectionId,升级握手后进入双向 JSON-RPC 通信:

rust
// WebSocket 连接升级与连接管理(Rust / axum)
enum AppServerTransport {
    Stdio,
    WebSocket { bind_address: SocketAddr },
}

// 连接升级后,每个连接获得独立的 bounded channel
async fn websocket_upgrade_handler(
    websocket: WebSocketUpgrade,
    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
    State(state): State<WebSocketListenerState>,
) -> impl IntoResponse {
    let connection_id = ConnectionId(
        state.connection_counter.fetch_add(1, Ordering::Relaxed)
    );
    websocket.on_upgrade(move |stream| async move {
        run_websocket_connection(connection_id, stream, state.transport_event_tx).await;
    })
}

REST + SSE 传输的另一种实现使用 Hono 框架构建完整的 REST API,同时通过 SSE 端点推送事件流:

typescript
// SSE 事件推送(TypeScript / Hono)
app.get("/event", async (c) => {
  return streamSSE(c, async (stream) => {
    const q = new AsyncQueue<string | null>()
    // 心跳防止代理超时
    const heartbeat = setInterval(() => {
      q.push(JSON.stringify({ type: "server.heartbeat", properties: {} }))
    }, 10_000)
    // 订阅全局事件总线
    const unsub = Bus.subscribeAll((event) => {
      q.push(JSON.stringify(event))
    })
    for await (const data of q) {
      if (data === null) return
      await stream.writeSSE({ data })
    }
  })
})

30.1.2 从 CLI 到 API:Agent-as-a-Service 的架构模式

CLI 和 API 服务的核心差异在于连接生命周期管理。CLI 是 1:1 的 stdio 管道,进程退出即连接终止;API 服务是 N:1 的多连接模型,需要处理连接打开、关闭、以及消息路由。

mermaid
graph TB
    subgraph Clients
        CLI[CLI / stdio]
        IDE[IDE Extension / WebSocket]
        SDK[SDK / in-process]
    end

    subgraph Transport["Transport Layer"]
        STDIO[stdio transport]
        WS[WebSocket transport]
        INPROC[In-Process channel]
    end

    subgraph Core["Message Processing Core"]
        MP[MessageProcessor]
        TM[Thread Manager]
        AUTH[Auth Manager]
    end

    subgraph Outbound["Outbound Router"]
        OR[Connection Router]
        CID1["Connection #0"]
        CID2["Connection #1"]
        CIDN["Connection #N"]
    end

    CLI --> STDIO
    IDE --> WS
    SDK --> INPROC

    STDIO --> MP
    WS --> MP
    INPROC --> MP

    MP --> TM
    MP --> AUTH
    MP --> OR

    OR --> CID1
    OR --> CID2
    OR --> CIDN

关键设计模式 — 传输层抽象MessageProcessor 完全不关心消息来自 stdio、WebSocket 还是 in-process channel。所有传输方式都通过相同的 TransportEvent 枚举进入统一处理流程:

rust
// 传输事件统一抽象
enum TransportEvent {
    ConnectionOpened {
        connection_id: ConnectionId,
        writer: mpsc::Sender<OutgoingMessage>,
        allow_legacy_notifications: bool,
        disconnect_sender: Option<CancellationToken>,
    },
    ConnectionClosed { connection_id: ConnectionId },
    IncomingMessage {
        connection_id: ConnectionId,
        message: JSONRPCMessage,
    },
}

出站消息的路由同样统一 —— OutgoingEnvelope 携带目标 ConnectionId(或 broadcast 标记),由独立的 outbound router 任务负责分发。这种读写分离的双 loop 架构避免了慢速连接(如网络抖动的 WebSocket 客户端)阻塞消息处理主循环:

rust
// 双 loop 架构:processor loop + outbound loop
// Processor loop: 处理入站 JSON-RPC,dispatch 请求
// Outbound loop: 路由出站消息到各连接的 writer channel
enum OutboundControlEvent {
    Opened {
        connection_id: ConnectionId,
        writer: mpsc::Sender<OutgoingMessage>,
        // ...capability flags
    },
    Closed { connection_id: ConnectionId },
    DisconnectAll,
}

30.1.3 长连接管理、认证、多租户隔离

认证分层实现。REST 层使用 Basic Auth 中间件,WebSocket 层在升级前拒绝带 Origin 头的跨域请求(防止浏览器 CSRF 攻击),API Key 通过环境变量注入:

typescript
// REST 认证中间件
app.use((c, next) => {
  if (c.req.method === "OPTIONS") return next()  // CORS preflight bypass
  const password = Flag.OPENCODE_SERVER_PASSWORD
  if (!password) return next()
  const username = Flag.OPENCODE_SERVER_USERNAME ?? "opencode"
  return basicAuth({ username, password })(c, next)
})
rust
// WebSocket 层:拒绝 Origin 头(防浏览器 CSRF)
async fn reject_requests_with_origin_header(
    request: Request<Body>,
    next: Next,
) -> Result<Response, StatusCode> {
    if request.headers().contains_key(ORIGIN) {
        Err(StatusCode::FORBIDDEN)
    } else {
        Ok(next.run(request).await)
    }
}

多租户隔离通过 ConnectionId + ConnectionSessionState 实现。每个连接维护独立的会话状态,包括初始化标志、实验性 API 开关、notification opt-out 集合等:

rust
// 每连接独立状态
struct ConnectionState {
    session: ConnectionSessionState,
    outbound_initialized: Arc<AtomicBool>,
    outbound_experimental_api_enabled: Arc<AtomicBool>,
    outbound_opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
}

Graceful Shutdown在多连接场景下尤为重要。服务端收到 SIGTERM 后进入 drain 模式:停止接受新连接,等待所有活跃的 assistant turn 完成,然后断开所有 WebSocket 客户端:

rust
impl ShutdownState {
    fn update(&mut self, running_turn_count: usize, connection_count: usize) -> ShutdownAction {
        if !self.requested { return ShutdownAction::Noop; }
        // 第一次信号:等待 running turns 清零
        if self.forced || running_turn_count == 0 {
            return ShutdownAction::Finish;
        }
        // 第二次信号:强制关闭
        ShutdownAction::Noop
    }
}

CORS 策略在 REST 服务中需要精细控制。典型做法是白名单 + 模式匹配:

typescript
cors({
  origin(input) {
    if (input.startsWith("http://localhost:")) return input
    if (input.startsWith("http://127.0.0.1:")) return input
    // Tauri 桌面客户端
    if (input === "tauri://localhost") return input
    // 云端控制面板
    if (/^https:\/\/([a-z0-9-]+\.)*opencode\.ai$/.test(input)) return input
    return undefined
  },
})

30.2 代码智能与 LSP

30.2.1 原生 LSP 支持:Hover / 跳转定义 / 引用查找

Coding Agent 相比普通 Chatbot 的核心优势之一是利用 Language Server Protocol (LSP) 获取编译器级别的代码理解。通过集成 LSP,Agent 可以精确地获取类型信息、跳转到定义、查找引用、获取诊断错误 —— 而非仅靠模型"猜测"代码语义。

典型的 LSP 集成架构是 Client-Server 分离:Agent 进程启动若干 LSP server 子进程(TypeScript、Python、Rust 等),通过 JSON-RPC over stdio 与它们通信。每个 language server 被包装为一个 LSPClient 实例:

typescript
// LSP Client 初始化
const connection = createMessageConnection(
  new StreamMessageReader(server.process.stdout),
  new StreamMessageWriter(server.process.stdin),
)

// 注册诊断通知监听
connection.onNotification("textDocument/publishDiagnostics", (params) => {
  const filePath = fileURLToPath(params.uri)
  diagnostics.set(filePath, params.diagnostics)
  Bus.publish(Event.Diagnostics, { path: filePath, serverID })
})

// 发送 initialize 请求
await connection.sendRequest("initialize", {
  rootUri: pathToFileURL(root).href,
  capabilities: {
    textDocument: {
      synchronization: { didOpen: true, didChange: true },
      publishDiagnostics: { versionSupport: true },
    },
  },
})
await connection.sendNotification("initialized", {})

LSP 暴露的核心能力可以直接映射为 Agent 的 tool:

typescript
// Hover: 获取类型签名和文档
async function hover(input: { file: string; line: number; character: number }) {
  return client.connection.sendRequest("textDocument/hover", {
    textDocument: { uri: pathToFileURL(input.file).href },
    position: { line: input.line, character: input.character },
  })
}

// Go-to-definition: 跳转到符号定义
async function definition(input: { file: string; line: number; character: number }) {
  return client.connection.sendRequest("textDocument/definition", {
    textDocument: { uri: pathToFileURL(input.file).href },
    position: { line: input.line, character: input.character },
  })
}

// Find references: 查找所有引用
async function references(input: { file: string; line: number; character: number }) {
  return client.connection.sendRequest("textDocument/references", {
    textDocument: { uri: pathToFileURL(input.file).href },
    position: { line: input.line, character: input.character },
    context: { includeDeclaration: true },
  })
}

// Call hierarchy: 查找调用关系
async function incomingCalls(input: { file: string; line: number; character: number }) {
  const items = await client.connection.sendRequest(
    "textDocument/prepareCallHierarchy", { ... }
  )
  return client.connection.sendRequest(
    "callHierarchy/incomingCalls", { item: items[0] }
  )
}

30.2.2 代码索引与上下文选择:从百万行代码库中精准提取

Agent 面对大型代码库时的核心挑战是上下文窗口有限。即使模型支持 200K token,全量灌入一个中型项目的所有源码也不现实。代码索引系统负责在海量文件中快速定位相关上下文。

LSP Server 的按需启动与复用是关键优化。每个 LSP server 绑定到一个项目根目录,按文件扩展名匹配。首次访问某语言的文件时懒启动对应 server,后续复用:

typescript
// 按文件扩展名和项目根匹配 LSP client
async function getClients(file: string) {
  const extension = path.parse(file).ext
  for (const server of Object.values(servers)) {
    if (server.extensions.length && !server.extensions.includes(extension))
      continue
    const root = await server.root(file)  // 向上查找 package.json / Cargo.toml 等
    if (!root) continue
    // 查找已有 client 或懒启动
    const match = clients.find(x => x.root === root && x.serverID === server.id)
    if (match) { result.push(match); continue }
    // 异步启动,避免阻塞
    const client = await schedule(server, root, key)
    if (client) result.push(client)
  }
}

Workspace Symbol 搜索让 Agent 能快速定位符号而不需遍历文件树:

typescript
// 全局符号搜索,限制返回 10 个最相关结果
async function workspaceSymbol(query: string) {
  return runAll((client) =>
    client.connection
      .sendRequest("workspace/symbol", { query })
      .then((result) =>
        result
          .filter((x) => relevantKinds.includes(x.kind))  // Class/Function/Interface...
          .slice(0, 10)
      )
  ).then((result) => result.flat())
}

30.2.3 file-search 能力

除了 LSP 提供的语义级搜索,Agent 还需要文件名级别的模糊搜索 —— 当用户说"找那个 config 文件"时,Agent 需要快速定位。

文件模糊搜索的核心实现使用多线程并行扫描 + 评分排序:

rust
// 多线程模糊文件搜索
async fn run_fuzzy_file_search(
    query: String,
    roots: Vec<String>,
    cancellation_flag: Arc<AtomicBool>,  // 支持取消
) -> Vec<FuzzyFileSearchResult> {
    let cores = std::thread::available_parallelism().unwrap_or(1);
    let threads = cores.min(MAX_THREADS);  // 最多 12 线程

    file_search::run(
        query.as_str(),
        search_dirs,
        FileSearchOptions {
            limit: NonZero::new(50),   // 最多 50 结果
            threads,
            compute_indices: true,      // 返回匹配位置用于高亮
            ..Default::default()
        },
        Some(cancellation_flag),
    )
}

搜索结果返回文件路径、匹配类型(文件/目录)、分数和匹配位置索引:

rust
struct FuzzyFileSearchResult {
    root: String,                      // 项目根目录
    path: String,                      // 相对路径
    match_type: FuzzyFileSearchMatchType,  // File | Directory
    file_name: String,                 // 文件名
    score: f64,                        // 匹配分数
    indices: Vec<usize>,              // 匹配字符的索引位置
}

搜索支持流式更新 —— 随着扫描进展,通过 notification 实时推送中间结果给前端,用户无需等待完整结果:

text
fuzzyFileSearch/session/updated  → 发送当前已发现的匹配
fuzzyFileSearch/session/completed → 搜索结束

30.3 插件与扩展生态

30.3.1 插件架构设计

Agent 的插件系统需要解决一个核心矛盾:开放性与安全性。插件可以修改 Agent 的行为(如注入 auth provider、拦截事件、修改 prompt),但不应该破坏核心稳定性。

成熟的插件架构采用 Hook-based 模式 —— 插件通过注册 hook 函数参与 Agent 生命周期的关键节点:

typescript
// 插件接口定义
type Plugin = (input: PluginInput) => Promise<Hooks>

interface PluginInput {
  client: OpencodeClient     // API 客户端,访问全部 Agent 功能
  project: string            // 项目标识
  worktree: string           // worktree 根目录
  directory: string          // 当前工作目录
  serverUrl: URL             // 服务器地址
  $: BunShell                // Shell 执行器
}

// Hook 点 —— 插件可以拦截的生命周期事件
interface Hooks {
  config?: (config: Config) => void              // 配置变更
  event?: (input: { event: BusEvent }) => void   // 事件监听
  // ... 其他 hook 点(auth、prompt 修改等)
}

插件加载流程分三层:

  1. 内置插件:直接编译进 Agent,如 Codex auth、Copilot auth
  2. npm 包插件:通过 bun install 动态安装指定版本
  3. 本地文件插件:通过 file:// 协议加载本地 JS 模块
typescript
// 插件加载顺序
// 1. 内置插件
for (const plugin of INTERNAL_PLUGINS) {
  const init = await plugin(input)
  if (init) hooks.push(init)
}

// 2. npm/本地插件
for (let plugin of config.plugin ?? []) {
  if (DEPRECATED_PLUGIN_PACKAGES.includes(plugin)) continue  // 跳过废弃包
  if (!plugin.startsWith("file://")) {
    // 从 npm 安装
    const pkg = extractPackageName(plugin)
    const version = extractVersion(plugin) ?? "latest"
    plugin = await BunProc.install(pkg, version)
  }
  // 动态 import 并去重
  const mod = await import(plugin)
  const seen = new Set<PluginInstance>()
  for (const [, fn] of Object.entries(mod)) {
    if (seen.has(fn)) continue
    seen.add(fn)
    hooks.push(await fn(input))
  }
}

触发机制 —— Hook 的执行采用串行 waterfall 模式,每个插件可以修改 output 对象,传递给下一个:

typescript
async function trigger<Name extends TriggerName>(
  name: Name, input: Input, output: Output
): Promise<Output> {
  for (const hook of state.hooks) {
    const fn = hook[name]
    if (!fn) continue
    await fn(input, output)  // 串行执行,共享 output 引用
  }
  return output
}

30.3.2 扩展点设计:Hook vs Plugin vs MCP Server 的边界

Agent 生态中存在三种扩展机制,各有适用边界:

扩展机制作用层次生命周期典型用途
Hook进程内同步/异步回调跟随 Agent 进程Auth 注入、事件监听、prompt 修改
Plugin进程内模块化扩展跟随 Agent 实例完整功能模块(auth provider、自定义 agent)
MCP Server跨进程标准协议独立进程生命周期工具集扩展、外部数据源接入

选择原则

  • 需要修改 Agent 内部行为(auth、routing、prompt)→ Plugin + Hook
  • 需要添加新的工具能力(数据库查询、API 调用)→ MCP Server
  • 需要跨多个 Agent 产品复用 → MCP Server(协议标准化)
  • 需要严格隔离(不信任第三方代码)→ MCP Server(进程级隔离)

Skill 是另一种轻量级扩展,介于 Hook 和 MCP Server 之间。Skill 本质是结构化的 Markdown 文件(SKILL.md),包含名称、描述和指令内容,被注入到 Agent 的 system prompt 中:

typescript
// Skill 发现 —— 从多个目录扫描 SKILL.md
const EXTERNAL_DIRS = [".claude", ".agents"]
const SKILL_PATTERN = "**/SKILL.md"

async function scanSkills(root: string) {
  const matches = await Glob.scan(SKILL_PATTERN, {
    cwd: root,
    absolute: true,
    symlink: true,
  })
  for (const match of matches) {
    const md = await ConfigMarkdown.parse(match)
    // frontmatter 解析 name + description
    skills[md.data.name] = {
      name: md.data.name,
      description: md.data.description,
      location: match,
      content: md.content,
    }
  }
}

30.4 质量保证闭环

30.4.1 完整的评测流水线

产品化 Agent 的质量保证不能靠人工测试。完整的评测流水线包括四个阶段:

mermaid
graph LR
    A["离线 Replay<br/>确定性回放历史 session"] --> B["回归测试集<br/>已知 good/bad cases"]
    B --> C["LLM-Judge<br/>模型自评分 + 交叉评审"]
    C --> D["数据回流<br/>失败 case 进入训练/prompt 优化"]
    D --> A

阶段 1:离线 Replay

从生产日志中提取用户 session,在沙箱环境中重放。关键是控制变量 —— 相同的输入、相同的工具状态、相同的文件系统快照:

python
# Replay 测试伪代码
def replay_session(session_log: SessionLog, sandbox: Sandbox):
    sandbox.restore_snapshot(session_log.initial_state)
    for turn in session_log.turns:
        response = agent.run(turn.user_input, sandbox=sandbox)
        assert_no_regression(response, turn.expected_behavior)

阶段 2:回归测试集

维护一个 case 库,每个 case 定义:初始状态、用户输入、期望行为(可以是精确匹配,也可以是语义判断)。CI 每次提交运行全部 cases。

阶段 3:LLM-Judge

对于无法精确匹配的行为(如"代码质量"、"回答准确性"),使用另一个模型做评分。关键是定义清晰的评分 rubric:

python
JUDGE_PROMPT = """
Rate the agent's response on these dimensions (1-5):
1. Correctness: Does the code compile and produce correct output?
2. Completeness: Are all requirements addressed?
3. Code quality: Is the code clean, idiomatic, and well-structured?

Response to evaluate:
{agent_response}

Expected behavior:
{expected}
"""

阶段 4:数据回流

失败 case 被标注后进入反馈循环:如果是 prompt 问题,调整 system prompt;如果是工具问题,修复工具实现;如果是模型能力问题,标记为 training signal。

30.4.2 测试策略和 CI/CD

Agent 的测试金字塔与传统软件不同 —— 集成测试的比重远大于单元测试,因为 Agent 的价值在于各组件协同工作的 emergent behavior。

测试层次占比验证什么工具
单元测试~20%配置解析、消息序列化、工具参数验证标准 test framework
集成测试~50%传输层握手、JSON-RPC round-trip、auth 流程In-process runtime
E2E 测试~20%完整 session 流程、sandbox 隔离、文件系统变更真实 CLI 进程
LLM-Judge~10%代码质量、回答准确性、行为符合预期模型评审

In-process runtime 是集成测试的核心基础设施。它用 bounded in-memory channel 替代 socket/stdio,保留完整的 app-server 语义但避免进程边界开销:

rust
// In-process 集成测试
#[tokio::test]
async fn in_process_start_initializes_and_handles_typed_v2_request() {
    let client = start_test_client(SessionSource::Cli).await;
    let response = client
        .request(ClientRequest::ConfigRequirementsRead {
            request_id: RequestId::Integer(1),
            params: None,
        })
        .await
        .expect("request transport should work")
        .expect("request should succeed");
    assert!(response.is_object());
    client.shutdown().await.expect("should shutdown cleanly");
}

CI/CD 流水线的关键是分层执行

yaml
# CI 流水线分层
stages:
  - lint       # 静态检查,< 1 min
  - unit       # 单元测试,< 3 min
  - integrate  # 集成测试(in-process),< 10 min
  - e2e        # E2E 测试(需要 API key),< 30 min
  - eval       # LLM-Judge 评测,按需执行

30.5 SDK 与二次开发

30.5.1 Python + TypeScript SDK

成熟的 Agent 产品提供多语言 SDK,让开发者将 Agent 能力嵌入自己的应用。SDK 设计的核心原则是镜像 API 语义但简化交互模式

TypeScript SDK 采用 Codex → Thread → Turn 的三层抽象:

typescript
// TypeScript SDK 使用示例
import { Codex } from "@openai/codex"

const codex = new Codex({
  apiKey: process.env.API_KEY,
  baseUrl: "https://api.example.com",
})

// 创建 thread 并执行 turn
const thread = codex.startThread({
  model: "o4-mini",
  sandboxMode: "write",
  workingDirectory: "/path/to/project",
})

// 同步执行(等待完成)
const result = await thread.run("Fix the type errors in auth.ts")
console.log(result.finalResponse)
console.log(result.items)  // 工具调用记录、文件变更等

// 流式执行
const { events } = await thread.runStreamed("Refactor the database layer")
for await (const event of events) {
  if (event.type === "item.completed") {
    console.log(event.item)
  }
}

SDK 内部通过 spawn 一个 CLI 子进程并解析其 JSON 输出流来工作:

typescript
// SDK 内部:spawn CLI 进程
class CodexExec {
  async *run(args: CodexExecArgs): AsyncGenerator<string> {
    const commandArgs = ["exec", "--experimental-json"]
    if (args.model) commandArgs.push("--model", args.model)
    if (args.sandboxMode) commandArgs.push("--sandbox", args.sandboxMode)
    // ... 其他参数

    const child = spawn(this.executablePath, commandArgs, {
      env,
      signal: args.signal,  // AbortSignal 支持
    })
    child.stdin.write(args.input)
    child.stdin.end()

    const rl = readline.createInterface({ input: child.stdout })
    for await (const line of rl) {
      yield line  // 每行一个 JSON event
    }
  }
}

Python SDK 提供同步和异步两套 API,通过 JSON-RPC over stdio 与 app-server 通信:

python
# Python SDK 同步 API
from codex import Codex

with Codex() as codex:
    thread = codex.thread_start(
        model="o4-mini",
        sandbox=SandboxMode.WRITE,
    )
    result = thread.run("Implement the missing test cases")
    print(result.output_text)

# Python SDK 异步 API
from codex import AsyncCodex

async with AsyncCodex() as codex:
    thread = await codex.thread_start(model="o4-mini")
    turn = await thread.turn("Fix the failing tests")
    async for event in turn.stream():
        if event.method == "item/agentMessage/delta":
            print(event.payload.delta, end="")

Python SDK 的底层 AppServerClient 是一个完整的 JSON-RPC 客户端,管理请求 ID、notification 队列和 approval 处理:

python
class AppServerClient:
    """Synchronous JSON-RPC client for app-server over stdio."""

    def request(self, method: str, params: dict, *, response_model: type[T]) -> T:
        request_id = str(uuid.uuid4())
        self._write_message({"id": request_id, "method": method, "params": params})

        while True:
            msg = self._read_message()
            # 处理 server request(如 approval 请求)
            if "method" in msg and "id" in msg:
                response = self._handle_server_request(msg)
                self._write_message({"id": msg["id"], "result": response})
                continue
            # 缓存 notification
            if "method" in msg and "id" not in msg:
                self._pending_notifications.append(
                    self._coerce_notification(msg["method"], msg.get("params"))
                )
                continue
            # 匹配 response
            if msg.get("id") == request_id:
                if "error" in msg:
                    raise map_jsonrpc_error(msg["error"]["code"], msg["error"]["message"])
                return response_model.model_validate(msg["result"])

30.5.2 嵌入式 Agent:从独立 CLI 到可集成的库

SDK 有两种集成模式:进程外(spawn 子进程 + stdio/WebSocket 通信)和进程内(直接调用库 API,共享地址空间)。

进程外模式(TypeScript SDK 的做法):

bash
[你的应用] --spawn--> [codex CLI 进程] --stdio--> [JSON event 流]

优点:完全隔离,崩溃不影响宿主;缺点:启动开销、序列化开销、跨进程调试困难。

进程内模式(Rust in-process runtime 的做法):

text
[你的应用] --mpsc channel--> [MessageProcessor 任务] --mpsc channel--> [事件流]
rust
// 进程内嵌入
let handle = in_process::start(InProcessStartArgs {
    config: Arc::new(config),
    cli_overrides: vec![],
    initialize: InitializeParams {
        client_info: ClientInfo {
            name: "my-app".to_string(),
            version: "1.0.0".to_string(),
            title: None,
        },
        capabilities: None,
    },
    channel_capacity: 128,
    // ... 其他配置
}).await?;

// 发送请求
let response = handle.request(ClientRequest::ThreadStart {
    request_id: RequestId::Integer(1),
    params: ThreadStartParams { ephemeral: Some(true), ..default() },
}).await?;

// 消费事件
while let Some(event) = handle.next_event().await {
    match event {
        InProcessServerEvent::ServerNotification(n) => { /* 处理 turn 进度 */ }
        InProcessServerEvent::ServerRequest(r) => {
            // 处理 approval 请求
            handle.respond_to_server_request(r.id().clone(), result)?;
        }
        InProcessServerEvent::Lagged { skipped } => {
            log::warn!("dropped {skipped} events due to backpressure");
        }
        _ => {}
    }
}

// 关闭
handle.shutdown().await?;

30.5.3 进程级隔离 vs 库级嵌入的架构取舍

维度进程级隔离库级嵌入
启动延迟100-500ms(spawn + initialize)<10ms(channel 创建)
内存开销独立地址空间,重复加载共享运行时,最小化重复
崩溃隔离子进程崩溃不影响宿主共享地址空间,panic 可能传播
调试难度需要 attach 到子进程单进程,正常断点/日志
资源共享不可能(AuthManager 等需要各自初始化)可以共享 Config、AuthManager、ThreadManager
背压处理通过 pipe buffer / TCP backpressure通过 bounded channel try_send
适用场景SDK 分发、不信任宿主环境IDE 扩展、CLI 内嵌、同团队集成

推荐策略

  • 面向外部开发者的 SDK → 进程级隔离(TypeScript/Python SDK 的做法),降低集成复杂度
  • 面向内部组件的嵌入(如 IDE 扩展、CLI TUI)→ 库级嵌入(in-process runtime),最小化延迟
  • 两层封装:底层提供 InProcessClientHandle(raw channel 接口),上层提供 AppServerClient(带 worker task buffering、request/response 封装、surface-specific startup policy)

进程内模式的一个关键设计考量是背压(backpressure)策略。当事件消费者处理不过来时,不同类型的消息有不同的丢弃策略:

rust
// 关键消息(如 TurnCompleted):永远不丢,用 send().await 阻塞等待
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
    matches!(notification, ServerNotification::TurnCompleted(_))
}

// 非关键消息(如 item delta):满了就丢,用 try_send + warn
if let Err(TrySendError::Full(_)) = event_tx.try_send(event) {
    warn!("dropping in-process server notification (queue full)");
}

// Server request(如 approval):满了不能丢,要返回 error
// 否则 approval 流程会永远 hang
if let Err(send_error) = event_tx.try_send(InProcessServerEvent::ServerRequest(request)) {
    outgoing_message_sender
        .notify_client_error(request_id, JSONRPCErrorError {
            code: OVERLOADED_ERROR_CODE,
            message: "queue is full".to_string(),
            data: None,
        })
        .await;
}