流式处理
流式处理的基本概念
流式处理允许应用在代理执行时实时获取更新,而无需等待整个操作完成。这对于改进用户体验、提供即时反馈以及处理长时间运行的任务至关重要。
为什么需要流式处理
| 场景 | 无流式处理 | 有流式处理 |
|---|---|---|
| 用户体验 | 用户等待数十秒,无反馈 | 实时看到进度 |
| 长任务处理 | 响应超时风险 | 可逐步接收结果 |
| 代理编排 | 主代理阻塞,子代理无法并行 | 主代理可继续,子代理进度可见 |
| 前端集成 | 必须全部加载再显示 | 可实时更新 UI |
流式 API 基础
基本用法
from deepagents import create_deep_agent
agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
tools=[search_tool],
)
# 使用 stream 方法而不是 invoke
for event in agent.stream(
{"messages": [{"role": "user", "content": "研究 AI 的最新进展"}]},
stream_mode="updates",
):
print(f"事件:{event}")
流式模式
DeepAgents 支持多种流式模式:
1. Updates 模式
返回代理状态的更新:
for event in agent.stream(
{"messages": [...]},
stream_mode="updates",
):
# event 包含代理执行过程中的状态变化
print(event)
# 输出示例:
# {'type': 'message', 'content': 'I need to search...'}
# {'type': 'tool_call', 'tool': 'search', 'args': {...}}
# {'type': 'tool_result', 'result': '...'}
2. Values 模式
返回完整的最终状态:
for state in agent.stream(
{"messages": [...]},
stream_mode="values",
):
# 在流的末尾获得完整的最终状态
print(state)
3. Debug 模式
返回详细的调试信息:
for debug_info in agent.stream(
{"messages": [...]},
stream_mode="debug",
):
# 包含完整的执行追踪信息
print(f"[DEBUG] {debug_info}")
令牌流式处理
LLM Token 流
实时获取模型生成的token:
from langchain_anthropic import ChatAnthropic
model = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
stream_usage=True, # 启用 token 使用追踪
)
agent = create_deep_agent(model=model)
# 流式处理输出
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "解释量子纠缠"}]},
stream_mode="updates",
):
if "messages" in chunk:
message = chunk["messages"][-1]
if hasattr(message, "content"):
print(message.content, end="", flush=True)
代币计数
from langchain_core.callbacks import StreamingStdOutCallbackHandler
# 使用回调跟踪 token 使用
callbacks = [StreamingStdOutCallbackHandler()]
agent = create_deep_agent(
model=model,
# stream_config...
)
result = agent.invoke(
{"messages": [...]},
# callbacks=callbacks, # 如果支持
)
工具调用流式处理
追踪工具执行
def stream_with_tool_tracking(agent, user_input):
"""显示工具调用进度"""
for event in agent.stream(
{"messages": [{"role": "user", "content": user_input}]},
stream_mode="updates",
):
# 检查工具调用事件
if event.get("type") == "tool_call":
tool_name = event.get("tool", "unknown")
print(f"🔧 正在调用工具:{tool_name}")
# 检查工具结果事件
elif event.get("type") == "tool_result":
result = event.get("result", "")[:100] # 显示前 100 个字符
print(f"✓ 工具结果:{result}...")
# 显示文本消息
elif event.get("type") == "message":
print(f"💭 {event.get('content', '')}")
# 使用
stream_with_tool_tracking(agent, "为我总结最新的 AI 新闻")
子代理流式处理
这是 DeepAgents 的高级特性,允许实时追踪子代理的执行:
启用子代理流式处理
from deepagents import create_deep_agent, AsyncSubAgent
agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
subagents=[
{
"name": "researcher",
"description": "执行研究",
"system_prompt": "你是研究员",
"tools": [search_tool],
},
{
"name": "writer",
"description": "撰写报告",
"system_prompt": "你是作家",
"tools": [save_file],
}
],
)
# 启用子代理流式处理
for event in agent.stream(
{"messages": [{"role": "user", "content": "研究并撰写关于 AI 的报告"}]},
stream_mode="updates",
subgraphs=True, # 关键参数:启用子代理事件
):
# event 现在包含子代理的事件
namespace = event.get("ns", ())
if namespace:
# 这是子代理事件
subagent_name = namespace[0] if namespace else "unknown"
print(f"[{subagent_name}] {event}")
else:
# 这是主代理事件
print(f"[main] {event}")
事件命名空间
# 事件的命名空间表示其来源
ns = () # 主代理
ns = ("tools:123",) # 由主代理启动的子代理(任务ID:123)
ns = ("tools:123", "model:456") # 子代理中的模型调用
# 使用命名空间路由事件
def route_event(event):
ns = event.get("ns", ())
if not ns:
# 主代理事件
handle_main_agent_event(event)
elif len(ns) == 1:
# 子代理的顶级事件
subagent = ns[0]
handle_subagent_event(subagent, event)
else:
# 子代理内部的详细事件
handle_detailed_event(ns, event)
实时 UI 集成
前端示例 (JavaScript/React)
// 假设后端提供了 WebSocket 流式端点
async function* streamAgentResponse(userMessage) {
const response = await fetch('/api/agent/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: userMessage }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield JSON.parse(decoder.decode(value));
}
}
// 在 React 组件中使用
function AgentChat() {
const [messages, setMessages] = useState([]);
const [isLoading, setIsLoading] = useState(false);
async function handleUserInput(userMessage) {
setIsLoading(true);
let assistantMessage = '';
for await (const event of streamAgentResponse(userMessage)) {
if (event.type === 'token') {
assistantMessage += event.content;
setMessages(prev => [
...prev.slice(0, -1),
{ role: 'assistant', content: assistantMessage }
]);
} else if (event.type === 'tool_call') {
setMessages(prev => [...prev, {
role: 'system',
content: `🔧 调用工具:${event.tool}`
}]);
}
}
setIsLoading(false);
}
return (
<div className="chat">
{messages.map((msg, i) => (
<div key={i} className={`message ${msg.role}`}>
{msg.content}
</div>
))}
</div>
);
}
后端 WebSocket 流式处理
from fastapi import FastAPI, WebSocket
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
@app.websocket("/ws/agent-stream")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收用户消息
user_message = await websocket.receive_text()
# 流式处理代理响应
for event in agent.stream(
{"messages": [{"role": "user", "content": user_message}]},
stream_mode="updates",
subgraphs=True,
):
# 转换为 JSON 并发送到客户端
response = convert_event_to_json(event)
await websocket.send_json(response)
except Exception as e:
await websocket.send_json({
"type": "error",
"message": str(e)
})
finally:
await websocket.close()
def convert_event_to_json(event):
"""将内部事件转换为 JSON"""
if isinstance(event, dict):
return event
# 处理其他类型...
return {"type": "unknown", "data": str(event)}
生产级流式处理最佳实践
1. 回压管理
class StreamingBuffer:
"""管理流式处理的缓冲区,防止内存溢出"""
def __init__(self, max_buffer_size: int = 100):
self.buffer = []
self.max_size = max_buffer_size
def add_event(self, event):
self.buffer.append(event)
# 当缓冲区满时触发写入
if len(self.buffer) >= self.max_size:
self.flush()
def flush(self):
"""将缓冲区事件写入存储"""
for event in self.buffer:
store_event(event)
self.buffer.clear()
2. 错误恢复
def stream_with_retry(agent, messages, max_retries=3):
"""带重试的流式处理"""
for attempt in range(max_retries):
try:
for event in agent.stream(
{"messages": messages},
stream_mode="updates",
):
yield event
# 成功完成
break
except Exception as e:
if attempt < max_retries - 1:
print(f"流式处理失败(尝试 {attempt + 1}/{max_retries}),重试...")
time.sleep(2 ** attempt) # 指数退避
else:
raise
3. 监控和日志
import logging
import time
logger = logging.getLogger(__name__)
def stream_with_monitoring(agent, messages):
"""带监控指标的流式处理"""
start_time = time.time()
event_count = 0
tool_calls = 0
try:
for event in agent.stream(
{"messages": messages},
stream_mode="updates",
):
event_count += 1
if event.get("type") == "tool_call":
tool_calls += 1
yield event
finally:
duration = time.time() - start_time
logger.info(
f"流式处理完成 - "
f"事件数:{event_count}, "
f"工具调用:{tool_calls}, "
f"耗时:{duration:.2f}s"
)
性能优化
1. 选择合适的流式模式
# ❌ 不必要的详细信息
for event in agent.stream(..., stream_mode="debug"):
pass
# ✅ 只获取必要的更新
for event in agent.stream(..., stream_mode="updates"):
pass
2. 事件过滤
def stream_filtered(agent, messages, filter_func):
"""只流式处理满足条件的事件"""
for event in agent.stream(messages, stream_mode="updates"):
if filter_func(event):
yield event
# 使用
def only_tool_events(event):
return event.get("type") in ("tool_call", "tool_result")
for event in stream_filtered(agent, messages, only_tool_events):
print(f"工具事件:{event}")
3. 流式处理聚合
def aggregate_stream_events(agent, messages, window_size=10):
"""批量处理流式事件以减少 I/O"""
buffer = []
for event in agent.stream(messages, stream_mode="updates"):
buffer.append(event)
if len(buffer) >= window_size:
yield buffer # 批量返回
buffer = []
if buffer:
yield buffer # 返回剩余事件