生产部署
从原型到生产
将 DeepAgents 代理从本地开发环境部署到生产环境涉及多个关键考虑事项。本章节涵盖了所有必要的步骤和最佳实践。
部署选项
选项 1:LangSmith Deployments(推荐)
LangSmith Deployments 是部署 DeepAgents 最简单和最推荐的方式。它提供:
- 自动基础设施:无需手动设置服务器
- 内置特性:认证、webhooks、定时任务、监控
- 可观测性:完整的执行追踪和日志
- 可扩展性:自动扩展以处理流量突增
- 多种交互方式:REST API、WebSocket、MCP 服务器
选项 2:自托管(LangGraph 平台)
适用于需要更多控制权或有特殊要求的场景:
# 部署 LangGraph Server
langgraph up --env .env
选项 3:容器化部署(Docker)
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0"]
生产环境配置
langgraph.json
这是部署的核心配置文件:
{
"dependencies": ["."],
"graphs": {
"agent": "./src/agent.py:agent",
"researcher": "./src/researcher.py:researcher_agent",
"analyzer": "./src/analyzer.py:analyzer_agent"
},
"env": ".env.production",
"python_version": "3.11",
"dockerfile_lines": [
"RUN pip install --upgrade pip"
]
}
环境变量管理
# .env.production 示例
ANTHROPIC_API_KEY=sk-ant-...
LANGSMITH_API_KEY=ls-...
DATABASE_URL=postgresql://user:pass@prod-db:5432/agent_db
REDIS_URL=redis://prod-redis:6379
LOG_LEVEL=INFO
ENVIRONMENT=production
安全最佳实践:
- 使用 LangSmith 的"工作区密钥"来管理敏感信息
- 启用密钥轮换
- 使用 TLS/SSL 加密所有通信
多租户配置
# multi_tenant_agent.py
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
def create_tenant_agent(tenant_id: str, store):
    """Build a deep agent whose persistent paths are scoped to one tenant.

    Paths under ``/memories/`` and ``/uploads/`` are routed to store-backed
    backends whose namespace starts with ``tenant_id``; every other path
    falls through to the default ``StateBackend``.

    Args:
        tenant_id: Identifier used as the first element of each namespace.
        store: Store shared by the routed backends and by the agent itself.

    Returns:
        The result of ``create_deep_agent`` for this configuration.
    """
    default_backend = StateBackend()
    memories_backend = StoreBackend(
        store=store,
        namespace=lambda ctx: (tenant_id,),  # isolate data per tenant
    )
    uploads_backend = StoreBackend(
        store=store,
        namespace=lambda ctx: (tenant_id, "uploads"),
    )
    tenant_backend = CompositeBackend(
        default=default_backend,
        routes={
            "/memories/": memories_backend,
            "/uploads/": uploads_backend,
        },
    )
    return create_deep_agent(
        model="anthropic:claude-3-5-sonnet-20241022",
        tools=[search_tool, analyze_tool],
        backend=tenant_backend,
        store=store,
    )
# langgraph.json
{
"graphs": {
"tenant_agent": "./multi_tenant_agent.py:create_tenant_agent"
}
}
连接和身份验证
LangSmith 部署中的身份验证
from langchain_auth import Client
auth_client = Client()
# 在工具中:获取认证用户的令牌
@tool
async def github_action(runtime: ToolRuntime):
    """代表用户在 GitHub 上执行操作"""
    # The docstring is kept verbatim: the @tool decorator presumably uses it
    # as the tool description shown to the model, so it is runtime text.
    requested_scopes = ["repo", "read:org"]
    current_user = runtime.server_info.user.identity
    # Obtain a GitHub token for the authenticated user making this call.
    auth_result = await auth_client.authenticate(
        provider="github",
        scopes=requested_scopes,
        user_id=current_user,
    )
    # Use auth_result.token for the actual GitHub request.
工作区密钥管理
import os
# 在 LangSmith 部署中,敏感值存储为工作区密钥
def get_secret(key_name: str) -> str:
"""从 LangSmith 密钥库获取密钥"""
return os.environ.get(key_name)
api_key = get_secret("EXTERNAL_API_KEY")
状态持久化和检查点
启用持久化
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver
# In-memory checkpointer (development only)
checkpointer = MemorySaver()

