news 2026/6/11 9:22:38

18.4 长期记忆可修改版

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
18.4 长期记忆可修改版

📋 LangGraph + PostgreSQL 异步长期存储代码详细流程解析

这个版本的核心是:使用异步AsyncPostgresStore连接 PostgreSQL,在对话中读取/更新用户画像(长期记忆),并同步到数据库

importosimportsysimportasyncioimportrefromdotenvimportload_dotenvfromlangchain_openaiimportChatOpenAIfromlanggraph.graphimportStateGraph,MessagesState,STARTfromlanggraph.checkpoint.memoryimportInMemorySaverfromlanggraph.store.postgresimportAsyncPostgresStoreifsys.platform=='win32':asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())load_dotenv()DB_URI="postgresql://postgres:root@localhost:5432/langgraph_db"llm=ChatOpenAI(model="qwen-plus",temperature=0.7,api_key=os.getenv("DASHSCOPE_API_KEY"),base_url=os.getenv("DASHSCOPE_BASE_URL"),model_kwargs={"extra_body":{"enable_thinking":False}})checkpointer=InMemorySaver()defcall_agent(state:MessagesState):response=llm.invoke(state["messages"])return{"messages":[response]}defextract_hobby_from_text(text:str)->str|None:patterns=[r"喜欢(养)?(猫|狗|鸟|鱼|乌龟|兔子|摄影|画画|唱歌|跳舞|游泳|跑步|读书|看电视|电影)",r"爱好是(.{2,10})",r"现在喜欢(养)?(.{2,10})",]forpatterninpatterns:match=re.search(pattern,text)ifmatch:hobby=match.group(2)iflen(match.groups())>=2elsematch.group(1)ifhobby:returnhobby.strip()returnNonedefextract_name_from_text(text:str)->str|None:# 明确命令 #setnameiftext.startswith("#setname"):parts=text.split(maxsplit=1)iflen(parts)==2:returnparts[1].strip()# 自然语言patterns=[r"我叫([\u4e00-\u9fa5]{2,4})",r"我是([\u4e00-\u9fa5]{2,4})",r"改名为([\u4e00-\u9fa5]{2,4})",r"把名字改成([\u4e00-\u9fa5]{2,4})",r"以后叫([\u4e00-\u9fa5]{2,4})",r"请叫我([\u4e00-\u9fa5]{2,4})",r"我的名字是([\u4e00-\u9fa5]{2,4})",r"更名为([\u4e00-\u9fa5]{2,4})",]forpatterninpatterns:match=re.search(pattern,text)ifmatch:returnmatch.group(1).strip()returnNoneasyncdefmain():asyncwithAsyncPostgresStore.from_conn_string(DB_URI)asstore:awaitstore.setup()print("✅ PostgreSQL 长期存储已连接")builder=StateGraph(MessagesState)builder.add_node("call_agent",call_agent)builder.add_edge(START,"call_agent")graph=builder.compile(checkpointer=checkpointer,store=store)user_id="user_002"config={"configurable":{"thread_id":f"{user_id}_thread"}}namespace=("profiles",user_id)existing=awaitstore.aget(namespace,"user_info")ifnotexisting:default_profile={"name":"张明","hobby":"摄影","preferences":{"language":"zh-CN","style":"detailed"}}awaitstore.aput(namespace,"user_info",default_profile)print("💾 首次写入默认用户画像")profile=default_profileelse:profile=existing.valueprint(f"📋 读取现有画像:{profile}")user_input=input("\n请输入您的消息: ").strip()print(f"用户:{user_input}")# 处理强制更新命令ifuser_input=="#forceupdate":awaitstore.aput(namespace,"user_info",profile)print("⚠️ 已强制写入当前 profile 到数据库")verify=awaitstore.aget(namespace,"user_info")print(f"✅ 验证数据库内容:{verify.value}")# 强制更新后依然进行正常对话else:updated=Falsenew_name=extract_name_from_text(user_input)print(f"[调试] 提取到的名字:{new_name}")ifnew_nameandnew_name!=profile.get("name"):print(f"🔄 检测到名字变化:{profile['name']}{new_name}")profile["name"]=new_name updated=Truenew_hobby=extract_hobby_from_text(user_input)print(f"[调试] 提取到的爱好:{new_hobby}")ifnew_hobbyandnew_hobby!=profile.get("hobby"):print(f"🔄 检测到爱好变化:{profile['hobby']}{new_hobby}")profile["hobby"]=new_hobby updated=Trueifupdated:try:awaitstore.aput(namespace,"user_info",profile)print("💾 用户画像已更新到 PostgreSQL")verify=awaitstore.aget(namespace,"user_info")print(f"✅ 验证数据库最新内容:{verify.value}")exceptExceptionase:print(f"❌ 数据库写入失败:{e}")else:print("ℹ️ 无更新,数据库保持不变")# 构造对话(使用当前 profile)context=f"用户偏好:{profile}。请基于此回答。"messages=[("system",context),("user",user_input)]foreventingraph.stream({"messages":messages},config=config,stream_mode="values"):event["messages"][-1].pretty_print()if__name__=="__main__":asyncio.run(main())---## 一、整体架构图(异步流程)```mermaid sequenceDiagram participant Mainasmain()participant Loopasasyncio事件循环 participant StoreasAsyncPostgresStore participant DBasPostgreSQL participant GraphasLangGraph图 participant LLMasQwen模型 Main->>Loop:asyncio.run(main())Loop->>Store:AsyncPostgresStore.from_conn_string(DB_URI)Store-->>Loop:异步上下文管理器 Main->>Store:__aenter__ → 建立连接池 Main->>Store:awaitstore.setup()→ 创建表 Main->>Store:awaitstore.aget()→ 查询用户画像 DB-->>Store:返回 JSON 或Nonealt 不存在 Main->>Store:awaitstore.aput()→ 写入默认画像else存在 Main->>Store:读取到 profile end Main->>Graph:builder.compile(store=store)Main->>Graph:awaitgraph.stream()→ 执行图 Graph->>LLM:调用模型 LLM-->>Graph:返回回复 Graph-->>Main:流式输出消息 Main->>Store:__aexit__ → 关闭连接池
--- ## 二、逐段代码流程详解(含异步机制) ### 1. Windows 异步兼容设置 ```python if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
  • 为什么需要?
    Windows 默认的ProactorEventLooppsycopg(PostgreSQL 异步驱动)不兼容,必须改用SelectorEventLoop
  • 本质:切换 asyncio 的事件循环策略,让异步数据库操作能在 Windows 上正常运行。

