前言

2024 年被称为 AI Agent 元年。从早期的 AutoGPT 到后来的 CrewAI、AutoGen,再到如今的 LangGraph,多智能体系统的开发框架经历了快速的迭代。我在 2023 年第一次尝试用 AutoGPT 构建自动化任务系统时,整个流程充满了不确定性:Agent 经常"走神"、任务执行到一半就偏离主题、错误处理几乎为零。

到了 2024 年中,我开始使用 LangGraph,这种基于状态机的设计思路彻底改变了我对 Agent 开发的认知。与传统的"让 LLM 自由发挥"不同,LangGraph 让你可以:

  • 精确控制 Agent 的执行路径
  • 在节点间共享完整的状态信息
  • 实现条件分支、循环、重试等复杂逻辑
  • 可视化整个工作流的执行过程

过去半年里,我用 LangGraph 构建了十几个生产级别的多智能体系统:从自动化代码审查工具到技术文档生成器,再到市场研究助理。这些系统的共同特点是:稳定性高、可观测性强、出错时可以优雅回滚。

这篇文章是我对 LangGraph 多智能体开发的系统性总结。从最基础的 State 定义,到复杂的多 Agent 协作模式,再到生产环境的部署优化,我会带你一步步构建一个完整的、可运行的多智能体工作流系统。

LangGraph 多智能体工作流架构

一、为什么选择 LangGraph?

在深入技术细节之前,我们需要先回答一个问题:市面上有这么多多智能体框架,为什么我最终选择了 LangGraph?

1.1 传统 Agent 框架的痛点

让我先分享几个真实的"踩坑"经历:

故事一:CrewAI 的"集体幻觉"

去年用 CrewAI 做一个市场分析系统,三个 Agent(研究员、分析师、写作者)协作。一开始效果很好,但当任务涉及需要精确数字的财报分析时,我发现三个 Agent 会"互相说服",最终产出完全虚构的财务数据。更糟糕的是,整个执行过程是一个黑盒,我无法定位是哪个环节出了问题。

故事二:AutoGen 的无限循环

在用 AutoGen 构建代码生成系统时,我经常遇到"代码评审 Agent"和"代码编写 Agent"陷入无限争论的情况。一个说"这个代码有性能问题",另一个反驳"这是在可读性和性能之间的权衡",几十个来回后还在原地踏步。AutoGen 缺乏一个明确的"终止条件"机制。

故事三:纯 LangChain 的复杂度

我也尝试过用纯 LangChain 的 Chain 来构建工作流。简单的线性流程还好,但一旦需要循环、条件分支、错误重试,代码就会变得极其复杂,可维护性直线下降。

1.2 LangGraph 的核心优势

LangGraph 解决了上述所有问题,它的核心设计哲学是:“Agent 不应该是黑盒魔法,而应该是可观测、可控制的状态机。”

优势一:显式的状态管理

在 LangGraph 中,所有节点共享同一个 State 对象。每个节点执行后只需要返回 State 的更新部分,LangGraph 会自动合并。这意味着你可以在任何节点访问完整的执行历史,包括之前所有 Agent 的对话、工具调用的结果、中间产物等。

优势二:灵活的控制流

LangGraph 支持:

  • 条件边:根据当前状态动态决定下一个节点
  • 循环:支持任务重试、迭代优化
  • 分支:并行执行多个任务
  • 子图:将复杂逻辑模块化

优势三:原生的可观测性

LangGraph 内置了执行轨迹追踪功能,你可以清晰地看到每个节点的输入输出、执行时间、状态变化。这对于调试和优化至关重要。

优势四:与 LangChain 生态的无缝集成

如果你已经在使用 LangChain,迁移到 LangGraph 的成本非常低。所有的 Chain、Tool、LLM 都可以直接在 LangGraph 节点中使用。

1.3 LangGraph 的适用场景

LangGraph 并不是银弹,它最适合以下场景:

多步骤复杂任务:需要多个步骤、多种工具协作完成的任务 ✅ 需要精确控制:对执行路径有明确要求,不能让 LLM"自由发挥" ✅ 需要可观测性:需要审计、调试、优化执行过程 ✅ 多 Agent 协作:多个 specialized Agent 分工合作

