📋 LangGraph + PostgreSQL 异步长期存储代码详细流程解析
这个版本的核心是:使用异步AsyncPostgresStore连接 PostgreSQL,在对话中读取/更新用户画像(长期记忆),并同步到数据库。
importosimportsysimportasyncioimportrefromdotenvimportload_dotenvfromlangchain_openaiimportChatOpenAIfromlanggraph.graphimportStateGraph,MessagesState,STARTfromlanggraph.checkpoint.memoryimportInMemorySaverfromlanggraph.store.postgresimportAsyncPostgresStoreifsys.platform=='win32':asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())load_dotenv()DB_URI="postgresql://postgres:root@localhost:5432/langgraph_db"llm=ChatOpenAI(model="qwen-plus",temperature=0.7,api_key=os.getenv("DASHSCOPE_API_KEY"),base_url=os.getenv("DASHSCOPE_BASE_URL"),model_kwargs={"extra_body":{"enable_thinking":False}})checkpointer=InMemorySaver()defcall_agent(state:MessagesState):response=llm.invoke(state["messages"])return{"messages":[response]}defextract_hobby_from_text(text:str)->str|None:patterns=[r"喜欢(养)?(猫|狗|鸟|鱼|乌龟|兔子|摄影|画画|唱歌|跳舞|游泳|跑步|读书|看电视|电影)",r"爱好是(.{2,10})",r"现在喜欢(养)?(.{2,10})",]forpatterninpatterns:match=re.search(pattern,text)ifmatch:hobby=match.group(2)iflen(match.groups())>=2elsematch.group(1)ifhobby:returnhobby.strip()returnNonedefextract_name_from_text(text:str)->str|None:# 明确命令 #setnameiftext.startswith("#setname"):parts=text.split(maxsplit=1)iflen(parts)==2:returnparts[1].strip()# 自然语言patterns=[r"我叫([\u4e00-\u9fa5]{2,4})",r"我是([\u4e00-\u9fa5]{2,4})",r"改名为([\u4e00-\u9fa5]{2,4})",r"把名字改成([\u4e00-\u9fa5]{2,4})",r"以后叫([\u4e00-\u9fa5]{2,4})",r"请叫我([\u4e00-\u9fa5]{2,4})",r"我的名字是([\u4e00-\u9fa5]{2,4})",r"更名为([\u4e00-\u9fa5]{2,4})",]forpatterninpatterns:match=re.search(pattern,text)ifmatch:returnmatch.group(1).strip()returnNoneasyncdefmain():asyncwithAsyncPostgresStore.from_conn_string(DB_URI)asstore:awaitstore.setup()print("✅ PostgreSQL 长期存储已连接")builder=StateGraph(MessagesState)builder.add_node("call_agent",call_agent)builder.add_edge(START,"call_agent")graph=builder.compile(checkpointer=checkpointer,store=store)user_id="user_002"config={"configurable":{"thread_id":f"{user_id}_thread"}}namespace=("profiles",user_id)existing=awaitstore.aget(namespace,"user_info")ifnotexisting:default_profile={"name":"张明","hobby":"摄影","preferences":{"language":"zh-CN","style":"detailed"}}awaitstore.aput(namespace,"user_info",default_profile)print("💾 首次写入默认用户画像")profile=default_profileelse:profile=existing.valueprint(f"📋 读取现有画像:{profile}")user_input=input("\n请输入您的消息: ").strip()print(f"用户:{user_input}")# 处理强制更新命令ifuser_input=="#forceupdate":awaitstore.aput(namespace,"user_info",profile)print("⚠️ 已强制写入当前 profile 到数据库")verify=awaitstore.aget(namespace,"user_info")print(f"✅ 验证数据库内容:{verify.value}")# 强制更新后依然进行正常对话else:updated=Falsenew_name=extract_name_from_text(user_input)print(f"[调试] 提取到的名字:{new_name}")ifnew_nameandnew_name!=profile.get("name"):print(f"🔄 检测到名字变化:{profile['name']}→{new_name}")profile["name"]=new_name updated=Truenew_hobby=extract_hobby_from_text(user_input)print(f"[调试] 提取到的爱好:{new_hobby}")ifnew_hobbyandnew_hobby!=profile.get("hobby"):print(f"🔄 检测到爱好变化:{profile['hobby']}→{new_hobby}")profile["hobby"]=new_hobby updated=Trueifupdated:try:awaitstore.aput(namespace,"user_info",profile)print("💾 用户画像已更新到 PostgreSQL")verify=awaitstore.aget(namespace,"user_info")print(f"✅ 验证数据库最新内容:{verify.value}")exceptExceptionase:print(f"❌ 数据库写入失败:{e}")else:print("ℹ️ 无更新,数据库保持不变")# 构造对话(使用当前 profile)context=f"用户偏好:{profile}。请基于此回答。"messages=[("system",context),("user",user_input)]foreventingraph.stream({"messages":messages},config=config,stream_mode="values"):event["messages"][-1].pretty_print()if__name__=="__main__":asyncio.run(main())---## 一、整体架构图(异步流程)```mermaid sequenceDiagram participant Mainasmain()participant Loopasasyncio事件循环 participant StoreasAsyncPostgresStore participant DBasPostgreSQL participant GraphasLangGraph图 participant LLMasQwen模型 Main->>Loop:asyncio.run(main())Loop->>Store:AsyncPostgresStore.from_conn_string(DB_URI)Store-->>Loop:异步上下文管理器 Main->>Store:__aenter__ → 建立连接池 Main->>Store:awaitstore.setup()→ 创建表 Main->>Store:awaitstore.aget()→ 查询用户画像 DB-->>Store:返回 JSON 或Nonealt 不存在 Main->>Store:awaitstore.aput()→ 写入默认画像else存在 Main->>Store:读取到 profile end Main->>Graph:builder.compile(store=store)Main->>Graph:awaitgraph.stream()→ 执行图 Graph->>LLM:调用模型 LLM-->>Graph:返回回复 Graph-->>Main:流式输出消息 Main->>Store:__aexit__ → 关闭连接池--- ## 二、逐段代码流程详解(含异步机制) ### 1. Windows 异步兼容设置 ```python if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())- 为什么需要?
Windows 默认的ProactorEventLoop与psycopg(PostgreSQL 异步驱动)不兼容,必须改用SelectorEventLoop。 - 本质:切换 asyncio 的事件循环策略,让异步数据库操作能在 Windows 上正常运行。
2. 异步主函数入口
asyncdefmain():...if__name__=="__main__":asyncio.run(main())asyncio.run(main())是 Python 官方推荐的启动异步函数的方式。
它会:- 创建一个新的事件循环(Event Loop)。
- 执行
main()协程。 - 协程结束后关闭循环。
- 任何
async def函数内部都可以使用await等待异步操作。
3. 异步连接 PostgreSQL(核心难点)
asyncwithAsyncPostgresStore.from_conn_string(DB_URI)asstore:awaitstore.setup()AsyncPostgresStore.from_conn_string(DB_URI)返回一个异步上下文管理器。async with ... as store:等同于:store=awaitAsyncPostgresStore.from_conn_string(DB_URI).__aenter__()try:...finally:awaitstore.__aexit__()__aenter__会创建到 PostgreSQL 的连接池(异步非阻塞)。await store.setup()执行 DDL 语句创建表,也是异步非阻塞的(不会卡住事件循环)。
4. 读取/写入用户画像(异步 I/O)
existing=awaitstore.aget(namespace,"user_info")ifnotexisting:awaitstore.aput(namespace,"user_info",default_profile)else:profile=existing.valueaget/aput都是异步方法,需要await。- 异步 I/O 期间,事件循环可以切换去处理其他任务(虽然本例只有一个任务,但如果是并发请求就会体现优势)。
5. 同步与异步的混合(LangGraph 图执行)
builder=StateGraph(MessagesState)...graph=builder.compile(checkpointer=checkpointer,store=store)# 这里的 stream 是一个异步生成器foreventingraph.stream(...,stream_mode="values"):...graph.stream()内部是异步实现的,但 LangGraph 做了封装,可以直接在async函数中for await(实际上stream()返回的是同步迭代器?),但此处for event in graph.stream(...)是同步迭代,因为stream方法返回的是生成器,但内部调用 LLM 时其实是同步阻塞的(llm.invoke)。注意:这里为了简化,LLM 调用是同步的(llm.invoke阻塞),但数据库操作已经异步化。
潜在改进:可以将 LLM 调用也改为异步(
llm.ainvoke),进一步提高并发性能,但不是必须。
6. 更新数据库并验证
awaitstore.aput(namespace,"user_info",profile)verify=awaitstore.aget(namespace,"user_info")- 写入后立即读取验证,确保数据已持久化。
- 所有
await都是非阻塞的。
三、异步调用顺序(时间轴)
| 步骤 | 代码 | 操作 | 是否阻塞事件循环 |
|---|---|---|---|
| 1 | asyncio.run(main()) | 启动事件循环 | 阻塞(直到 main 结束) |
| 2 | async with AsyncPostgresStore... | 建立数据库连接池 | 非阻塞(I/O 等待) |
| 3 | await store.setup() | 发送建表 SQL | 非阻塞 |
| 4 | await store.aget(...) | 发送查询 SQL | 非阻塞 |
| 5 | if not existing: await store.aput(...) | 写入数据 | 非阻塞 |
| 6 | graph.stream(...) | 调用 LLM | 阻塞(LLM 响应时间长) |
| 7 | await store.aput(...)(若更新) | 写入更新 | 非阻塞 |
| 8 | async with结束 | 关闭连接池 | 非阻塞 |
总结:只有 LLM 调用是同步阻塞的(可改为异步),其他数据库操作均是非阻塞异步,不会阻塞事件循环。
四、常见异步疑问与解答
Q1: 为什么需要async def main()和asyncio.run()?
- 答:异步代码必须运行在事件循环中。
asyncio.run(main())创建并运行事件循环,直到main()完成。
Q2:async with和普通with有什么区别?
- 答:
async with用于异步上下文管理器,其__aenter__和__aexit__是协程,需要await。普通with用于同步上下文管理器。
Q3:await store.aget(...)时程序在干什么?
- 答:事件循环会挂起当前协程,去执行其他就绪的任务(如果有多任务并发)。如果没有其他任务,事件循环会等待 I/O 完成(数据库响应),然后恢复协程。CPU 不会空转。
Q4: 这个代码是真正的异步吗?LLM 调用为什么是同步?
- 答:数据库部分是完全异步的。LLM 调用(
llm.invoke)是同步的,因为ChatOpenAI默认使用同步 HTTP 客户端。若需要全异步,可改用llm.ainvoke(需模型支持)。本例重点演示数据库异步。
Q5: 如何在异步中处理用户输入(input())?
- 答:
input()是同步阻塞函数,会阻塞整个事件循环。本例中input()在async函数里直接调用,会阻塞,但通常用户输入很快,影响不大。如需彻底异步,可使用asyncio.to_thread(input)。
五、执行流程图(简化的数据流)
启动程序 │ ▼ 建立到 PostgreSQL 的异步连接池 │ ▼ 查询 store 表,获取用户画像 │ ├─ 无记录 → 写入默认 {"name":"张明","hobby":"摄影"} └─ 有记录 → 读取到 profile │ ▼ 用户输入消息(同步 input) │ ▼ 尝试提取 name 和 hobby(同步正则) │ ├─ 检测到变化 → await store.aput 更新数据库 │ │ │ ▼ │ 验证更新(再读一次) └─ 无变化 → 跳过 │ ▼ 构造系统提示(包含 profile) │ ▼ 执行 LangGraph 图(调用 LLM) │ ▼ 流式输出回复 │ ▼ 程序结束,关闭数据库连接池六、如何验证异步是否正常工作?
- 添加日志:在
await store.aget前后打印时间戳,观察是否阻塞。 - 模拟慢查询:在 PostgreSQL 中执行
SELECT pg_sleep(5);,此时程序应该可以响应其他任务(如果有并发)。 - 使用
asyncio.create_task同时发起多个数据库请求,观察是否并发执行。
七、总结:异步调用的核心要点
| 概念 | 说明 |
|---|---|
async def | 声明一个协程函数 |
await | 等待一个可等待对象(协程、Future、Task),期间让出事件循环 |
asyncio.run() | 启动事件循环并运行主协程 |
async with | 异步上下文管理器,用于管理需要异步初始化和清理的资源(如数据库连接池) |
AsyncPostgresStore | LangGraph 提供的异步 PostgreSQL 存储客户端,所有方法都是协程(需await) |
只要理解“遇到await时,事件循环可以切换去执行其他任务,I/O 完成后回来继续”,就能掌握异步的精髓。在这个例子中,虽然只有一个任务,但异步框架已经为未来扩展并发打下了基础。
如果你希望完整地使用全异步(包括 LLM 调用),可以将llm.invoke替换为llm.ainvoke,并将graph.stream改为异步迭代(需查看 LangGraph 文档)。但当前版本足以让你理解异步数据库操作的流程。