2. 异步主函数入口

asyncdefmain():...if__name__=="__main__":asyncio.run(main())
  • asyncio.run(main())是 Python 官方推荐的启动异步函数的方式。
    它会:
    • 创建一个新的事件循环(Event Loop)。
    • 执行main()协程。
    • 协程结束后关闭循环。
  • 任何async def函数内部都可以使用await等待异步操作

3. 异步连接 PostgreSQL(核心难点)

asyncwithAsyncPostgresStore.from_conn_string(DB_URI)asstore:awaitstore.setup()
  • AsyncPostgresStore.from_conn_string(DB_URI)返回一个异步上下文管理器
  • async with ... as store:等同于:
    store=awaitAsyncPostgresStore.from_conn_string(DB_URI).__aenter__()try:...finally:awaitstore.__aexit__()
  • __aenter__会创建到 PostgreSQL 的连接池(异步非阻塞)。
  • await store.setup()执行 DDL 语句创建表,也是异步非阻塞的(不会卡住事件循环)。

4. 读取/写入用户画像(异步 I/O)

existing=awaitstore.aget(namespace,"user_info")ifnotexisting:awaitstore.aput(namespace,"user_info",default_profile)else:profile=existing.value
  • aget/aput都是异步方法,需要await
  • 异步 I/O 期间,事件循环可以切换去处理其他任务(虽然本例只有一个任务,但如果是并发请求就会体现优势)。

