跳到主要内容

流式处理

流式处理的基本概念

流式处理允许应用在代理执行时实时获取更新,而无需等待整个操作完成。这对于改进用户体验、提供即时反馈以及处理长时间运行的任务至关重要。

为什么需要流式处理

场景无流式处理有流式处理
用户体验用户等待数十秒,无反馈实时看到进度
长任务处理响应超时风险可逐步接收结果
代理编排主代理阻塞,子代理无法并行主代理可继续,子代理进度可见
前端集成必须全部加载再显示可实时更新 UI

流式 API 基础

基本用法

from deepagents import create_deep_agent

agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
tools=[search_tool],
)

# 使用 stream 方法而不是 invoke
for event in agent.stream(
{"messages": [{"role": "user", "content": "研究 AI 的最新进展"}]},
stream_mode="updates",
):
print(f"事件:{event}")

流式模式

DeepAgents 支持多种流式模式:

1. Updates 模式

返回代理状态的更新:

for event in agent.stream(
{"messages": [...]},
stream_mode="updates",
):
# event 包含代理执行过程中的状态变化
print(event)
# 输出示例:
# {'type': 'message', 'content': 'I need to search...'}
# {'type': 'tool_call', 'tool': 'search', 'args': {...}}
# {'type': 'tool_result', 'result': '...'}

2. Values 模式

返回完整的最终状态:

for state in agent.stream(
{"messages": [...]},
stream_mode="values",
):
# 在流的末尾获得完整的最终状态
print(state)

3. Debug 模式

返回详细的调试信息:

for debug_info in agent.stream(
{"messages": [...]},
stream_mode="debug",
):
# 包含完整的执行追踪信息
print(f"[DEBUG] {debug_info}")

令牌流式处理

LLM Token 流

实时获取模型生成的token:

from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
stream_usage=True, # 启用 token 使用追踪
)

agent = create_deep_agent(model=model)

# 流式处理输出
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "解释量子纠缠"}]},
stream_mode="updates",
):
if "messages" in chunk:
message = chunk["messages"][-1]
if hasattr(message, "content"):
print(message.content, end="", flush=True)

代币计数

from langchain_core.callbacks import StreamingStdOutCallbackHandler

# 使用回调跟踪 token 使用
callbacks = [StreamingStdOutCallbackHandler()]

agent = create_deep_agent(
model=model,
# stream_config...
)

result = agent.invoke(
{"messages": [...]},
# callbacks=callbacks, # 如果支持
)

工具调用流式处理

追踪工具执行

def stream_with_tool_tracking(agent, user_input):
"""显示工具调用进度"""

for event in agent.stream(
{"messages": [{"role": "user", "content": user_input}]},
stream_mode="updates",
):
# 检查工具调用事件
if event.get("type") == "tool_call":
tool_name = event.get("tool", "unknown")
print(f"🔧 正在调用工具:{tool_name}")

# 检查工具结果事件
elif event.get("type") == "tool_result":
result = event.get("result", "")[:100] # 显示前 100 个字符
print(f"✓ 工具结果:{result}...")

# 显示文本消息
elif event.get("type") == "message":
print(f"💭 {event.get('content', '')}")

# 使用
stream_with_tool_tracking(agent, "为我总结最新的 AI 新闻")

子代理流式处理

这是 DeepAgents 的高级特性,允许实时追踪子代理的执行:

启用子代理流式处理

from deepagents import create_deep_agent, AsyncSubAgent

agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
subagents=[
{
"name": "researcher",
"description": "执行研究",
"system_prompt": "你是研究员",
"tools": [search_tool],
},
{
"name": "writer",
"description": "撰写报告",
"system_prompt": "你是作家",
"tools": [save_file],
}
],
)

# 启用子代理流式处理
for event in agent.stream(
{"messages": [{"role": "user", "content": "研究并撰写关于 AI 的报告"}]},
stream_mode="updates",
subgraphs=True, # 关键参数:启用子代理事件
):
# event 现在包含子代理的事件
namespace = event.get("ns", ())

if namespace:
# 这是子代理事件
subagent_name = namespace[0] if namespace else "unknown"
print(f"[{subagent_name}] {event}")
else:
# 这是主代理事件
print(f"[main] {event}")

事件命名空间

# 事件的命名空间表示其来源
ns = () # 主代理
ns = ("tools:123",) # 由主代理启动的子代理(任务ID:123)
ns = ("tools:123", "model:456") # 子代理中的模型调用

# 使用命名空间路由事件
def route_event(event):
ns = event.get("ns", ())

if not ns:
# 主代理事件
handle_main_agent_event(event)
elif len(ns) == 1:
# 子代理的顶级事件
subagent = ns[0]
handle_subagent_event(subagent, event)
else:
# 子代理内部的详细事件
handle_detailed_event(ns, event)