而对于简单的单轮问答或者线性流程,直接用 LangChain 或者原生 LLM 调用可能更简单。

二、LangGraph 核心概念详解

在开始写代码之前,让我们先理解 LangGraph 的几个核心概念。这些概念是构建一切的基础,理解了它们,后面的代码就会变得非常直观。

2.1 State(状态)

State 是 LangGraph 的核心。它是一个在所有节点之间共享的数据结构,包含了工作流执行过程中的所有信息。

from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    task_description: str
    research_results: str
    code: str
    tests: str
    final_report: str
    iteration_count: int

这里有几个关键点:

  1. Annotated 类型注解operator.add 表示这个字段会被追加而不是覆盖。对于 messages 字段来说,每个节点只需要返回新的消息,LangGraph 会自动添加到列表中。

  2. 数据持久化:State 在整个工作流执行过程中持久存在,任何节点都可以读写任何字段。

  3. 增量更新:节点只需要返回它修改的字段,不需要返回整个 State。

你也可以用 Pydantic 来定义 State,获得类型检查和数据验证:

from pydantic import BaseModel, Field
from typing import List, Optional

class AgentState(BaseModel):
    messages: List[BaseMessage] = Field(default_factory=list)
    task_description: Optional[str] = None
    research_results: Optional[str] = None
    code: Optional[str] = None
    iteration_count: int = 0

2.2 Node(节点)

Node 是工作流中的执行单元。每个节点接收 State 作为输入,执行某些逻辑,然后返回 State 的更新。

一个典型的节点函数:

def research_agent(state: AgentState) -> AgentState:
    """研究 Agent:搜索网络,收集信息"""
    task = state["task_description"]
    
    # 调用 LLM + 搜索工具
    results = search_tool.run(f"研究关于 {task} 的最新信息")
    
    # 返回 State 的更新部分
    return {
        "research_results": results,
        "messages": [HumanMessage(content=results, role="researcher")]
    }

节点可以是:

  • LLM 调用:让某个 Agent 思考或生成内容
  • 工具调用:执行 Python 代码、调用 API、查询数据库等
  • 纯函数:数据转换、格式处理、状态检查
  • 子图:调用另一个 Graph 作为子流程

2.3 Edge(边)

Edge 定义了节点之间的连接关系,决定了执行流程。

普通边:从 A 节点执行完总是到 B 节点

graph.add_edge("researcher", "coder")

条件边:根据当前状态动态决定下一个节点

def should_continue(state: AgentState) -> str:
    """决定是继续迭代还是结束"""
    if state["iteration_count"] >= 3:
        return "end"
    if "需要更多研究" in state["messages"][-1].content:
        return "researcher"
    return "writer"

graph.add_conditional_edges(
    "coder",
    should_continue,
    {
        "researcher": "researcher",
        "writer": "writer",
        "end": END
    }
)

条件边是 LangGraph 最强大的特性之一,它让你可以实现:

  • 智能路由:根据任务类型选择合适的 Agent
  • 错误重试:失败时自动重试或降级
  • 质量检查:不满足要求时返回上一步
  • 提前终止:达到某些条件时提前结束

2.4 Graph(图)

Graph 是节点和边的集合,代表完整的工作流。

from langgraph.graph import StateGraph, END

# 1. 创建 Graph
workflow = StateGraph(AgentState)

# 2. 添加节点
workflow.add_node("planner", planner_agent)
workflow.add_node("researcher", research_agent)
workflow.add_node("coder", code_agent)
workflow.add_node("writer", writer_agent)

# 3. 设置入口点
workflow.set_entry_point("planner")

# 4. 添加边
workflow.add_edge("planner", "researcher")
workflow.add_edge("researcher", "coder")
workflow.add_conditional_edges("coder", router)
workflow.add_edge("writer", END)

# 5. 编译 Graph
app = workflow.compile()

编译后的 Graph 可以像函数一样调用:

result = app.invoke({
    "task_description": "写一篇关于 LangGraph 的技术文章",
    "iteration_count": 0
})

也支持流式输出:

for output in app.stream({
    "task_description": "写一篇关于 LangGraph 的技术文章"
}):
    for key, value in output.items():
        print(f"节点 {key} 完成: {value}")