5. 同步与异步的混合(LangGraph 图执行)

builder=StateGraph(MessagesState)...graph=builder.compile(checkpointer=checkpointer,store=store)# 这里的 stream 是一个异步生成器foreventingraph.stream(...,stream_mode="values"):...
  • graph.stream()内部是异步实现的,但 LangGraph 做了封装,可以直接在async函数中for await(实际上stream()返回的是同步迭代器?),但此处for event in graph.stream(...)是同步迭代,因为stream方法返回的是生成器,但内部调用 LLM 时其实是同步阻塞的(llm.invoke)。注意:这里为了简化,LLM 调用是同步的(llm.invoke阻塞),但数据库操作已经异步化。

潜在改进:可以将 LLM 调用也改为异步(llm.ainvoke),进一步提高并发性能,但不是必须。

6. 更新数据库并验证

awaitstore.aput(namespace,"user_info",profile)verify=awaitstore.aget(namespace,"user_info")
  • 写入后立即读取验证,确保数据已持久化。
  • 所有await都是非阻塞的。

三、异步调用顺序(时间轴)

步骤代码操作是否阻塞事件循环
1asyncio.run(main())启动事件循环阻塞(直到 main 结束)
2async with AsyncPostgresStore...建立数据库连接池非阻塞(I/O 等待)
3await store.setup()发送建表 SQL非阻塞
4await store.aget(...)发送查询 SQL非阻塞
5if not existing: await store.aput(...)写入数据非阻塞
6graph.stream(...)调用 LLM阻塞(LLM 响应时间长)
7await store.aput(...)(若更新)写入更新非阻塞
8async with结束关闭连接池非阻塞

总结:只有 LLM 调用是同步阻塞的(可改为异步),其他数据库操作均是非阻塞异步,不会阻塞事件循环。


四、常见异步疑问与解答

Q1: 为什么需要async def main()asyncio.run()

  • :异步代码必须运行在事件循环中。asyncio.run(main())创建并运行事件循环,直到main()完成。

Q2:async with和普通with有什么区别?

  • async with用于异步上下文管理器,其__aenter____aexit__是协程,需要await。普通with用于同步上下文管理器。

Q3:await store.aget(...)时程序在干什么?

  • :事件循环会挂起当前协程,去执行其他就绪的任务(如果有多任务并发)。如果没有其他任务,事件循环会等待 I/O 完成(数据库响应),然后恢复协程。CPU 不会空转

Q4: 这个代码是真正的异步吗?LLM 调用为什么是同步?

  • :数据库部分是完全异步的。LLM 调用(llm.invoke)是同步的,因为ChatOpenAI默认使用同步 HTTP 客户端。若需要全异步,可改用llm.ainvoke(需模型支持)。本例重点演示数据库异步。

Q5: 如何在异步中处理用户输入(input())?

  • input()是同步阻塞函数,会阻塞整个事件循环。本例中input()async函数里直接调用,会阻塞,但通常用户输入很快,影响不大。如需彻底异步,可使用asyncio.to_thread(input)

五、执行流程图(简化的数据流)

启动程序 │ ▼ 建立到 PostgreSQL 的异步连接池 │ ▼ 查询 store 表,获取用户画像 │ ├─ 无记录 → 写入默认 {"name":"张明","hobby":"摄影"} └─ 有记录 → 读取到 profile │ ▼ 用户输入消息(同步 input) │ ▼ 尝试提取 name 和 hobby(同步正则) │ ├─ 检测到变化 → await store.aput 更新数据库 │ │ │ ▼ │ 验证更新(再读一次) └─ 无变化 → 跳过 │ ▼ 构造系统提示(包含 profile) │ ▼ 执行 LangGraph 图(调用 LLM) │ ▼ 流式输出回复 │ ▼ 程序结束,关闭数据库连接池