实时 UI 集成

前端示例 (JavaScript/React)

// 假设后端提供了 WebSocket 流式端点

async function* streamAgentResponse(userMessage) {
const response = await fetch('/api/agent/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: userMessage }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
const { done, value } = await reader.read();
if (done) break;
yield JSON.parse(decoder.decode(value));
}
}

// 在 React 组件中使用
function AgentChat() {
const [messages, setMessages] = useState([]);
const [isLoading, setIsLoading] = useState(false);

async function handleUserInput(userMessage) {
setIsLoading(true);
let assistantMessage = '';

for await (const event of streamAgentResponse(userMessage)) {
if (event.type === 'token') {
assistantMessage += event.content;
setMessages(prev => [
...prev.slice(0, -1),
{ role: 'assistant', content: assistantMessage }
]);
} else if (event.type === 'tool_call') {
setMessages(prev => [...prev, {
role: 'system',
content: `🔧 调用工具:${event.tool}`
}]);
}
}

setIsLoading(false);
}

return (
<div className="chat">
{messages.map((msg, i) => (
<div key={i} className={`message ${msg.role}`}>
{msg.content}
</div>
))}
</div>
);
}

后端 WebSocket 流式处理

from fastapi import FastAPI, WebSocket
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.websocket("/ws/agent-stream")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()

try:
while True:
# 接收用户消息
user_message = await websocket.receive_text()

# 流式处理代理响应
for event in agent.stream(
{"messages": [{"role": "user", "content": user_message}]},
stream_mode="updates",
subgraphs=True,
):
# 转换为 JSON 并发送到客户端
response = convert_event_to_json(event)
await websocket.send_json(response)

except Exception as e:
await websocket.send_json({
"type": "error",
"message": str(e)
})
finally:
await websocket.close()

def convert_event_to_json(event):
"""将内部事件转换为 JSON"""
if isinstance(event, dict):
return event
# 处理其他类型...
return {"type": "unknown", "data": str(event)}

生产级流式处理最佳实践

1. 回压管理

class StreamingBuffer:
"""管理流式处理的缓冲区,防止内存溢出"""

def __init__(self, max_buffer_size: int = 100):
self.buffer = []
self.max_size = max_buffer_size

def add_event(self, event):
self.buffer.append(event)

# 当缓冲区满时触发写入
if len(self.buffer) >= self.max_size:
self.flush()

def flush(self):
"""将缓冲区事件写入存储"""
for event in self.buffer:
store_event(event)
self.buffer.clear()

2. 错误恢复

def stream_with_retry(agent, messages, max_retries=3):
"""带重试的流式处理"""

for attempt in range(max_retries):
try:
for event in agent.stream(
{"messages": messages},
stream_mode="updates",
):
yield event

# 成功完成
break

except Exception as e:
if attempt < max_retries - 1:
print(f"流式处理失败(尝试 {attempt + 1}/{max_retries}),重试...")
time.sleep(2 ** attempt) # 指数退避
else:
raise

3. 监控和日志

import logging
import time

logger = logging.getLogger(__name__)

def stream_with_monitoring(agent, messages):
"""带监控指标的流式处理"""

start_time = time.time()
event_count = 0
tool_calls = 0

try:
for event in agent.stream(
{"messages": messages},
stream_mode="updates",
):
event_count += 1

if event.get("type") == "tool_call":
tool_calls += 1

yield event

finally:
duration = time.time() - start_time
logger.info(
f"流式处理完成 - "
f"事件数:{event_count}, "
f"工具调用:{tool_calls}, "
f"耗时:{duration:.2f}s"
)

性能优化

1. 选择合适的流式模式

# ❌ 不必要的详细信息
for event in agent.stream(..., stream_mode="debug"):
pass

# ✅ 只获取必要的更新
for event in agent.stream(..., stream_mode="updates"):
pass

2. 事件过滤

def stream_filtered(agent, messages, filter_func):
"""只流式处理满足条件的事件"""

for event in agent.stream(messages, stream_mode="updates"):
if filter_func(event):
yield event

# 使用
def only_tool_events(event):
return event.get("type") in ("tool_call", "tool_result")

for event in stream_filtered(agent, messages, only_tool_events):
print(f"工具事件:{event}")

3. 流式处理聚合

def aggregate_stream_events(agent, messages, window_size=10):
"""批量处理流式事件以减少 I/O"""

buffer = []

for event in agent.stream(messages, stream_mode="updates"):
buffer.append(event)

if len(buffer) >= window_size:
yield buffer # 批量返回
buffer = []

if buffer:
yield buffer # 返回剩余事件

下一步