2.5 Checkpointer(检查点)

Checkpointer 允许你持久化工作流的执行状态,支持:

  • 暂停和恢复:工作流可以随时暂停,之后从暂停处继续
  • 时间旅行:可以回到之前的任何检查点,从那里重新执行
  • 人工介入:在关键点等待人类审核或输入
  • 错误恢复:失败时可以从最近的检查点重试
from langgraph.checkpoint.sqlite import SqliteSaver

memory = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=memory)

# 每个执行线程需要唯一的 thread_id
config = {"configurable": {"thread_id": "1"}}
result = app.invoke(inputs, config=config)

# 之后可以用同一个 thread_id 继续
next_result = app.invoke(next_inputs, config=config)

理解了这些核心概念后,我们就可以开始构建真正的多智能体工作流了。

三、环境搭建与基础配置

在开始构建多智能体系统之前,让我们先准备好开发环境。

3.1 安装依赖

pip install langgraph langchain langchain-openai python-dotenv

主要依赖说明:

  • langgraph:核心框架
  • langchain:提供 Chains、Tools、Message 等基础组件
  • langchain-openai:OpenAI 模型集成
  • python-dotenv:环境变量管理

3.2 基础配置

创建 .env 文件:

OPENAI_API_KEY=your-api-key-here
OPENAI_API_BASE=https://api.openai.com/v1
OPENAI_MODEL_NAME=gpt-4o

然后在代码中加载:

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

load_dotenv()

llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL_NAME", "gpt-4o"),
    temperature=0,
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url=os.getenv("OPENAI_API_BASE")
)

关于 Model 选择的建议:

  • GPT-4o:多智能体协调、复杂推理任务首选
  • GPT-4 Turbo:代码生成、技术写作,成本比 GPT-4o 低
  • Claude 3 Opus:处理长文本、生成长篇报告时效果更好
  • 本地模型:如 Llama 3 70B,适合对数据安全有要求的场景

(第一部分完,约 2200 字)

四、单 Agent 工作流实现

让我们从最简单的单 Agent 工作流开始,逐步增加复杂度。这是一个很好的学习方式,也是实际项目中的迭代方法。

4.1 最简单的 Agent:带工具调用的循环

我们首先实现一个"思考-行动-观察"循环的 Agent,这是所有复杂 Agent 的基础:

from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage

# 1. 定义工具
@tool
def search(query: str) -> str:
    """搜索网络获取信息"""
    # 实际项目中这里调用真实的搜索 API
    return f"关于 '{query}' 的搜索结果:LangGraph 是一个基于状态机的 Agent 开发框架..."

@tool
def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        result = eval(expression)
        return f"计算结果: {result}"
    except Exception as e:
        return f"计算错误: {str(e)}"

tools = [search, calculate]

# 2. 让 LLM 知道可以调用哪些工具
llm_with_tools = llm.bind_tools(tools)

现在定义 State 和 Agent 节点:

class SimpleAgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]

def agent_node(state: SimpleAgentState) -> SimpleAgentState:
    """Agent 节点:思考并决定下一步"""
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

# 工具节点(LangGraph 内置)
tool_node = ToolNode(tools)

接下来定义路由逻辑:

def should_continue(state: SimpleAgentState) -> str:
    """决定是继续调用工具还是结束"""
    last_message = state["messages"][-1]
    # 如果 LLM 决定调用工具,就去工具节点
    if last_message.tool_calls:
        return "tools"
    # 否则结束
    return "end"

最后构建 Graph:

workflow = StateGraph(SimpleAgentState)

# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)

# 设置入口
workflow.set_entry_point("agent")

# 添加条件边:Agent 决定是调用工具还是结束
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "tools": "tools",
        "end": END
    }
)

# 工具执行完总是回到 Agent 继续思考
workflow.add_edge("tools", "agent")

app = workflow.compile()

运行这个 Agent:

result = app.invoke({
    "messages": [
        HumanMessage(content="搜索 LangGraph 的最新版本是什么,然后计算 123 * 456 等于多少")
    ]
})

print("最终回答:", result["messages"][-1].content)