# Build the graph
from langgraph.graph import StateGraph

graph = StateGraph(...)

# Production: PostgreSQL checkpointer.
# PostgresSaver's constructor expects an open connection, not a URI, so the
# saver is created from the connection string instead.
with PostgresSaver.from_conn_string(
    "postgresql://user:pass@db:5432/checkpoints"
) as checkpointer:
    # Creates the checkpoint tables; required the first time the saver is used.
    checkpointer.setup()
    # The connection is closed when this block exits, so the compiled graph
    # must be used (or served) inside it.
    compiled_graph = graph.compile(checkpointer=checkpointer)
线程和状态管理
from langchain_core.utils.uuid import uuid7
class SessionManager:
    """Helpers for creating and inspecting per-conversation sessions."""

    @staticmethod
    def create_session() -> dict:
        """Return a new run config carrying a freshly generated thread id."""
        session_settings = {
            "thread_id": str(uuid7()),
            "checkpoint_ns": "default",
        }
        return {"configurable": session_settings}

    @staticmethod
    def get_session_history(thread_id: str, store):
        """Return the full history of a session (not implemented yet)."""
        # Placeholder: the state is meant to be restored from the store here.
        return None
监控和日志
结构化日志
import logging
import json
from pythonjsonlogger import jsonlogger
# Emit logs as JSON so they are easy to parse and monitor
logger = logging.getLogger()
# The root logger defaults to WARNING, which would silently drop the INFO
# event below; lower the threshold explicitly.
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)

# Record key events; the fields in ``extra`` become attributes of the log record
logger.info(
    "agent_invoked",
    extra={
        "user_id": user_id,
        "thread_id": thread_id,
        "model": model_name,
        "tool_count": len(tools),
    },
)
LangSmith 集成
import os
# 在生产环境中启用 LangSmith 追踪
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "production-agents"
# 所有调用都被自动追踪和监控
指标和告警
from prometheus_client import Counter, Histogram, start_http_server
import time
# 定义指标
invocation_counter = Counter(
'agent_invocations_total',
'Total agent invocations',
['model', 'status']
)
execution_time = Histogram(
'agent_execution_seconds',
'Agent execution time',
['model']
)
error_counter = Counter(
'agent_errors_total',
'Total agent errors',
['model', 'error_type']
)
# Expose Prometheus metrics when the FastAPI app starts
@app.on_event("startup")
def startup():
    """Start the Prometheus metrics HTTP endpoint alongside the API."""
    # Must not be 8000: that is uvicorn's default port for the API itself,
    # so binding it here would make one of the two servers fail to start.
    start_http_server(9000)
# 在代理调用中使用
def invoke_with_metrics(agent, messages, model: str):
    """Invoke ``agent`` and record outcome counters and latency for ``model``.

    Args:
        agent: Object exposing ``invoke(payload)``.
        messages: Messages passed to the agent under the ``"messages"`` key.
        model: Label value attached to every metric recorded here.

    Returns:
        Whatever ``agent.invoke`` returns.

    Raises:
        Exception: Any exception from ``agent.invoke`` is counted in
            ``error_counter`` and re-raised unchanged.
    """
    # perf_counter is monotonic; wall-clock time can jump (e.g. NTP
    # adjustments) and yield negative or inflated durations.
    started = time.perf_counter()
    try:
        result = agent.invoke({"messages": messages})
    except Exception as exc:
        error_counter.labels(model=model, error_type=type(exc).__name__).inc()
        raise
    else:
        # Counted outside the try body so that a metrics failure is not
        # misreported as an agent error.
        invocation_counter.labels(model=model, status="success").inc()
        return result
    finally:
        # Latency is observed for successes and failures alike.
        execution_time.labels(model=model).observe(time.perf_counter() - started)
API 暴露
FastAPI 集成
from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
app = FastAPI(title="DeepAgents API", version="1.0.0")
class MessageRequest(BaseModel):
    # Request body for the invoke endpoint.
    # Conversation messages forwarded to the agent as-is.
    messages: list
    # Optional thread id. Declared as ``str | None`` so that an explicit
    # null is accepted, not only an omitted field.
    thread_id: str | None = None
