news 2026/4/26 2:23:07

Nanobot 从 AgentLoop 启动看怎么驱动大模型运行

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Nanobot 从 AgentLoop 启动看怎么驱动大模型运行

背景

在之前的文章中我们分析了nanobot onboardnanobot gateway命令的实现,
这次我们分析AgentLoop.run方法怎么驱动整个Agent的运行,
从整理上来看,该nanobot用到了Typer,Rich,Questionary,prompt_toolkit这种现代、美观且交互式命令行界面 (CLI) 的强大工具组合。
Typer 用于定义 CLI 结构和参数;Rich 负责文本样式、表格、面板和 Markdown 渲染;Questionary 用于创建交互式问答界面
其中Rich中的Console,Markdown,Table,Text用来进行渲染,支持颜色、表格、面板、语法高亮和 Markdown ,以更好的进行个性化的展示。
其中最主要都是用协程实现,其中也会调用线程来做其他工作。
关于协程和线程的区别:

协程是用户态的轻量级“微线程”,切换由用户程序控制,没有内核态切换,开销极小;线程是内核态的资源单元,切换由操作系统调度,开销较大. 线程切换需要涉及用户态到内核态的转换,上下文包括内核栈、硬件寄存器等,保存和恢复资源较多

Agent.run命令

这个是所有Agent驱动的主循环方法,也是消息传递的主入口,它从 MessageBus 拉入站消息,再异步分发到真正的处理逻辑,
最终把处理结果发送到outbound队列中,从而发送给对应的Channel
于此同时,能够优先处理/stop,/restart,/status这种命令

1.设置启动标志并且链接MCP

self._running=Trueawaitself._connect_mcp()

首先用一个标志位来标记并且保持主循环持续运行,再次连接远程的MCP服务器,并把远程Tool注册进当前ToolRegistry中。
_connect_mcp的实现中使用了AsyncExitStack来手动管理上下文,
这里主要用enter_async_contextaclose两个方法注册和释放异步资源。如果在注册的过程中发生异常的话,则调用close方法。
connect_mcp_servers方法中根据MCP的类型(是stdio或者sse)来建立不同的连接(使用了httpx模块)。最后以MCPToolWrapper类的方式注册到ToolRegistry中,
可以看到其实MCP也是最终通过Tool方式调用的。

2.主循环,消费入站队列消息
这里是个死循环,以最大超时1秒的间隔去从入站队列中消费消息。

whileself._running:try:msg=awaitasyncio.wait_for(self.bus.consume_inbound(),timeout=1.0)exceptasyncio.TimeoutError:continueexceptasyncio.CancelledError:# Preserve real task cancellation so shutdown can complete cleanly.# Only ignore non-task CancelledError signals that may leak from integrations.ifnotself._runningorasyncio.current_task().cancelling():raisecontinueexceptExceptionase:logger.warning("Error consuming inbound message: {}, continuing...",e)continueraw=msg.content.strip()ifself.commands.is_priority(raw):ctx=CommandContext(msg=msg,session=None,key=msg.session_key,raw=raw,loop=self)result=awaitself.commands.dispatch_priority(ctx)ifresult:awaitself.bus.publish_outbound(result)continuetask=asyncio.create_task(self._dispatch(msg))self._active_tasks.setdefault(msg.session_key,[]).append(task)task.add_done_callback(lambdat,k=msg.session_key:self._active_tasks.get(k,[])andself._active_tasks[k].remove(t)iftinself._active_tasks.get(k,[])elseNone)
  • 如果是/stop,/restart,/status则优先处理该命令,如果有结果返回,并把该命令的处理结果发送给出站队列
  • 否则使用asyncio.create_task来创建一个协程任务来异步执行,并按照 session_key记录活跃任务,同时使用add_done_callback方法来增加回调(任务完成后进行活跃任务的清理)

_dispatch方法

1. 并发控制

lock=self._session_locks.setdefault(msg.session_key, asyncio.Lock())gate=self._concurrency_gate or nullcontext()

这里会按照每个session_key 一把锁,保证同一个session的并发资源控制;
全局 asyncio.Semaphore 控制可以同时并发的运行的Session会话(由环境变量 NANOBOT_MAX_CONCURRENT_REQUESTS 控制,默认 3;≤0 为 nullcontext,不限制)