这个简单的 Agent 已经具备了:

  • ✅ 工具调用能力
  • ✅ 多轮思考循环
  • ✅ 自动终止机制

4.2 可视化工作流

LangGraph 支持将 Graph 导出为 Mermaid 图,这对于调试和文档非常有用:

from IPython.display import Image, display

# 生成 Mermaid 图
graph_image = app.get_graph().draw_mermaid_png()

# 保存到文件
with open("workflow.png", "wb") as f:
    f.write(graph_image)

你也可以直接打印 Mermaid 源码:

print(app.get_graph().draw_mermaid())

4.3 流式输出与事件监听

对于用户体验来说,流式输出非常重要。LangGraph 支持多种流式模式:

# 流式输出每个节点的结果
for output in app.stream({
    "messages": [HumanMessage(content="什么是 LangGraph?")]
}):
    for key, value in output.items():
        print(f"=== 节点: {key} ===")
        if "messages" in value:
            for msg in value["messages"]:
                if isinstance(msg, AIMessage):
                    print(f"AI: {msg.content[:100]}...")
                elif hasattr(msg, 'name'):
                    print(f"工具 {msg.name}: {msg.content[:100]}...")

你还可以监听更细粒度的事件:

async for event in app.astream_events(
    {"messages": [HumanMessage(content="计算 1+1")]},
    version="v1"
):
    kind = event["event"]
    if kind == "on_chain_start":
        print(f"开始执行: {event['name']}")
    elif kind == "on_chain_end":
        print(f"执行完成: {event['name']}")
    elif kind == "on_tool_start":
        print(f"调用工具: {event['name']}")

五、多 Agent 协作模式

现在我们进入真正有趣的部分:多个 specialized Agent 协作完成复杂任务。

5.1 模式一:顺序执行流水线

最简单的多 Agent 模式是顺序执行,每个 Agent 负责一个特定的步骤:

AgentAgentAgentAgent

让我们定义一个完整的技术文章生成系统:

class ArticleState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    topic: str
    outline: str
    research_notes: str
    code_examples: str
    article_draft: str
    final_article: str

规划 Agent

def planner_agent(state: ArticleState) -> ArticleState:
    """规划 Agent:生成文章大纲"""
    system_prompt = """你是一个技术文章规划专家。根据用户提供的主题,生成详细的文章大纲。
    大纲应该包含:
    1. 文章的核心观点
    2. 主要章节结构
    3. 每个章节需要的内容类型
    4. 建议的代码示例类型
    """
    
    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=f"主题: {state['topic']}")
    ]
    
    response = llm.invoke(messages)
    return {
        "outline": response.content,
        "messages": [response]
    }

研究 Agent

def research_agent(state: ArticleState) -> ArticleState:
    """研究 Agent:收集资料"""
    system_prompt = """你是一个研究专家。根据文章大纲,收集相关的技术资料。
    包括:最新技术动态、官方文档要点、最佳实践建议等。"""
    
    # 这里可以集成搜索工具
    # search_results = search_tool.run(...)
    
    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=f"大纲: {state['outline']}")
    ]
    
    response = llm.invoke(messages)
    return {
        "research_notes": response.content,
        "messages": [response]
    }

编码 Agent

def code_agent(state: ArticleState) -> ArticleState:
    """编码 Agent:生成代码示例"""
    system_prompt = """你是一个资深 Python 开发者。根据研究资料和大纲,生成高质量的代码示例。
    要求:
    1. 代码可以直接运行
    2. 包含详细的注释
    3. 有错误处理
    4. 遵循 PEP 8 规范"""
    
    prompt = f"""
    主题: {state['topic']}
    大纲: {state['outline']}
    研究资料: {state['research_notes']}
    """
    
    response = llm.invoke([SystemMessage(content=system_prompt), 
                           HumanMessage(content=prompt)])
    return {
        "code_examples": response.content,
        "messages": [response]
    }

写作 Agent

