图API完全指南
图API的核心概念
Graph API是LangGraph的主要编程模式,基于节点和边构建工作流图。
StateGraph类
from langgraph.graph import StateGraph
from typing import TypedDict
class MyState(TypedDict):
data: str
processed: bool
# 创建图
graph = StateGraph(MyState)
添加节点
def node_function(state: MyState) -> MyState:
"""节点函数的签名"""
# 处理状态
state["processed"] = True
return state
# 添加节点
graph.add_node("my_node", node_function)
# 也可以添加多个节点
graph.add_node("node_a", func_a)
graph.add_node("node_b", func_b)
graph.add_node("node_c", func_c)
添加边
1. 直连边
from langgraph.graph import START, END
# 从START到节点
graph.add_edge(START, "node_a")
# 节点之间
graph.add_edge("node_a", "node_b")
# 到达END
graph.add_edge("node_b", END)
2. 条件边
from typing import Literal
def routing_function(state: MyState) -> Literal["path_a", "path_b"]:
"""根据状态决定路由"""
if state["data"] == "special":
return "path_a"
else:
return "path_b"
graph.add_conditional_edges(
"decision_node",
routing_function,
{
"path_a": "node_a",
"path_b": "node_b"
}
)
高级特性
1. Self-loops(循环)
# 节点可以循环回自己,实现重试逻辑
graph.add_edge("retry_node", "retry_node")
# 通过条件边控制何时停止
graph.add_conditional_edges(
"retry_node",
lambda state: "retry" if not state["success"] else "next",
{
"retry": "retry_node",
"next": "next_node"
}
)
2. 多个起点
# 可以从多个节点开始,但通常从START开始
graph.add_edge(START, "node_a")
graph.add_edge(START, "node_b") # 不建议,通常只有一个起点
3. 合并多条路径
# 多个节点可以连向同一个节点
graph.add_edge("node_a", "merge_node")
graph.add_edge("node_b", "merge_node")
graph.add_edge("node_c", "merge_node")
Reducers(状态减并)
from typing import Annotated
from operator import add
class AgentState(TypedDict):
# 使用reducer自动合并多个值
messages: Annotated[list, add]
iterations: int
def process(state: AgentState):
# 添加消息,会自动通过add操作符合并
return {"messages": [new_message]}
编译和执行
# 编译图
app = graph.compile()
# 同步执行
result = app.invoke({"data": "test", "processed": False})
# 流式执行
for output in app.stream({"data": "test", "processed": False}):
print(output)
# 异步执行
async def async_run():
result = await app.ainvoke({"data": "test", "processed": False})
return result
可视化
# 生成Mermaid图
mermaid_str = app.get_graph().to_mermaid()
print(mermaid_str)
# 生成PNG图像
try:
image = app.get_graph().draw_mermaid_png()
# 保存图像
except Exception as e:
print(f"需要安装: pip install pillow")
实际示例:数据处理流水线
from typing import TypedDict, Annotated, Literal
from operator import add
from langgraph.graph import StateGraph, START, END
class PipelineState(TypedDict):
raw_data: str
validated: bool
processed: bool
results: Annotated[list, add]
errors: Annotated[list, add]
def validate_data(state: PipelineState) -> dict:
"""验证数据"""
try:
# 验证逻辑
return {"validated": True}
except Exception as e:
return {"errors": [str(e)]}
def process_data(state: PipelineState) -> dict:
"""处理数据"""
try:
result = f"Processed: {state['raw_data']}"
return {
"processed": True,
"results": [result]
}
except Exception as e:
return {"errors": [str(e)]}
def check_status(state: PipelineState) -> Literal["success", "failure"]:
"""检查处理状态"""
if state["errors"]:
return "failure"
else:
return "success"
def handle_error(state: PipelineState) -> dict:
"""错误处理"""
return {"results": ["处理失败"] + state["errors"]}
# 构建流水线
graph = StateGraph(PipelineState)
graph.add_node("validate", validate_data)
graph.add_node("process", process_data)
graph.add_node("check", check_status)
graph.add_node("error_handler", handle_error)
graph.add_edge(START, "validate")
graph.add_edge("validate", "process")
graph.add_edge("process", "check")
graph.add_conditional_edges(
"check",
lambda state: "success" if not state["errors"] else "failure",
{
"success": END,
"failure": "error_handler"
}
)
graph.add_edge("error_handler", END)
# 运行
app = graph.compile()
result = app.invoke({
"raw_data": "sample data",
"validated": False,
"processed": False,
"results": [],
"errors": []
})
print(result)
性能优化
1. 并行节点
虽然单个invoke是顺序的,但可以在节点内部使用并行:
import asyncio
def parallel_node(state):
"""节点内部的并行处理"""
async def async_work():
tasks = [async_task_1(), async_task_2(), async_task_3()]
results = await asyncio.gather(*tasks)
return results
results = asyncio.run(async_work())
return {"results": results}
2. 状态优化
# 只返回改变的字段
def optimized_node(state: MyState) -> dict:
# 只返回修改过的值,not覆盖整个状态
return {"processed": True} # 只改这个字段
最佳实践
✅ 每个节点只做一件事 ✅ 清晰地定义状态结构 ✅ 使用类型提示 ✅ 添加适当的错误处理 ✅ 使用条件边而不是if-else逻辑 ✅ 测试每个节点 ✅ 使用描述性的节点名称
常见错误
❌ 修改state后忘记返回 ❌ 节点做太多事情 ❌ 状态结构不清晰 ❌ 缺少错误处理 ❌ 循环没有退出条件