前言
2024 年被称为 AI Agent 元年。从早期的 AutoGPT 到后来的 CrewAI、AutoGen,再到如今的 LangGraph,多智能体系统的开发框架经历了快速的迭代。我在 2023 年第一次尝试用 AutoGPT 构建自动化任务系统时,整个流程充满了不确定性:Agent 经常"走神"、任务执行到一半就偏离主题、错误处理几乎为零。
到了 2024 年中,我开始使用 LangGraph,这种基于状态机的设计思路彻底改变了我对 Agent 开发的认知。与传统的"让 LLM 自由发挥"不同,LangGraph 让你可以:
- 精确控制 Agent 的执行路径
- 在节点间共享完整的状态信息
- 实现条件分支、循环、重试等复杂逻辑
- 可视化整个工作流的执行过程
过去半年里,我用 LangGraph 构建了十几个生产级别的多智能体系统:从自动化代码审查工具到技术文档生成器,再到市场研究助理。这些系统的共同特点是:稳定性高、可观测性强、出错时可以优雅回滚。
这篇文章是我对 LangGraph 多智能体开发的系统性总结。从最基础的 State 定义,到复杂的多 Agent 协作模式,再到生产环境的部署优化,我会带你一步步构建一个完整的、可运行的多智能体工作流系统。
一、为什么选择 LangGraph?
在深入技术细节之前,我们需要先回答一个问题:市面上有这么多多智能体框架,为什么我最终选择了 LangGraph?
1.1 传统 Agent 框架的痛点
让我先分享几个真实的"踩坑"经历:
故事一:CrewAI 的"集体幻觉"
去年用 CrewAI 做一个市场分析系统,三个 Agent(研究员、分析师、写作者)协作。一开始效果很好,但当任务涉及需要精确数字的财报分析时,我发现三个 Agent 会"互相说服",最终产出完全虚构的财务数据。更糟糕的是,整个执行过程是一个黑盒,我无法定位是哪个环节出了问题。
故事二:AutoGen 的无限循环
在用 AutoGen 构建代码生成系统时,我经常遇到"代码评审 Agent"和"代码编写 Agent"陷入无限争论的情况。一个说"这个代码有性能问题",另一个反驳"这是在可读性和性能之间的权衡",几十个来回后还在原地踏步。AutoGen 缺乏一个明确的"终止条件"机制。
故事三:纯 LangChain 的复杂度
我也尝试过用纯 LangChain 的 Chain 来构建工作流。简单的线性流程还好,但一旦需要循环、条件分支、错误重试,代码就会变得极其复杂,可维护性直线下降。
1.2 LangGraph 的核心优势
LangGraph 解决了上述所有问题,它的核心设计哲学是:“Agent 不应该是黑盒魔法,而应该是可观测、可控制的状态机。”
优势一:显式的状态管理
在 LangGraph 中,所有节点共享同一个 State 对象。每个节点执行后只需要返回 State 的更新部分,LangGraph 会自动合并。这意味着你可以在任何节点访问完整的执行历史,包括之前所有 Agent 的对话、工具调用的结果、中间产物等。
优势二:灵活的控制流
LangGraph 支持:
- 条件边:根据当前状态动态决定下一个节点
- 循环:支持任务重试、迭代优化
- 分支:并行执行多个任务
- 子图:将复杂逻辑模块化
优势三:原生的可观测性
LangGraph 内置了执行轨迹追踪功能,你可以清晰地看到每个节点的输入输出、执行时间、状态变化。这对于调试和优化至关重要。
优势四:与 LangChain 生态的无缝集成
如果你已经在使用 LangChain,迁移到 LangGraph 的成本非常低。所有的 Chain、Tool、LLM 都可以直接在 LangGraph 节点中使用。
1.3 LangGraph 的适用场景
LangGraph 并不是银弹,它最适合以下场景:
✅ 多步骤复杂任务:需要多个步骤、多种工具协作完成的任务 ✅ 需要精确控制:对执行路径有明确要求,不能让 LLM"自由发挥" ✅ 需要可观测性:需要审计、调试、优化执行过程 ✅ 多 Agent 协作:多个 specialized Agent 分工合作
而对于简单的单轮问答或者线性流程,直接用 LangChain 或者原生 LLM 调用可能更简单。
二、LangGraph 核心概念详解
在开始写代码之前,让我们先理解 LangGraph 的几个核心概念。这些概念是构建一切的基础,理解了它们,后面的代码就会变得非常直观。
2.1 State(状态)
State 是 LangGraph 的核心。它是一个在所有节点之间共享的数据结构,包含了工作流执行过程中的所有信息。
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
task_description: str
research_results: str
code: str
tests: str
final_report: str
iteration_count: int
这里有几个关键点:
Annotated类型注解:operator.add表示这个字段会被追加而不是覆盖。对于messages字段来说,每个节点只需要返回新的消息,LangGraph 会自动添加到列表中。数据持久化:State 在整个工作流执行过程中持久存在,任何节点都可以读写任何字段。
增量更新:节点只需要返回它修改的字段,不需要返回整个 State。
你也可以用 Pydantic 来定义 State,获得类型检查和数据验证:
from pydantic import BaseModel, Field
from typing import List, Optional
class AgentState(BaseModel):
messages: List[BaseMessage] = Field(default_factory=list)
task_description: Optional[str] = None
research_results: Optional[str] = None
code: Optional[str] = None
iteration_count: int = 0
2.2 Node(节点)
Node 是工作流中的执行单元。每个节点接收 State 作为输入,执行某些逻辑,然后返回 State 的更新。
一个典型的节点函数:
def research_agent(state: AgentState) -> AgentState:
"""研究 Agent:搜索网络,收集信息"""
task = state["task_description"]
# 调用 LLM + 搜索工具
results = search_tool.run(f"研究关于 {task} 的最新信息")
# 返回 State 的更新部分
return {
"research_results": results,
"messages": [HumanMessage(content=results, role="researcher")]
}
节点可以是:
- LLM 调用:让某个 Agent 思考或生成内容
- 工具调用:执行 Python 代码、调用 API、查询数据库等
- 纯函数:数据转换、格式处理、状态检查
- 子图:调用另一个 Graph 作为子流程
2.3 Edge(边)
Edge 定义了节点之间的连接关系,决定了执行流程。
普通边:从 A 节点执行完总是到 B 节点
graph.add_edge("researcher", "coder")
条件边:根据当前状态动态决定下一个节点
def should_continue(state: AgentState) -> str:
"""决定是继续迭代还是结束"""
if state["iteration_count"] >= 3:
return "end"
if "需要更多研究" in state["messages"][-1].content:
return "researcher"
return "writer"
graph.add_conditional_edges(
"coder",
should_continue,
{
"researcher": "researcher",
"writer": "writer",
"end": END
}
)
条件边是 LangGraph 最强大的特性之一,它让你可以实现:
- 智能路由:根据任务类型选择合适的 Agent
- 错误重试:失败时自动重试或降级
- 质量检查:不满足要求时返回上一步
- 提前终止:达到某些条件时提前结束
2.4 Graph(图)
Graph 是节点和边的集合,代表完整的工作流。
from langgraph.graph import StateGraph, END
# 1. 创建 Graph
workflow = StateGraph(AgentState)
# 2. 添加节点
workflow.add_node("planner", planner_agent)
workflow.add_node("researcher", research_agent)
workflow.add_node("coder", code_agent)
workflow.add_node("writer", writer_agent)
# 3. 设置入口点
workflow.set_entry_point("planner")
# 4. 添加边
workflow.add_edge("planner", "researcher")
workflow.add_edge("researcher", "coder")
workflow.add_conditional_edges("coder", router)
workflow.add_edge("writer", END)
# 5. 编译 Graph
app = workflow.compile()
编译后的 Graph 可以像函数一样调用:
result = app.invoke({
"task_description": "写一篇关于 LangGraph 的技术文章",
"iteration_count": 0
})
也支持流式输出:
for output in app.stream({
"task_description": "写一篇关于 LangGraph 的技术文章"
}):
for key, value in output.items():
print(f"节点 {key} 完成: {value}")
2.5 Checkpointer(检查点)
Checkpointer 允许你持久化工作流的执行状态,支持:
- 暂停和恢复:工作流可以随时暂停,之后从暂停处继续
- 时间旅行:可以回到之前的任何检查点,从那里重新执行
- 人工介入:在关键点等待人类审核或输入
- 错误恢复:失败时可以从最近的检查点重试
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=memory)
# 每个执行线程需要唯一的 thread_id
config = {"configurable": {"thread_id": "1"}}
result = app.invoke(inputs, config=config)
# 之后可以用同一个 thread_id 继续
next_result = app.invoke(next_inputs, config=config)
理解了这些核心概念后,我们就可以开始构建真正的多智能体工作流了。
三、环境搭建与基础配置
在开始构建多智能体系统之前,让我们先准备好开发环境。
3.1 安装依赖
pip install langgraph langchain langchain-openai python-dotenv
主要依赖说明:
langgraph:核心框架langchain:提供 Chains、Tools、Message 等基础组件langchain-openai:OpenAI 模型集成python-dotenv:环境变量管理
3.2 基础配置
创建 .env 文件:
OPENAI_API_KEY=your-api-key-here
OPENAI_API_BASE=https://api.openai.com/v1
OPENAI_MODEL_NAME=gpt-4o
然后在代码中加载:
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
load_dotenv()
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL_NAME", "gpt-4o"),
temperature=0,
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_API_BASE")
)
关于 Model 选择的建议:
- GPT-4o:多智能体协调、复杂推理任务首选
- GPT-4 Turbo:代码生成、技术写作,成本比 GPT-4o 低
- Claude 3 Opus:处理长文本、生成长篇报告时效果更好
- 本地模型:如 Llama 3 70B,适合对数据安全有要求的场景
(第一部分完,约 2200 字)
四、单 Agent 工作流实现
让我们从最简单的单 Agent 工作流开始,逐步增加复杂度。这是一个很好的学习方式,也是实际项目中的迭代方法。
4.1 最简单的 Agent:带工具调用的循环
我们首先实现一个"思考-行动-观察"循环的 Agent,这是所有复杂 Agent 的基础:
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage
# 1. 定义工具
@tool
def search(query: str) -> str:
"""搜索网络获取信息"""
# 实际项目中这里调用真实的搜索 API
return f"关于 '{query}' 的搜索结果:LangGraph 是一个基于状态机的 Agent 开发框架..."
@tool
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return f"计算结果: {result}"
except Exception as e:
return f"计算错误: {str(e)}"
tools = [search, calculate]
# 2. 让 LLM 知道可以调用哪些工具
llm_with_tools = llm.bind_tools(tools)
现在定义 State 和 Agent 节点:
class SimpleAgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
def agent_node(state: SimpleAgentState) -> SimpleAgentState:
"""Agent 节点:思考并决定下一步"""
messages = state["messages"]
response = llm_with_tools.invoke(messages)
return {"messages": [response]}
# 工具节点(LangGraph 内置)
tool_node = ToolNode(tools)
接下来定义路由逻辑:
def should_continue(state: SimpleAgentState) -> str:
"""决定是继续调用工具还是结束"""
last_message = state["messages"][-1]
# 如果 LLM 决定调用工具,就去工具节点
if last_message.tool_calls:
return "tools"
# 否则结束
return "end"
最后构建 Graph:
workflow = StateGraph(SimpleAgentState)
# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
# 设置入口
workflow.set_entry_point("agent")
# 添加条件边:Agent 决定是调用工具还是结束
workflow.add_conditional_edges(
"agent",
should_continue,
{
"tools": "tools",
"end": END
}
)
# 工具执行完总是回到 Agent 继续思考
workflow.add_edge("tools", "agent")
app = workflow.compile()
运行这个 Agent:
result = app.invoke({
"messages": [
HumanMessage(content="搜索 LangGraph 的最新版本是什么,然后计算 123 * 456 等于多少")
]
})
print("最终回答:", result["messages"][-1].content)
这个简单的 Agent 已经具备了:
- ✅ 工具调用能力
- ✅ 多轮思考循环
- ✅ 自动终止机制
4.2 可视化工作流
LangGraph 支持将 Graph 导出为 Mermaid 图,这对于调试和文档非常有用:
from IPython.display import Image, display
# 生成 Mermaid 图
graph_image = app.get_graph().draw_mermaid_png()
# 保存到文件
with open("workflow.png", "wb") as f:
f.write(graph_image)
你也可以直接打印 Mermaid 源码:
print(app.get_graph().draw_mermaid())
4.3 流式输出与事件监听
对于用户体验来说,流式输出非常重要。LangGraph 支持多种流式模式:
# 流式输出每个节点的结果
for output in app.stream({
"messages": [HumanMessage(content="什么是 LangGraph?")]
}):
for key, value in output.items():
print(f"=== 节点: {key} ===")
if "messages" in value:
for msg in value["messages"]:
if isinstance(msg, AIMessage):
print(f"AI: {msg.content[:100]}...")
elif hasattr(msg, 'name'):
print(f"工具 {msg.name}: {msg.content[:100]}...")
你还可以监听更细粒度的事件:
async for event in app.astream_events(
{"messages": [HumanMessage(content="计算 1+1")]},
version="v1"
):
kind = event["event"]
if kind == "on_chain_start":
print(f"开始执行: {event['name']}")
elif kind == "on_chain_end":
print(f"执行完成: {event['name']}")
elif kind == "on_tool_start":
print(f"调用工具: {event['name']}")
五、多 Agent 协作模式
现在我们进入真正有趣的部分:多个 specialized Agent 协作完成复杂任务。
5.1 模式一:顺序执行流水线
最简单的多 Agent 模式是顺序执行,每个 Agent 负责一个特定的步骤:
让我们定义一个完整的技术文章生成系统:
class ArticleState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
topic: str
outline: str
research_notes: str
code_examples: str
article_draft: str
final_article: str
规划 Agent:
def planner_agent(state: ArticleState) -> ArticleState:
"""规划 Agent:生成文章大纲"""
system_prompt = """你是一个技术文章规划专家。根据用户提供的主题,生成详细的文章大纲。
大纲应该包含:
1. 文章的核心观点
2. 主要章节结构
3. 每个章节需要的内容类型
4. 建议的代码示例类型
"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=f"主题: {state['topic']}")
]
response = llm.invoke(messages)
return {
"outline": response.content,
"messages": [response]
}
研究 Agent:
def research_agent(state: ArticleState) -> ArticleState:
"""研究 Agent:收集资料"""
system_prompt = """你是一个研究专家。根据文章大纲,收集相关的技术资料。
包括:最新技术动态、官方文档要点、最佳实践建议等。"""
# 这里可以集成搜索工具
# search_results = search_tool.run(...)
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=f"大纲: {state['outline']}")
]
response = llm.invoke(messages)
return {
"research_notes": response.content,
"messages": [response]
}
编码 Agent:
def code_agent(state: ArticleState) -> ArticleState:
"""编码 Agent:生成代码示例"""
system_prompt = """你是一个资深 Python 开发者。根据研究资料和大纲,生成高质量的代码示例。
要求:
1. 代码可以直接运行
2. 包含详细的注释
3. 有错误处理
4. 遵循 PEP 8 规范"""
prompt = f"""
主题: {state['topic']}
大纲: {state['outline']}
研究资料: {state['research_notes']}
"""
response = llm.invoke([SystemMessage(content=system_prompt),
HumanMessage(content=prompt)])
return {
"code_examples": response.content,
"messages": [response]
}
写作 Agent:
def writer_agent(state: ArticleState) -> ArticleState:
"""写作 Agent:整合所有内容生成最终文章"""
system_prompt = """你是一个技术作家。将所有资料整合成一篇高质量的技术文章。"""
prompt = f"""
主题: {state['topic']}
大纲: {state['outline']}
研究资料: {state['research_notes']}
代码示例: {state['code_examples']}
请写出一篇完整的技术文章。
"""
response = llm.invoke([SystemMessage(content=system_prompt),
HumanMessage(content=prompt)])
return {
"final_article": response.content,
"messages": [response]
}
最后构建流水线:
workflow = StateGraph(ArticleState)
workflow.add_node("planner", planner_agent)
workflow.add_node("researcher", research_agent)
workflow.add_node("coder", code_agent)
workflow.add_node("writer", writer_agent)
workflow.set_entry_point("planner")
workflow.add_edge("planner", "researcher")
workflow.add_edge("researcher", "coder")
workflow.add_edge("coder", "writer")
workflow.add_edge("writer", END)
app = workflow.compile()
运行这个系统:
result = app.invoke({
"topic": "使用 LangGraph 构建多智能体系统"
})
print("文章生成完成!")
print("=" * 50)
print(result["final_article"])
5.2 模式二:主管 + 专家团队
更高级的模式是有一个"主管 Agent"协调多个"专家 Agent"。主管负责理解任务、分配工作、整合结果。
class SupervisorState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
current_task: str
next_agent: str
final_output: str
# 定义可用的专家
EXPERTS = ["researcher", "coder", "writer", "tester"]
def supervisor_agent(state: SupervisorState) -> SupervisorState:
"""主管 Agent:决定下一步调用哪个专家"""
system_prompt = f"""你是一个项目经理。你的团队有以下专家成员:
- researcher: 研究专家,负责收集信息
- coder: 编码专家,负责编写代码
- writer: 写作专家,负责文档撰写
- tester: 测试专家,负责验证质量
根据当前任务状态,决定下一步应该调用哪个专家,或者任务是否完成。
只用 JSON 格式回答:
{{"next_agent": "专家名称或 'FINISH'", "reason": "决策理由"}}
"""
messages = [SystemMessage(content=system_prompt)] + state["messages"]
response = llm.invoke(messages)
# 解析 LLM 的决策
import json
try:
decision = json.loads(response.content)
next_agent = decision["next_agent"]
except:
next_agent = "FINISH"
return {
"next_agent": next_agent,
"messages": [response]
}
然后定义路由函数:
def route_to_expert(state: SupervisorState) -> str:
"""根据主管的决定路由到相应的专家"""
next_agent = state["next_agent"]
if next_agent == "FINISH":
return "end"
return next_agent
构建 Graph:
workflow = StateGraph(SupervisorState)
# 添加节点
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("researcher", research_agent)
workflow.add_node("coder", code_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("tester", tester_agent)
# 设置入口
workflow.set_entry_point("supervisor")
# 主管可以路由到任何专家
workflow.add_conditional_edges(
"supervisor",
route_to_expert,
{
"researcher": "researcher",
"coder": "coder",
"writer": "writer",
"tester": "tester",
"end": END
}
)
# 每个专家完成后都回到主管继续决策
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("coder", "supervisor")
workflow.add_edge("writer", "supervisor")
workflow.add_edge("tester", "supervisor")
app = workflow.compile()
这种模式的优势是:
- ✅ 动态调整执行顺序
- ✅ 主管可以根据情况回溯或重试
- ✅ 可以处理非预期的复杂情况
- ✅ 扩展性强,增加新专家不需要修改现有结构
5.3 模式三:并行执行
对于可以并行处理的任务,LangGraph 支持分支和汇合:
def parallel_branches(state: ArticleState) -> ArticleState:
"""并行分支:同时启动研究和编码"""
# 这个节点可以同时触发多个下游节点
return state
workflow.add_edge("planner", "parallel_branches")
workflow.add_edge("parallel_branches", "researcher")
workflow.add_edge("parallel_branches", "coder")
workflow.add_edge(["researcher", "coder"], "writer") # 两个都完成后才到 writer
不过要注意:真正的并行执行在 LangGraph 中需要配置并发策略,目前更多是逻辑上的并行执行。
(第二部分完,约 2300 字)