2. 流式输出(可选)
默认 on_stream / on_stream_end 为 None。
若 msg.metadata.get(“_wants_stream”) 为真(例如某频道希望流式),也就是channel配置为"streaming": true
on_stream(delta):把模型增量以 OutboundMessage 发出,metadata 带 _stream_delta: True。
on_stream_end(resuming=…):发空内容,metadata 带 _stream_end、_resuming(供 UI 区分是彻底结束还是工具轮次中间暂停)。

3.核心处理_process_message

  • 当是普通用户消息时

    1. 加载对应的session,从目录workspace/sessions加载当前会话所属的历史消息。

    2. 如果是/new,/status,/help命令,则直接返回对应的处理结果

    3. prompt token合并,

      1. 首先是获取对应session的独享锁
      2. 计算 bucket = self.context_window_tokens(65536) - self.max_completion_tokens(4096) - self._SAFETY_BUFFER(1024) = 60416 。
        计算 prompt占用token estimated:
        • 调用session.get_history方法, 该方法从seesion所有中消息取 第一条 user 裁齐+去掉孤儿 tool 前缀+ 拷贝成只含 role / content / tool 相关键 的列表,
          供 ContextBuilder.build_messages 等拼真实请求使用
        • 调用_build_messages拼出一份和真实请求结构相近的 probe_messages(当前消息占位为 [token-probe])
        • 调用estimate_prompt_tokens_chain方法, 该方法先调用模型自带的estimate_prompt_tokens方法,如果模型没有对应的方法,则使用tiktoken中的cl100k_base近似估算
          把各条消息里字符串 content、text 块、tool_calls/reasoning/name/tool_call_id 和 整份 tools JSON 拼成大字串,用 cl100k_base 计数 token,
          再加 4 * 消息条数 的粗开销,得到整段 prompt 的 tiktoken 估计
      3. 如果 estimated < budget:认为不需要合并,打 debug「idle」日志,直接返回,否则进行如下步骤
      4. 调用pick_consolidation_boundary获取对应每轮次要进行合并的messege 边界,最多经过5轮,并调用consolidate_messages使用LLM + save_memory 工具把这段对话摘要进HISTORY.md / MEMORY.md文件中:
        这里巧妙的使用了TOOL参数的方式来更新对应HISTORY.md / MEMORY.md文件的内容。
        entry = args["history_entry"] update = args["memory_update"] ... self.append_history(entry) ... self.write_long_term(update)
    4. 给带有路由信息的工具设置上下文

      fornamein("message","spawn","cron"):iftool :=self.tools.get(name):ifhasattr(tool,"set_context"): tool.set_context(channel, chat_id, *([message_id]ifname=="message"else[]))

      设置MessageTool,SpawnTool,CronTool工具的目标channelchat_id为入站信息的channelchat_id。让它们发消息、起子代理、建 cron 时路由到当前会话,避免多会话并发时上下文信息错位

    5. 拼发给模型的消息列表

      history=session.get_history(max_messages=0)initial_messages=self.context.build_messages(history=history,current_message=msg.content,media=msg.mediaifmsg.mediaelseNone,channel=msg.channel,chat_id=msg.chat_id,)

      这里的history只包括未合并的信息,因为合并后的信息在build_messages中添加进来,这里的build_messages会包括:

      1. 一条role: "system":大段system_prompt(见下文「系统提示的组成」)。
      2. *history:由调用方传入的会话历史(通常为Session.get_history()等得到的user/assistant/tool消息列表)。
      3. 一条末尾消息rolecurrent_role(默认"user";子代理回灌等场景可为"assistant"),contentmerged(运行时元数据 + 当前用户正文/图片)。
        system_promptbuild_system_prompt(skill_names)生成,各块之间用\n\n---\n\n连接,主要包括:
      部分来源方法内容概要
      身份与规范_get_identity()nanobot 身份、运行环境(OS、架构、Python)、工作区路径、MEMORY/HISTORY/skills 路径、Windows/POSIX 策略、工具与通过message发文件等指引。
      Bootstrap 文件_load_bootstrap_files()工作区根目录下若存在则加载:AGENTS.mdSOUL.mdUSER.mdTOOLS.md
      长期记忆MemoryStore.get_memory_context()读取memory/MEMORY.md,有内容则放在# Memory下。
      常驻技能get_always_skills+load_skills_for_context有则追加# Active Skills
      技能目录build_skills_summary()有则追加# Skills及如何阅读SKILL.md等说明。
      其中 ~/.nanobot/workspace/ ├── AGENTS.md # 代理调度规则与标准作业程序,是代理的工作指南 ├── HEARTBEAT.md # 定时执行逻辑与主动任务状态自检,让 代理 具备“自主意识” ├── SOUL.md # 响应语气、行为特征及输出格式配置,是代理的性格、核心价值观和长期指令 ├── TOOLS.md # 工具授权注册表及调用参数规范,是代理的技能配置清单,定义了工具准则 ├── USER.md # 用户画像数据,包含特定偏好与交互限制配置, 定义了 代理 如何服务你

      user_content 为由"type": "text"或者"type": "image_url"等多模态输入组成,具体参考OpenAI Chat Completions API和ChatCompletionContentPart
      具体的内容可以见 Nanobot的 system_prompt 示例

    6. 跑 Agent 主循环
      这里主要是 调 LLM +(有工具则并发执行并追加结果)+ 再问模型」 直到得到无工具调用的最终文本或错误/步数耗尽,并返回 最终正文、工具名列表、完整 messages 列表
      若 final_content is None 则 置为固定英文一句,避免对外返回空串。

    7. 持久化与后台合并

      self._save_turn(session,all_msgs,1+len(history))self.sessions.save(session)

      对新增的messages进行清洗 user、截断 tool、丢弃有害空 assistant,打上时间戳后 append 到 session.messages,而后续的 sessions.save 直接写到jsonl文件中。

    8. 如果MessageTool已发消息 ,则不再返回主回复 ,避免频道里再贴一条重复总结(由 message 工具已发过),否则返回正常的消息

  • 当是由SpawnTool工具执行产生的子代理的消息时,
    总体上处理逻辑和普通用户消息处理逻辑一样, 区别点如下:

    1. current_role = "assistant" if msg.sender_id == "subagent"如果是由subagent发送来的消息的话,则 设置 消息role 为 “assistant”,
    2. 没有对MessageTool的额外处理
    3. 没有对/new命令的处理
      4. 出站处理
