nanobot的整体运行机制
本文以nanobot项目代码为准,重点梳理一条用户消息从 Channel 进入系统,到 Agent 调用 LLM、执行工具、保存 Session,再返回响应的完整流程。

一、核心概念
1. Turn
一个 Turn 表示系统处理一条入站消息的完整过程:
1 | |
需要区分三个容易混淆的概念:
- AgentLoop:长期存在的编排对象,不是一次 request。
- Turn:处理一条入站消息的一轮业务流程。
- Iteration:一个 Turn 内部,
AgentRunner主循环的一次迭代。
一个 Turn 可能包含多次 Iteration,例如:
1 | |
通常一次 Iteration 会发起一次主要 LLM 请求,但空响应恢复等逻辑可能在同一个 Iteration 内额外请求 LLM,因此 Iteration 数不一定严格等于 Provider API 请求总数。
2. 消息与总线
Channel 将外部平台消息转换为 InboundMessage,再发布到 MessageBus:
1 | |
Agent 处理完成后生成 OutboundMessage:
1 | |
MessageBus 的作用是解耦 Channel 与 Agent 核心。Channel 不需要直接调用 LLM,AgentLoop 也不需要了解各平台的收发实现。
3. Session
Session 表示一段对话的持久化状态,定义在 session/manager.py:
1 | |
各字段含义:
key:会话唯一标识,通常是channel:chat_id。messages:该会话保存的消息。created_at:Session 创建时间。updated_at:最近一次消息或维护操作的时间。metadata:标题、摘要、checkpoint、持续目标等扩展状态。last_consolidated:已经归档的消息前缀长度。
未归档、仍可进入近期上下文的消息是:
1 | |
二、AgentLoop 的职责

AgentLoop 定义在 agent/loop.py,是产品层的核心编排器,主要负责:
- 从
MessageBus接收消息。 - 按 Session 串行、跨 Session 并发地调度任务。
- 维护 Turn 状态机。
- 构建历史、记忆、技能和运行时上下文。
- 调用
AgentRunner。 - 保存 Session 并发布响应。
AgentLoop 不直接实现通用的 ReAct 工具循环;真正的 LLM/工具迭代位于 AgentRunner。
三、AgentLoop 的主要入口
1. run():持续消费 MessageBus
1 | |
它适用于 Gateway 和各种持续运行的 Channel。
等待入站消息超时后,系统会检查空闲 Session:
1 | |
收到普通消息后,run() 创建后台任务:
1 | |
这样 AgentLoop 不会因为某个 Session 正在等待 LLM,而停止接收其他 Session 的消息。
2. _dispatch():并发调度与消息发布
_dispatch() 的并发原则是:
1 | |
核心结构:
1 | |
如果同一个 Session 已经有任务运行,后续普通消息会进入 pending_queue,供当前 Runner 在执行过程中注入,而不是再启动一个相互竞争的 Turn。
_dispatch() 还负责:
- 建立流式输出回调。
- 调用
_process_message()。 - 将最终
OutboundMessage发布到总线。 - 在取消时恢复 runtime checkpoint。
- 将未消费的 pending 消息重新发布,避免丢失。
3. process_direct():直接处理一条消息
1 | |
它自行构造 InboundMessage,取得 Session 锁,然后调用 _process_message()。
当前项目中它被 Python SDK、OpenAI 兼容 API、CLI 的部分直接调用路径以及 Dream 内部任务使用。它不经过 MessageBus 的持续消费循环,但仍复用同一套 Turn 状态机。
4. _process_message():Turn 状态机驱动器
_process_message() 创建 TurnContext,然后循环调用当前状态对应的 handler:
1 | |
每个状态的耗时、事件和异常会写入 ctx.trace。
四、TurnContext 与状态机