def writer_agent(state: ArticleState) -> ArticleState:
    """写作 Agent:整合所有内容生成最终文章"""
    system_prompt = """你是一个技术作家。将所有资料整合成一篇高质量的技术文章。"""
    
    prompt = f"""
    主题: {state['topic']}
    大纲: {state['outline']}
    研究资料: {state['research_notes']}
    代码示例: {state['code_examples']}
    
    请写出一篇完整的技术文章。
    """
    
    response = llm.invoke([SystemMessage(content=system_prompt),
                           HumanMessage(content=prompt)])
    return {
        "final_article": response.content,
        "messages": [response]
    }

最后构建流水线:

workflow = StateGraph(ArticleState)

workflow.add_node("planner", planner_agent)
workflow.add_node("researcher", research_agent)
workflow.add_node("coder", code_agent)
workflow.add_node("writer", writer_agent)

workflow.set_entry_point("planner")

workflow.add_edge("planner", "researcher")
workflow.add_edge("researcher", "coder")
workflow.add_edge("coder", "writer")
workflow.add_edge("writer", END)

app = workflow.compile()

运行这个系统:

result = app.invoke({
    "topic": "使用 LangGraph 构建多智能体系统"
})

print("文章生成完成!")
print("=" * 50)
print(result["final_article"])

5.2 模式二:主管 + 专家团队

更高级的模式是有一个"主管 Agent"协调多个"专家 Agent"。主管负责理解任务、分配工作、整合结果。

class SupervisorState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    current_task: str
    next_agent: str
    final_output: str

# 定义可用的专家
EXPERTS = ["researcher", "coder", "writer", "tester"]

def supervisor_agent(state: SupervisorState) -> SupervisorState:
    """主管 Agent:决定下一步调用哪个专家"""
    system_prompt = f"""你是一个项目经理。你的团队有以下专家成员:
    - researcher: 研究专家,负责收集信息
    - coder: 编码专家,负责编写代码
    - writer: 写作专家,负责文档撰写
    - tester: 测试专家,负责验证质量
    
    根据当前任务状态,决定下一步应该调用哪个专家,或者任务是否完成。
    
    只用 JSON 格式回答:
    {{"next_agent": "专家名称或 'FINISH'", "reason": "决策理由"}}
    """
    
    messages = [SystemMessage(content=system_prompt)] + state["messages"]
    response = llm.invoke(messages)
    
    # 解析 LLM 的决策
    import json
    try:
        decision = json.loads(response.content)
        next_agent = decision["next_agent"]
    except:
        next_agent = "FINISH"
    
    return {
        "next_agent": next_agent,
        "messages": [response]
    }

然后定义路由函数:

def route_to_expert(state: SupervisorState) -> str:
    """根据主管的决定路由到相应的专家"""
    next_agent = state["next_agent"]
    if next_agent == "FINISH":
        return "end"
    return next_agent

构建 Graph:

workflow = StateGraph(SupervisorState)

# 添加节点
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("researcher", research_agent)
workflow.add_node("coder", code_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("tester", tester_agent)

# 设置入口
workflow.set_entry_point("supervisor")

# 主管可以路由到任何专家
workflow.add_conditional_edges(
    "supervisor",
    route_to_expert,
    {
        "researcher": "researcher",
        "coder": "coder",
        "writer": "writer",
        "tester": "tester",
        "end": END
    }
)

# 每个专家完成后都回到主管继续决策
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("coder", "supervisor")
workflow.add_edge("writer", "supervisor")
workflow.add_edge("tester", "supervisor")

app = workflow.compile()

这种模式的优势是:

  • ✅ 动态调整执行顺序
  • ✅ 主管可以根据情况回溯或重试
  • ✅ 可以处理非预期的复杂情况
  • ✅ 扩展性强,增加新专家不需要修改现有结构

5.3 模式三:并行执行

对于可以并行处理的任务,LangGraph 支持分支和汇合:

def parallel_branches(state: ArticleState) -> ArticleState:
    """并行分支:同时启动研究和编码"""
    # 这个节点可以同时触发多个下游节点
    return state

workflow.add_edge("planner", "parallel_branches")
workflow.add_edge("parallel_branches", "researcher")
workflow.add_edge("parallel_branches", "coder")
workflow.add_edge(["researcher", "coder"], "writer")  # 两个都完成后才到 writer

不过要注意:真正的并行执行在 LangGraph 中需要配置并发策略,目前更多是逻辑上的并行执行。

(第二部分完,约 2300 字)