class MessageResponse(BaseModel):
response: str
thread_id: str
@app.post("/api/agent/invoke", response_model=MessageResponse)
async def invoke_agent(request: MessageRequest):
    """调用代理"""
    # The docstring above is left untouched because FastAPI publishes it as
    # the endpoint description.
    import uuid

    # MessageResponse.thread_id is a required str, so a thread id must always
    # exist: reuse the caller's, or mint one for a new conversation.
    thread_id = request.thread_id or str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}
    try:
        # Await the async entry point so the event loop is not blocked while
        # the agent runs.
        result = await agent.ainvoke(
            {"messages": request.messages},
            config=config,
        )
        return MessageResponse(
            response=result["messages"][-1].content,
            thread_id=thread_id,
        )
    except Exception as e:
        # Any failure is reported to the client as HTTP 500 with the error text.
        raise HTTPException(
            status_code=500,
            detail=str(e),
        ) from e
@app.websocket("/ws/agent/stream")
async def stream_agent(websocket: WebSocket):
    """Stream agent updates over a WebSocket, one user message at a time.

    Each text frame received is sent to the agent as a user message, and every
    update event the agent yields is forwarded to the client as JSON.
    """
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_text()
            # Async streaming keeps the event loop free for other connections.
            async for event in agent.astream(
                {"messages": [{"role": "user", "content": message}]},
                stream_mode="updates",
            ):
                await websocket.send_json(serializable_event(event))
    except WebSocketDisconnect:
        # The client is gone: sending an error frame or closing again would
        # itself raise, so just stop.
        return
    except Exception as e:
        # Report the failure to the client, then close the connection.
        await websocket.send_json({"type": "error", "message": str(e)})
        await websocket.close()
扩展性考虑
水平扩展
# docker-compose.yml 示例
version: '3'
services:
agent-1:
image: deepagents:latest
environment:
- AGENT_ID=agent-1
- WORKER_ID=1
ports:
- "8001:8000"
agent-2:
image: deepagents:latest
environment:
- AGENT_ID=agent-2
- WORKER_ID=2
ports:
- "8002:8000"
load-balancer:
image: nginx:latest
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
速率限制
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/agent/invoke")
@limiter.limit("10/minute")  # at most 10 requests per minute
async def invoke_agent(request: Request, payload: MessageRequest):
    # slowapi locates the caller through a parameter literally named
    # ``request`` that must be a Starlette/FastAPI Request, so the JSON body
    # is received as ``payload`` instead. The wire format is unchanged.
    pass
灾难恢复
备份
import shutil
from datetime import datetime
def backup_memories(store, source_dir="/memories/", backup_dir="/backups"):
    """Archive the memories directory into a timestamped zip file.

    Args:
        store: Not used by this implementation; kept so existing callers
            (such as the scheduled job) keep working.
        source_dir: Directory whose contents are archived.
        backup_dir: Directory the archive is written to.

    Returns:
        Path of the created ``.zip`` archive.
    """
    import os

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # make_archive appends the format's extension itself, so the base name
    # must not already end in ".zip" (that produced "*.zip.zip").
    base_name = os.path.join(backup_dir, f"memories_{timestamp}")
    # Use the path make_archive reports so the message names the real file.
    backup_path = shutil.make_archive(base_name, 'zip', source_dir)
    print(f"备份完成:{backup_path}")
    return backup_path
# 定期运行备份
import schedule
schedule.every().day.at("02:00").do(backup_memories, store)
故障转移
class ResilientAgent:
    """Wrap a primary agent with an optional backup used when the primary fails."""

    def __init__(self, primary_agent, backup_agent=None):
        """Store the primary agent and the optional backup agent."""
        self.primary = primary_agent
        self.backup = backup_agent

    def invoke(self, messages, config=None):
        """Invoke the primary agent, falling back to the backup on failure.

        Args:
            messages: Messages wrapped as ``{"messages": messages}`` for the agent.
            config: Passed through unchanged to whichever agent is invoked.

        Returns:
            The primary agent's result, or the backup's if the primary raised.

        Raises:
            Exception: The primary's exception when no backup is configured;
                otherwise whatever the backup raises.
        """
        try:
            return self.primary.invoke({"messages": messages}, config=config)
        except Exception as primary_error:
            logging.error(f"主代理失败:{primary_error}")
            # Without a backup there is nothing to try; surface the original error.
            if not self.backup:
                raise
            logging.info("尝试备用代理...")
            return self.backup.invoke({"messages": messages}, config=config)