ifresponseisnotNone:awaitself.bus.publish_outbound(response)elifmsg.channel=="cli":awaitself.bus.publish_outbound(OutboundMessage(channel=msg.channel,chat_id=msg.chat_id,content="",metadata=msg.metadataor{},))

如果主流程返回的内容不为空,则直接进行出站处理,否则如果channel是cli(主要是本地CLI命令nanobot agent),则把出站内容置为空。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 22:40:25

如何设计一门编程语言

如何设计一门编程语言 先拆解问题&#xff1a;设计基础是什么&#xff1f;需要涵盖形式语义学、计算模型&#xff08;如Lambda演算、图灵机&#xff09;、编程范式&#xff08;命令式、函数式等&#xff09;和语言设计目标。工作原理和机制&#xff1a;从源代码到可执行代码的流…

作者头像 李华
网站建设 2026/4/16 22:40:21

XCharts 深度解析:Unity 数据可视化图表插件实战指南

XCharts 深度解析&#xff1a;Unity 数据可视化图表插件实战指南 【免费下载链接】XCharts A charting and data visualization library for Unity. Unity数据可视化图表插件。 项目地址: https://gitcode.com/gh_mirrors/xc/XCharts XCharts 是一款基于 UGUI 的功能强…

作者头像 李华
网站建设 2026/4/16 22:40:18

高效智能的米哈游游戏扫码登录解决方案:MHY_Scanner完整指南

高效智能的米哈游游戏扫码登录解决方案&#xff1a;MHY_Scanner完整指南 【免费下载链接】MHY_Scanner MHY扫码登录器&#xff0c;支持从直播流抢码。 项目地址: https://gitcode.com/gh_mirrors/mh/MHY_Scanner 你是否曾经因为抢不到米哈游游戏的登录二维码而感到困扰&…

作者头像 李华