六、如何验证异步是否正常工作?

  1. 添加日志:在await store.aget前后打印时间戳,观察是否阻塞。
  2. 模拟慢查询:在 PostgreSQL 中执行SELECT pg_sleep(5);,此时程序应该可以响应其他任务(如果有并发)。
  3. 使用asyncio.create_task同时发起多个数据库请求,观察是否并发执行。

七、总结:异步调用的核心要点

概念说明
async def声明一个协程函数
await等待一个可等待对象(协程、Future、Task),期间让出事件循环
asyncio.run()启动事件循环并运行主协程
async with异步上下文管理器,用于管理需要异步初始化和清理的资源(如数据库连接池)
AsyncPostgresStoreLangGraph 提供的异步 PostgreSQL 存储客户端,所有方法都是协程(需await

只要理解“遇到await时,事件循环可以切换去执行其他任务,I/O 完成后回来继续”,就能掌握异步的精髓。在这个例子中,虽然只有一个任务,但异步框架已经为未来扩展并发打下了基础。

如果你希望完整地使用全异步(包括 LLM 调用),可以将llm.invoke替换为llm.ainvoke,并将graph.stream改为异步迭代(需查看 LangGraph 文档)。但当前版本足以让你理解异步数据库操作的流程。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/11 9:22:38

GMP模型

Go 调度器用 G/M/P 模型。 G 是 goroutine,M 是 OS 线程,P 是执行 Go 代码所需的调度上下文。 M 必须绑定 P 才能运行 G。 P 有本地队列,减少全局锁竞争;找不到 G 时会从全局队列、netpoller 或其他 P 偷取。 G 阻塞在 channel/锁…

作者头像 李华
网站建设 2026/6/11 9:22:34

期货回测成交太理想:天勤 TqSim commission 与实盘校准

前言 国内期货量化策略上线通常走三步:先用天勤 TqBacktest 在历史 K 线上回测双均线等信号,净值曲线往往很好看;再用 TqSim 或 TqKq 接实时行情做模拟盘;最后才挂 TqAccount 实盘。很多人卡在第二步:回测净值翻倍&…

作者头像 李华
网站建设 2026/6/11 9:22:29

别再手动算距离了!用Python的NumPy和Matplotlib搞定点到几何图形的最短距离(附完整代码)

几何距离计算实战:用Python向量化技术提升空间分析效率在游戏碰撞检测、地理围栏分析或工业机器人路径规划中,计算点到几何对象的距离是高频操作。传统循环遍历方法在面对海量数据时性能堪忧,而基于NumPy的向量化运算可轻松实现百倍加速。本文…

作者头像 李华
网站建设 2026/6/11 9:22:23

Gooey:Zig 语言打造跨平台 UI 框架,挑战 Electron 与 Qt 统治地位

【导语:在系统编程领域,Zig 语言社区长期缺乏生产级 GUI 方案。近期 GitHub 上出现的实验性项目 Gooey,试图改变这一现状,它是完全用 Zig 编写的 GPU 加速跨平台 UI 框架,虽有局限,但前景值得期待。】填补 …

作者头像 李华
网站建设 2026/6/11 9:22:22

用Python手把手教你实现QAM/PSK星座图的格雷映射(附完整代码)

Python实战:从零构建QAM/PSK星座图的格雷映射系统通信工程师工具箱里最迷人的工具之一,莫过于星座图——那些漂浮在复平面上的神秘点阵。但你是否想过,为什么相邻星座点之间通常只有一位二进制差异?这背后藏着格雷码的智慧。今天&…

作者头像 李华
网站建设 2026/6/11 9:22:17

分场景板材边缘间隙标准大全,匹配DFA设计落地执行规范

板材边缘间隙没有通用的 “万能数值”,不同 PCB 工艺、封装类型、拼板方式、装配结构、应用场景,对应的间隙标准差异极大。很多工程师设计时仅凭经验取值,要么间隙过大造成空间浪费,产品体积超标;要么间隙过小触发工艺…

作者头像 李华