跳到主要内容

流式处理

流式处理基础

流式处理允许在执行过程中逐步获取结果。

基本流式处理

# 同步流式处理
for output in app.stream(initial_state):
print(output)
# 输出: {"node_name": {...state...}}

# 异步流式处理
async for output in app.astream(initial_state):
print(output)

流模式

1. Updates模式(默认)

for output in app.stream(initial_state, stream_mode="updates"):
# 每个节点输出一次
# {"node1": {...}}
# {"node2": {...}}
pass

2. Values模式

for output in app.stream(initial_state, stream_mode="values"):
# 完整的状态
# {...full_state...}
pass

3. Debug模式

for output in app.stream(initial_state, stream_mode="debug"):
# 包含执行信息
# {"type": "on_chain_start", ...}
# {"type": "on_chain_end", ...}
pass

实际用例

显示执行进度

def stream_with_progress(query):
"""流式处理并显示进度"""
steps = []

for output in app.stream({"query": query}):
for node_name, node_output in output.items():
steps.append(node_name)
print(f"执行: {node_name}")

if node_output.get("status") == "error":
print(f"错误: {node_output.get('error')}")
return None

return steps

实时输出到前端

def stream_to_frontend(request):
"""通过WebSocket流式输出"""
query = request.json["query"]

for output in app.stream({"query": query}):
for node_name, state in output.items():
# 发送到前端
send_to_websocket({
"node": node_name,
"state": state
})

性能特性

  • 流式处理不会加快执行速度
  • 允许更早地获取中间结果
  • 改善用户体验(实时反馈)
  • 减少内存占用(不需要等待完整结果)

下一步