What is LangGraph

LangGraph is a framework in the LangChain ecosystem designed for building stateful, multi-step AI applications. It defines and manages complex AI workflows as graphs, letting developers build agent systems with cycles, conditional branches, and persistent state.

The core idea behind LangGraph is to model an AI application as a state machine: nodes represent operation steps, edges represent control flow, and state is passed between and updated by the nodes.
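The state-machine idea can be sketched in plain Python before looking at LangGraph's API. Everything below (the node names, the `revisions` field, the router) is invented purely for illustration:

```python
# A hand-rolled state machine: nodes are functions that take a state
# dict and return updates to merge in; a router inspects the state to
# pick the next node.
def draft(state):
    return {"text": state["topic"] + ": first draft", "revisions": 0}

def revise(state):
    return {"text": state["text"] + " (revised)",
            "revisions": state["revisions"] + 1}

def route(state):
    # Stop after two revisions, otherwise loop back to `revise`
    return "END" if state.get("revisions", 0) >= 2 else "revise"

def run(state):
    nodes = {"draft": draft, "revise": revise}
    current = "draft"
    while current != "END":
        state = {**state, **nodes[current](state)}
        current = route(state)
    return state

final = run({"topic": "LangGraph"})
print(final["text"])
```

LangGraph offers exactly these pieces as first-class concepts: the dict is the State, the functions are Nodes, the router is a conditional Edge, and `run` is what the compiled graph does for you.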

Core Concepts

State

The state is the data structure that is passed between and updated by every step of the workflow. It is usually defined with a TypedDict:

from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    user_input: str
    analysis_result: str

Node

A node is a function that performs a concrete operation. It receives the current state and returns the updates to apply to it:

def research_node(state: AgentState):
    # Perform the research task (do_research is a placeholder)
    result = do_research(state['user_input'])
    return {
        "analysis_result": result,
        "messages": [("assistant", f"Research complete: {result}")]
    }

Edge

Edges define how nodes connect:

  • Normal edge: directly connects two nodes
  • Conditional edge: chooses the next node based on the current state

from langgraph.graph import StateGraph, END

# Normal edge
workflow.add_edge("node_a", "node_b")

# Conditional edge
def should_continue(state):
    if state['done']:
        return END
    return "continue_node"

workflow.add_conditional_edges(
    "decision_node",
    should_continue
)

Graph

The graph is the complete workflow definition, containing all nodes, edges, and state management:

from langgraph.graph import StateGraph, END

workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("respond", respond_node)

# Define edges
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "respond")
workflow.add_edge("respond", END)

# Set the entry point
workflow.set_entry_point("research")

# Compile the graph
app = workflow.compile()

Core Features

1. State Persistence

LangGraph can persist state to durable storage, which enables:

  • Resuming conversations
  • Long-running tasks
  • Picking up where execution left off

from langgraph.checkpoint.sqlite import SqliteSaver

# Use SQLite as the checkpoint store (requires the
# langgraph-checkpoint-sqlite package; note that in recent releases
# from_conn_string is a context manager)
memory = SqliteSaver.from_conn_string("checkpoints.db")

app = workflow.compile(checkpointer=memory)

# Specify a thread ID at invocation time
config = {"configurable": {"thread_id": "conversation-1"}}
result = app.invoke({"messages": [("user", "Hello")]}, config)

2. Loops and Iteration

Cycles can be created in the graph for iterative task processing:

def should_continue(state):
    if len(state['messages']) > 10:
        return END
    if state['task_done']:
        return END
    return "continue_processing"

workflow.add_conditional_edges(
    "process_node",
    should_continue,
    {
        "continue_processing": "process_node",  # loop back to itself
        END: END
    }
)

3. Human-in-the-Loop

Human-intervention points can be inserted into the workflow:

from langgraph.checkpoint.memory import MemorySaver

workflow.add_node("human_review", human_review_node)
workflow.add_edge("agent_action", "human_review")

# Resume path after the review
workflow.add_edge("human_review", "continue_action")

app = workflow.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["human_review"]  # pause before this node
)

# The first run pauses before human_review
result = app.invoke(input_data, config)

# Continue once the human has reviewed
result = app.invoke(None, config)  # resumes from the pause point

4. Conditional Branching

Choose the execution path dynamically based on state:

def route_question(state):
    question_type = classify_question(state['question'])
    
    if question_type == "math":
        return "math_solver"
    elif question_type == "search":
        return "web_search"
    else:
        return "general_qa"

workflow.add_conditional_edges(
    "classifier",
    route_question,
    {
        "math_solver": "math_node",
        "web_search": "search_node",
        "general_qa": "qa_node"
    }
)

Relationship with LangChain

LangGraph is part of the LangChain ecosystem; the two divide the work as follows:

  • LangChain: provides the building blocks (LLMs, prompts, chains, tools, and so on)
  • LangGraph: provides workflow orchestration and state management

LangGraph integrates seamlessly with LangChain components:

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph

llm = ChatOpenAI(model="gpt-4")

def agent_node(state):
    messages = state['messages']
    response = llm.invoke(messages)
    return {"messages": [response]}

workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)

Practical Use Cases

Customer Service System

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages

class CustomerServiceState(TypedDict):
    messages: Annotated[list, add_messages]
    intent: str
    resolved: bool

def classify_intent(state):
    # Classify the user's intent
    last_message = state['messages'][-1].content
    intent = intent_classifier(last_message)
    return {"intent": intent}

def handle_inquiry(state):
    # Handle the inquiry
    response = handle_customer_inquiry(state['intent'])
    return {
        "messages": [("assistant", response)],
        "resolved": True
    }

def escalate_to_human(state):
    # Escalate to a human agent
    return {
        "messages": [("assistant", "Transferring you to a human agent...")],
        "resolved": True
    }

def should_escalate(state):
    if state['intent'] in ("complex", "complaint"):
        return "escalate"
    return "handle"

# Build the workflow
workflow = StateGraph(CustomerServiceState)
workflow.add_node("classify", classify_intent)
workflow.add_node("handle", handle_inquiry)
workflow.add_node("escalate", escalate_to_human)

workflow.set_entry_point("classify")
workflow.add_conditional_edges(
    "classify",
    should_escalate,
    {
        "handle": "handle",
        "escalate": "escalate"
    }
)
workflow.add_edge("handle", END)
workflow.add_edge("escalate", END)

app = workflow.compile()

Research Assistant

class ResearchState(TypedDict):
    question: str
    research_data: list
    analysis: str
    final_report: str
    iterations: int

def search_node(state):
    # Search for relevant information and count the iteration
    results = web_search(state['question'])
    return {
        "research_data": results,
        "iterations": state.get('iterations', 0) + 1
    }

def analyze_node(state):
    # Analyze the collected data
    analysis = llm_analyze(state['research_data'])
    return {"analysis": analysis}

def should_search_more(state):
    if state['iterations'] >= 3:
        return "report"
    if is_enough_data(state['research_data']):
        return "report"
    return "search_more"

def report_node(state):
    # Generate the final report
    report = generate_report(state['analysis'])
    return {"final_report": report}

workflow = StateGraph(ResearchState)
workflow.add_node("search", search_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)

workflow.set_entry_point("search")
workflow.add_edge("search", "analyze")
workflow.add_conditional_edges(
    "analyze",
    should_search_more,
    {
        "search_more": "search",  # loop back for more searching
        "report": "report"
    }
)
workflow.add_edge("report", END)

app = workflow.compile()

Multi-Tool Agent

from langchain_core.tools import tool

@tool
def calculator(expression: str) -> str:
    """Evaluate a math expression."""
    # Demo only: eval is unsafe on untrusted input
    return str(eval(expression))

@tool
def search(query: str) -> str:
    """Search for information."""
    return web_search_api(query)

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    tool_calls: list

def agent_node(state):
    # Let the agent decide what to do next
    response = llm_with_tools.invoke(state['messages'])
    return {"messages": [response]}

def tool_node(state):
    # Execute the requested tools
    last_message = state['messages'][-1]
    results = execute_tools(last_message.tool_calls)
    return {"messages": results}

def should_continue(state):
    last_message = state['messages'][-1]
    if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
        return "tools"
    return END

workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)

workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "tools": "tools",
        END: END
    }
)
workflow.add_edge("tools", "agent")  # return to the agent after tool execution

app = workflow.compile()

Installation and Quick Start

Installation

pip install langgraph
pip install langchain-openai  # if using OpenAI models
pip install langgraph-checkpoint-sqlite  # if using the SQLite checkpointer

Quick Example

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI

# Define the state
class State(TypedDict):
    messages: Annotated[list, add_messages]

# Create the LLM
llm = ChatOpenAI(model="gpt-4")

# Define a node
def chatbot(state: State):
    return {"messages": [llm.invoke(state["messages"])]}

# Build the graph
workflow = StateGraph(State)
workflow.add_node("chatbot", chatbot)
workflow.set_entry_point("chatbot")
workflow.add_edge("chatbot", END)

# Compile and run
app = workflow.compile()

result = app.invoke({
    "messages": [("user", "Hello, introduce yourself")]
})

print(result["messages"][-1].content)

Advanced Features

Parallel Execution

Multiple nodes can run in parallel:

from langgraph.graph import StateGraph

workflow = StateGraph(State)
workflow.add_node("task_a", task_a_node)
workflow.add_node("task_b", task_b_node)
workflow.add_node("merge", merge_node)

workflow.set_entry_point("task_a")
workflow.set_entry_point("task_b")  # a second entry point; both branches run in parallel

workflow.add_edge("task_a", "merge")
workflow.add_edge("task_b", "merge")
workflow.add_edge("merge", END)
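One caveat the example above glosses over: if task_a and task_b write to the same state key, the graph needs a reducer on that key, declared with Annotated, to merge the concurrent updates. The sketch below shows the merge semantics in plain Python; apply_update is a made-up stand-in for what the framework does per key:

```python
import operator
from typing import Annotated, TypedDict

# In LangGraph, this annotation tells the graph to combine updates to
# `results` with operator.add instead of overwriting the key.
class ParallelState(TypedDict):
    results: Annotated[list, operator.add]

def apply_update(state, update, reducers):
    # Per key: reduce when a reducer is declared, otherwise overwrite.
    merged = dict(state)
    for key, value in update.items():
        if key in reducers:
            merged[key] = reducers[key](merged.get(key, []), value)
        else:
            merged[key] = value
    return merged

reducers = {"results": operator.add}
state = {"results": []}
# Two parallel branches each contribute an update; both survive the merge.
state = apply_update(state, {"results": ["from task_a"]}, reducers)
state = apply_update(state, {"results": ["from task_b"]}, reducers)
print(state["results"])
```

Without the reducer the second write would simply overwrite the first, which is why LangGraph rejects unmerged concurrent writes to the same key.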

Subgraphs

Complex workflows can be broken into reusable modules:

# Define the subgraph
sub_workflow = StateGraph(SubState)
sub_workflow.add_node("step1", step1_node)
sub_workflow.add_node("step2", step2_node)
sub_workflow.set_entry_point("step1")
sub_workflow.add_edge("step1", "step2")
sub_workflow.add_edge("step2", END)

sub_graph = sub_workflow.compile()

# Use the compiled subgraph as a node in the main graph
main_workflow = StateGraph(MainState)
main_workflow.add_node("subprocess", sub_graph)
main_workflow.add_edge("start", "subprocess")  # "start" assumed defined elsewhere

Streaming Output

Streaming is supported, so results can be consumed as they are produced:

inputs = {"messages": [("user", "Tell me a story")]}

# Stream the output of each node as it completes
for output in app.stream(inputs):
    for key, value in output.items():
        print(f"Node '{key}':")
        print(value)
        print("\n---\n")

Time Travel and Replay

The checkpoint mechanism enables time travel:

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "1"}}

# First run
app.invoke(input1, config)

# Second run
app.invoke(input2, config)

# Fetch the state history
state_history = app.get_state_history(config)

# Inspect earlier states
for state in state_history:
    print(f"Checkpoint time: {state.created_at}")
    print(f"Values: {state.values}")

# To replay from an earlier checkpoint, invoke with that snapshot's config:
# past_snapshot = <a StateSnapshot picked from the history>
# app.invoke(None, past_snapshot.config)

Best Practices

1. State Design

Keep the state lean; include only what is necessary:

# A good design
class State(TypedDict):
    user_input: str
    context: list
    result: str

# Avoid over-complication
class BadState(TypedDict):
    user_input: str
    intermediate_step_1: str
    intermediate_step_2: str
    intermediate_step_3: str
    # ... too many intermediate fields

2. Error Handling

Add error handling inside nodes:

def robust_node(state):
    try:
        result = risky_operation(state['input'])
        return {"result": result, "error": None}
    except Exception as e:
        return {"result": None, "error": str(e)}

def error_router(state):
    if state.get('error'):
        return "error_handler"
    return "next_node"

3. Testing Strategy

Test each node in isolation:

def test_research_node():
    test_state = {
        "question": "test question",
        "research_data": []
    }
    result = research_node(test_state)
    assert "research_data" in result
    assert len(result["research_data"]) > 0

4. Performance Optimization

  • Use async nodes for I/O-bound work
  • Use parallel execution where it genuinely helps
  • Cap loop counts to avoid infinite cycles

async def async_node(state):
    result = await async_operation()
    return {"result": result}

def limit_iterations(state):
    if state.get('iterations', 0) >= MAX_ITERATIONS:
        return END
    return "continue"

LangGraph vs Other Frameworks

LangGraph vs CrewAI

  • LangGraph: a lower-level workflow framework; flexible, but requires more code
  • CrewAI: a higher-level multi-agent framework focused on role and task abstractions

LangGraph vs LangChain Expression Language (LCEL)

  • LCEL: chained calls, suited to simple linear pipelines
  • LangGraph: graph structures, suited to complex branching and looping logic
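The structural difference can be shown without either library: a chain is a fixed linear composition in which each step runs exactly once, while a graph adds a router that can send control back to an earlier node. All names below are illustrative:

```python
# Chain style (LCEL-like): a fixed pipeline.
def pipeline(*steps):
    def run(x):
        for step in steps:
            x = step(x)
        return x
    return run

chain = pipeline(str.strip, str.lower, lambda s: s + "!")
linear_result = chain("  Hello ")  # each step runs once, in order

# Graph style: a router decides whether to revisit a node, which a
# linear chain cannot express.
def shorten(state):
    return {"text": state["text"][:-1]}

def route(state):
    return "shorten" if len(state["text"]) > 3 else "END"

state = {"text": "abcdefg"}
while route(state) != "END":
    state = {**state, **shorten(state)}
graph_result = state["text"]
```

In real LCEL the pipeline is written with the pipe operator (e.g. prompt | llm | parser); in LangGraph the role of route here is played by add_conditional_edges.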

How to Choose

  • Simple chained calls → LCEL
  • Loops, branching, and state management → LangGraph
  • Role-based multi-agent systems → CrewAI + LangGraph

Caveats

  1. State management: design the state schema deliberately and keep it from growing unboundedly
  2. Loop control: every cycle must have an explicit termination condition to prevent infinite loops
  3. Memory usage: persisted checkpoints consume storage, so clean them up periodically
  4. Debugging complexity: complex graphs are hard to debug; LangSmith tracing is recommended
  5. Version compatibility: LangGraph evolves quickly, so watch for breaking changes between versions

Visualization

LangGraph can render the workflow as a diagram:

from IPython.display import Image, display

# Render the graph as a PNG
display(Image(app.get_graph().draw_mermaid_png()))

Or generate the Mermaid source directly:

print(app.get_graph().draw_mermaid())

Debugging and Monitoring

Use LangSmith for tracing:

import os

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"

# After running, the full trace can be inspected in LangSmith
result = app.invoke(inputs)

Resources

  • Official site: https://www.langchain.com/langgraph
  • Documentation: https://langchain-ai.github.io/langgraph/
  • GitHub repository: https://github.com/langchain-ai/langgraph
  • LangChain site: https://www.langchain.com/
  • Tutorials and examples: https://github.com/langchain-ai/langgraph/tree/main/examples

Summary

LangGraph is a powerful framework for building complex, stateful AI applications. With its graph structure, state management, and flexible control flow, it lets developers build agent systems with loops, branching, human-in-the-loop review, and other advanced capabilities. It is well suited to production AI applications, especially those that need complex decision logic and long-lived state.