1. TurnState
完整状态流转:
1 | |
COMMAND 还可以通过 "shortcut" 直接进入 DONE。
2. StateTraceEntry
记录单个状态的执行情况:
1 | |
例如可以记录:
1 | |
3. TurnContext
TurnContext 不是 AgentLoop 本身,而是某一个 Turn 的临时工作上下文。
它保存:
- 当前
InboundMessage和Session。 - 当前状态及 trace。
- 历史消息和待发送给模型的
initial_messages。 - 最终回答、工具使用情况和停止原因。
- pending queue、压缩摘要和流式回调。
- 本轮是否为
ephemeral内部运行。
五、各状态详解
1. RESTORE
对应 _state_restore()。
主要工作:
- 处理消息附件,必要时提取文档文本。
- 获取或创建 Session。
- 发布 Session Turn 开始事件。
- 保存消息对应的工作区范围。
- 恢复中断前的 runtime checkpoint。
- 恢复提前持久化但尚未完成的用户消息。
checkpoint 可以保留中断前的 Assistant tool call 和已经完成的工具结果,使 /stop 或任务取消后不会完全丢失执行现场。
2. COMPACT
对应 _state_compact():
1 | |
这里通常不执行实际压缩,而是把后台空闲压缩的结果接回当前 Turn:
- 重新取得可能已经被替换的 Session。
- 读取
_last_summary。 - 将格式化摘要保存到
ctx.pending_summary,供 BUILD 阶段注入 prompt。
真正的空闲压缩由:
1 | |
在后台执行。
如果 idleCompactAfterMinutes=0,空闲压缩关闭,此状态通常只返回原 Session;但如果 Session 已经有 _last_summary,仍可能读取并注入摘要。
3. COMMAND
对应 _state_command()。
它调用:
1 | |
如果是普通消息,返回 "dispatch",进入 BUILD。
如果命令已直接处理,返回 "shortcut",跳过 BUILD、RUN、SAVE 和 RESPOND,直接进入 DONE。为了让 WebUI 能恢复命令历史,除 /new 外,命令和结果会在这里直接保存,并标记 _command=True,使其不进入普通 LLM 历史。
需要注意:/stop 等优先级控制命令会在 AgentLoop.run() 中提前处理,不一定进入这个状态。
4. BUILD
对应 _state_build(),作用是构造本轮 LLM 输入。
4.1 Token consolidation
非 ephemeral Turn 先调用:
1 | |
安全输入预算:
1 | |
估算对象不仅是 Session 文本,还包括:
- System prompt。
- 未归档 Session 历史。
_last_summary。- Memory、skills 和运行时上下文。
- Session metadata。
- 工具 definitions。
- 探测消息
[token-probe]。
当:
1 | |
系统会选择旧的完整用户轮次,通过 LLM 生成摘要,写入 memory/history.jsonl,推进 last_consolidated,并把最近摘要写入:
1 | |
默认目标是压到:
1 | |
consolidation_ratio 默认是 0.5。
4.2 构建近期历史
1 | |
get_history() 会:
- 只读取
last_consolidated后的消息。 - 按消息数和 token 预算截取。
- 尽量从 user 消息开始。
- 删除开头孤立的 tool result。
- 过滤
_command消息。 - 为附件补充文本 breadcrumb。
- 给 user 消息添加时间戳。
4.3 构造完整 prompt
1 | |
最终上下文大致为:
1 | |
BUILD 还会:
- 设置工具上下文。
- 重置
MessageTool的本轮状态。 - 提前持久化当前用户消息,防止执行中断后丢失。
- 创建进度和重试等待回调。
5. RUN
对应 _state_run()。
它先发布 "running" 状态,然后调用:
1 | |
_run_agent_loop() 负责把产品层上下文适配成 AgentRunSpec:
- 进度、流式和额外 Hook。
- checkpoint callback。
- pending message injection callback。
- RequestContext、workspace scope 和 file state。
- 模型、工具、上下文窗口与超时。
- sustained goal 的自动继续条件。
随后调用:
1 | |
_state_run() 将返回结果写回 TurnContext:
1 | |
最后检查是否需要创建内部 continuation。
6. SAVE
对应 _state_save()。
主要工作:
- 处理 continuation 的保存边界。
- 为意外空回答补充统一占位文本。
- 计算本轮用户可感知延迟。
- 将 Runner 产生的新消息追加到 Session。
- 记录运行时间。
- 执行 Session 文件消息数量上限保护。
- 后台再次安排 token consolidation,为下一轮提前整理。
- 清除 pending user turn 和 runtime checkpoint。
- 原子保存 Session。
这里的后台 token consolidation 与 BUILD 前的同步检查互补:
1 | |
7. RESPOND
对应 _state_respond()。
如果 suppress_response=True,则不生成普通出站消息。
否则调用 _assemble_outbound(),将最终文本包装为:
1 | |
如果本轮 MessageTool 已经主动发送消息,普通最终回复可能被抑制,避免重复发送。
流式响应会在 metadata 中标记 _streamed,延迟会记录为 latency_ms。ephemeral 内部运行还会附带 _stop_reason。
六、AgentRunner
AgentRunner 定义在 agent/runner.py,目标是提供不依赖 Channel、SessionManager 等产品层细节的通用工具型 Agent 执行循环。
1. AgentRunSpec
描述一次 Runner 执行所需的配置:
initial_messagestoolsmodelmax_iterationsmax_tool_result_chars- Hook 和回调
- workspace、session key
- context window
- timeout
- injection callback
- sustained goal 条件
2. AgentRunResult
Runner 的统一返回结构:
1 | |
3. run()
AgentRunner.run() 管理 Hook 生命周期和异常边界:
1 | |
真正的 ReAct 循环位于 _run_core()。
七、_run_core() 主循环
核心结构:
1 | |
这里的 else 属于 for...else:只有循环自然耗尽且没有执行 break 时才进入。
1. 每轮上下文治理
每次请求模型前依次执行:
1 | |
作用分别是:
- 删除没有对应 assistant tool call 的孤立 tool result。
- 为缺失结果的 tool call 插入合成错误结果。
- 将较早的大型工具结果替换为一行提示。
- 限制单条工具结果大小,必要时落盘。
- 当整个 prompt 超预算时,只保留较新的合法历史。
这些处理只作用于 messages_for_model,不直接改变用于持久化的原始 messages。
_snip_history() 后可能再次产生孤立工具消息,因此还会重新执行结构修复。
2. 请求模型
1 | |
响应包含:
contenttool_callsfinish_reason- reasoning/thinking 信息
- token usage
finish_reason 表示单次 LLM 生成停止的原因,例如:
stop:正常结束。tool_calls/function_call:请求执行工具。length:达到输出 token 上限。error:Provider 调用失败。
它不同于 Runner 的 stop_reason;后者描述整个 Agent 执行最终为何停止。
3. 工具调用分支
当:
1 | |
为真时,Runner 会:
- 将 Assistant tool call 追加到
messages。 - 保存
awaiting_toolscheckpoint。 - 调用
_execute_tools()。 - 将每个结果包装为
role="tool"消息。 - 保存
tools_completedcheckpoint。 - 检查是否有 pending 消息需要注入。
continue进入下一次 Iteration。
4. 工具执行链
完整调用路径:
1 | |
prepare_call() 负责:
- 按名称找到工具。
- 转换参数类型。
- 根据 JSON Schema 校验参数。
随后正常路径直接调用:
1 | |
如果传入的是不支持 prepare_call() 的兼容工具容器,则退化为:
1 | |
只读且 concurrency_safe=True 的工具可以通过 asyncio.gather() 并发执行;写文件、执行命令等有副作用工具通常单独执行。
工具结果会被标准化为:
1 | |
下一轮 LLM 请求会看到该结果,并决定继续调用工具还是返回最终答案。
5. 没有工具调用时
没有工具调用不代表立即在分支处返回。Runner 还需要检查:
- 空回答是否需要重试。
- 输出是否被
max_tokens截断并需要续写。 - 是否有用户追加消息或子 Agent 结果。
- sustained goal 是否仍需继续。
- Provider 是否返回错误。
如果这些情况都不存在,则:
1 | |
循环退出后统一构造 AgentRunResult。
6. 恢复与特殊分支
空回答
空回答会先重试;多次为空时,再发起 finalization retry。仍为空则:
1 | |
输出截断
当:
1 | |
Runner 会把当前部分回答加入消息,再加入“继续输出”提示,然后进入下一次 Iteration。恢复次数有上限。
中途消息注入
用户在 Agent 执行过程中发送的新消息,或子 Agent 返回的结果,可以通过 injection_callback 注入当前消息链。
有注入时:
1 | |
LLM 或工具错误
错误会被转换为统一文本和 stop_reason。如果此时还有可注入消息,Runner 仍可能继续;否则退出。
最大迭代次数
如果一直调用工具、续写或处理注入,直到 for 循环自然耗尽:
1 | |
系统生成统一的迭代上限提示并返回。
八、Session 压缩的三个层次

1. 空闲压缩
配置:
1 | |
0 表示关闭,默认值也是 0。
Session 空闲达到 TTL 后,后台调用 compact_idle_session():
- 将旧消息通过 LLM 归档。
- 最多保留最近 8 条合法消息。
- 真正替换
session.messages。 - 将
last_consolidated重置为0。 - 保存
_last_summary。
空闲压缩不先检查 Session 是否 token 超限;触发条件是空闲时间。
2. Token consolidation
在 BUILD 前同步检查,并在 SAVE 后后台检查。
触发条件是完整 prompt 的估算 token 达到安全输入预算。它通常不立即删除旧消息,而是:
- 对旧消息生成摘要。
- 写入
history.jsonl。 - 推进
last_consolidated。 - 让
get_history()忽略已经归档的前缀。
3. Runner 即时裁剪
AgentRunner._snip_history() 是最后一道防线:
- 不调用 LLM 摘要。
- 不修改 Session。
- 不更新
last_consolidated。 - 只裁剪当前这一请求实际发送给模型的
messages_for_model。
三者的区别:
1 | |
九、一次完整请求示例

1 | |
十、代码的边界
1 | |
理解这些边界后,阅读 nanobot 的主线可以简化为:
1 | |