安全性
API 密钥轮换
import hashlib
from datetime import datetime, timedelta
class APIKeyManager:
    """Decide when an API key is due for rotation and mint replacement keys."""

    def __init__(self, key_lifetime_days=90):
        """Set how many days a key may live before it needs rotation."""
        self.key_lifetime = timedelta(days=key_lifetime_days)

    def needs_rotation(self, last_rotated: datetime) -> bool:
        """Return True when more than the key lifetime has passed.

        Args:
            last_rotated: When the key was last rotated; may be naive or
                timezone-aware.
        """
        # Take "now" in the same timezone-awareness as the input; subtracting
        # a naive from an aware datetime raises TypeError.
        now = datetime.now(tz=last_rotated.tzinfo)
        return now - last_rotated > self.key_lifetime

    def rotate_key(self, old_key: str) -> str:
        """Generate a new key as 64 lowercase hex characters.

        Args:
            old_key: The key being replaced. It is deliberately not used to
                derive the new key.
        """
        import secrets

        # A hash of the old key plus a timestamp can be reproduced by anyone
        # who knows the old key; draw from the OS CSPRNG instead. 32 bytes
        # gives the same 64-hex-character shape as a SHA-256 digest.
        return secrets.token_hex(32)
速率限制和 DDoS 防护
from slowapi.middleware import SlowAPIMiddleware
app.state.limiter = limiter
app.add_middleware(SlowAPIMiddleware)
# 配置 IP 黑名单
BLOCKED_IPS = {"192.168.1.100"}
@app.middleware("http")
async def check_blocked_ips(request, call_next):
    """Reject requests from blocklisted client IPs with HTTP 403."""
    from fastapi.responses import JSONResponse

    # request.client can be None when the peer address is unavailable.
    client_ip = request.client.host if request.client else None
    if client_ip in BLOCKED_IPS:
        # Middleware must return a Response object; a plain dict is not a
        # valid ASGI response and fails at send time.
        return JSONResponse(status_code=403, content={"detail": "Access denied"})
    return await call_next(request)
最佳实践清单
- 使用 HTTPS/TLS 加密所有通信
- 实施身份验证和授权
- 启用审计日志
- 配置 HITL 工作流用于敏感操作
- 设置监控和告警
- 实施速率限制
- 定期备份和灾难恢复测试
- 使用密钥管理服务(KMS)
- 定期安全审计和渗透测试
- 文档化所有配置和变更
- 实施零停机部署策略
- 监控模型成本和性能
常见问题排查
生产环境中的性能问题
症状:响应缓慢,特别是在高并发时
诊断:
import time
def diagnose_performance():
    """Time a single model call and return its latency in seconds."""
    # Check model API latency; perf_counter is monotonic, unlike wall-clock time.
    started = time.perf_counter()
    model.invoke(...)
    model_time = time.perf_counter() - started
    # Still to be measured: tool execution time, database query time and
    # network latency.
    # Returned so the measurement is usable instead of being discarded.
    return model_time
解决方案:
- 增加工作进程数
- 启用缓存
- 优化工具实现
- 考虑使用更快的模型
成本超支
症状:API 费用高于预期
诊断:
# Analyze token usage
from langsmith import Client

# Runs are queried through Client.list_runs; langsmith has no top-level get_runs.
client = Client()
# The project name matches the LANGSMITH_PROJECT value configured for tracing.
# Only LLM runs are summed so that usage is not counted again on parent runs.
runs = client.list_runs(project_name="production-agents", run_type="llm")
# total_tokens may be None on a run, so treat a missing value as zero.
total_tokens = sum(run.total_tokens or 0 for run in runs)
解决方案:
- 使用更小的模型
- 实施 token 缓存
- 优化提示词
- 监控代理行为
下一步
- 根据需要设置监控和告警
- 启用 LangSmith 审计日志
- 配置自动扩展策略
- 创建灾难恢复流程