流式处理
流式处理基础
流式处理允许在执行过程中逐步获取结果。
基本流式处理
# 同步流式处理
for output in app.stream(initial_state):
print(output)
# 输出: {"node_name": {...state...}}
# 异步流式处理
async for output in app.astream(initial_state):
print(output)
流模式
1. Updates模式(默认)
for output in app.stream(initial_state, stream_mode="updates"):
# 每个节点输出一次
# {"node1": {...}}
# {"node2": {...}}
pass
2. Values模式
for output in app.stream(initial_state, stream_mode="values"):
# 完整的状态
# {...full_state...}
pass
3. Debug模式
for output in app.stream(initial_state, stream_mode="debug"):
# 包含执行信息
# {"type": "on_chain_start", ...}
# {"type": "on_chain_end", ...}
pass
实际用例
显示执行进度
def stream_with_progress(query):
"""流式处理并显示进度"""
steps = []
for output in app.stream({"query": query}):
for node_name, node_output in output.items():
steps.append(node_name)
print(f"执行: {node_name}")
if node_output.get("status") == "error":
print(f"错误: {node_output.get('error')}")
return None
return steps
实时输出到前端
def stream_to_frontend(request):
"""通过WebSocket流式输出"""
query = request.json["query"]
for output in app.stream({"query": query}):
for node_name, state in output.items():
# 发送到前端
send_to_websocket({
"node": node_name,
"state": state
})
性能特性
- 流式处理不会加快执行速度
- 允许更早地获取中间结果
- 改善用户体验(实时反馈)
- 减少内存占用(不需要等待完整结果)