跳到主要内容

生产部署

从原型到生产

将 DeepAgents 代理从本地开发环境部署到生产环境涉及多个关键考虑事项。本章节涵盖了所有必要的步骤和最佳实践。

部署选项

选项 1:LangSmith Deployments(推荐)

LangSmith Deployments 是部署 DeepAgents 最简单、也是最推荐的方式。它提供:

  • 自动基础设施:无需手动设置服务器
  • 内置特性:认证、webhooks、定时任务、监控
  • 可观测性:完整的执行追踪和日志
  • 可扩展性:自动扩展以处理流量突增
  • 多交互方式:REST API、WebSocket、MCP 服务器

选项 2:自托管(LangGraph 平台)

适用于需要更多控制权或有特殊要求的场景:

# Start a self-hosted LangGraph server (requires Docker).
# Environment variables are loaded from the file named by the "env" key in
# langgraph.json, so the config file is what gets passed on the command line.
langgraph up --config langgraph.json

选项 3:容器化部署(Docker)

FROM python:3.11-slim

WORKDIR /app

# Install dependencies before copying the source so this layer stays cached
# until requirements.txt changes. --no-cache-dir keeps pip's download cache
# out of the image.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Run the application as an unprivileged user instead of root.
RUN useradd --create-home appuser
USER appuser

# Port is stated explicitly so it matches the container-side port in the
# compose port mappings ("8001:8000", "8002:8000").
EXPOSE 8000

CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

生产环境配置

langgraph.json

这是部署的核心配置文件:

{
"dependencies": ["."],
"graphs": {
"agent": "./src/agent.py:agent",
"researcher": "./src/researcher.py:researcher_agent",
"analyzer": "./src/analyzer.py:analyzer_agent"
},
"env": ".env.production",
"python_version": "3.11",
"dockerfile_lines": [
"RUN pip install --upgrade pip"
]
}

环境变量管理

# .env.production example
# NOTE: every value below is a placeholder. Keep the real file out of version
# control and inject real credentials from a secret store.

# Model provider and LangSmith credentials
ANTHROPIC_API_KEY=sk-ant-...
LANGSMITH_API_KEY=ls-...
# Connection strings for the backing services
DATABASE_URL=postgresql://user:pass@prod-db:5432/agent_db
REDIS_URL=redis://prod-redis:6379
# Runtime behaviour
LOG_LEVEL=INFO
ENVIRONMENT=production

安全最佳实践

  • 使用 LangSmith 的"工作区密钥"来管理敏感信息
  • 启用密钥轮换
  • 使用 TLS/SSL 加密所有通信

多租户配置

# multi_tenant_agent.py

from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend

def create_tenant_agent(tenant_id: str, store):
    """Build a deep agent whose persistent paths are namespaced per tenant.

    Args:
        tenant_id: Identifier used as the first element of every store
            namespace created here.
        store: Store instance shared by both ``StoreBackend`` routes and
            passed through to ``create_deep_agent``.

    Returns:
        Whatever ``create_deep_agent`` returns for this configuration.
    """
    # "/memories/" is stored under the (tenant_id,) namespace.
    memories_backend = StoreBackend(
        store=store,
        namespace=lambda ctx: (tenant_id,),
    )
    # "/uploads/" is stored under (tenant_id, "uploads").
    uploads_backend = StoreBackend(
        store=store,
        namespace=lambda ctx: (tenant_id, "uploads"),
    )
    # Any path not matched by a route falls back to the default StateBackend.
    tenant_backend = CompositeBackend(
        default=StateBackend(),
        routes={
            "/memories/": memories_backend,
            "/uploads/": uploads_backend,
        },
    )

    return create_deep_agent(
        model="anthropic:claude-3-5-sonnet-20241022",
        tools=[search_tool, analyze_tool],
        backend=tenant_backend,
        store=store,
    )

# langgraph.json
{
"graphs": {
"tenant_agent": "./multi_tenant_agent.py:create_tenant_agent"
}
}

连接和身份验证

LangSmith 部署中的身份验证

from langchain_auth import Client

auth_client = Client()

# 在工具中:获取认证用户的令牌
@tool
async def github_action(runtime: ToolRuntime):
    """代表用户在 GitHub 上执行操作"""
    # The docstring above is kept verbatim: the @tool decorator presumably
    # exposes it to the model as the tool description, so it is runtime text.

    # Identity of the user on whose behalf the tool is running.
    current_user = runtime.server_info.user.identity

    # Request a GitHub token for that user with the scopes this tool needs.
    auth_result = await auth_client.authenticate(
        provider="github",
        scopes=["repo", "read:org"],
        user_id=current_user,
    )

    # Use auth_result.token for the actual GitHub calls (not shown here).

工作区密钥管理

import os

# 在 LangSmith 部署中,敏感值存储为工作区密钥
def get_secret(key_name: str) -> str:
"""从 LangSmith 密钥库获取密钥"""
return os.environ.get(key_name)

api_key = get_secret("EXTERNAL_API_KEY")

状态持久化和检查点

启用持久化

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver

from langgraph.graph import StateGraph

# In-memory checkpointer (development only).
checkpointer = MemorySaver()

# Production: PostgreSQL checkpointer.
# PostgresSaver's constructor expects an open connection, not a URI string.
# from_conn_string() opens the connection and is used as a context manager so
# the connection is released on exit.
DB_URI = "postgresql://user:pass@db:5432/checkpoints"

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    # Creates the checkpoint tables; required the first time the database
    # is used.
    checkpointer.setup()

    # Configure the agent. The compiled graph must be used while the
    # connection is still open, i.e. inside this block.
    graph = StateGraph(...)
    graph.compile(checkpointer=checkpointer)

线程和状态管理

from langchain_core.utils.uuid import uuid7

class SessionManager:
    """Helpers for creating and looking up conversation sessions."""

    @staticmethod
    def create_session() -> dict:
        """Return a fresh run config carrying a newly generated thread id."""
        configurable = {
            "thread_id": str(uuid7()),
            "checkpoint_ns": "default",
        }
        return {"configurable": configurable}

    @staticmethod
    def get_session_history(thread_id: str, store):
        """Return the full history of a session.

        Placeholder: restoring state from the store is not implemented yet,
        so this currently returns ``None``.
        """
        # TODO: restore the state for thread_id from the store.
        return None

监控和日志

结构化日志

import logging
import json
from pythonjsonlogger import jsonlogger

# Emit logs as JSON so they are easy to parse and monitor.
logger = logging.getLogger()
# The root logger defaults to WARNING; without this the info-level event
# below would be dropped silently.
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)

# Record key events; the fields in `extra` are attached to the log record.
logger.info(
    "agent_invoked",
    extra={
        "user_id": user_id,
        "thread_id": thread_id,
        "model": model_name,
        "tool_count": len(tools),
    },
)

LangSmith 集成

import os

# Enable LangSmith tracing in production and choose the target project.
os.environ.update(
    {
        "LANGSMITH_TRACING": "true",
        "LANGSMITH_PROJECT": "production-agents",
    }
)

# 所有调用都被自动追踪和监控

指标和告警

from prometheus_client import Counter, Histogram, start_http_server
import time

# Metric definitions.

# Number of agent invocations, labelled by model and outcome.
invocation_counter = Counter(
    'agent_invocations_total',
    'Total agent invocations',
    labelnames=['model', 'status'],
)

# Distribution of agent execution time in seconds, labelled by model.
execution_time = Histogram(
    'agent_execution_seconds',
    'Agent execution time',
    labelnames=['model'],
)

# Number of failed invocations, labelled by model and exception class name.
error_counter = Counter(
    'agent_errors_total',
    'Total agent errors',
    labelnames=['model', 'error_type'],
)

# Start the Prometheus exporter alongside the FastAPI application.
@app.on_event("startup")
def startup():
    """Start the Prometheus metrics HTTP server on its own port.

    The port comes from the ``METRICS_PORT`` environment variable and
    defaults to 9090. It must differ from the API port: the containers in
    this guide serve the application on 8000, so binding the exporter to
    8000 as well would fail with "address already in use".
    """
    import os  # local import keeps this snippet self-contained

    start_http_server(int(os.environ.get("METRICS_PORT", "9090")))

# Use the metrics around an agent invocation.
def invoke_with_metrics(agent, messages, model: str):
    """Invoke ``agent`` and record success, error and latency metrics.

    Args:
        agent: Object exposing ``invoke(payload)``.
        messages: Messages passed to the agent as ``{"messages": messages}``.
        model: Value used for the ``model`` label on every metric.

    Returns:
        Whatever ``agent.invoke`` returns.

    Raises:
        Exception: Any exception from ``agent.invoke`` is counted in
            ``error_counter`` and re-raised unchanged.
    """
    # perf_counter() is monotonic, so the measured duration cannot go
    # negative or jump when the system clock is adjusted.
    start_time = time.perf_counter()

    try:
        result = agent.invoke({"messages": messages})
        invocation_counter.labels(model=model, status="success").inc()
        return result

    except Exception as e:
        error_counter.labels(model=model, error_type=type(e).__name__).inc()
        raise

    finally:
        # Observed for both successful and failed invocations.
        duration = time.perf_counter() - start_time
        execution_time.labels(model=model).observe(duration)

API 暴露

FastAPI 集成

from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
from pydantic import BaseModel

app = FastAPI(title="DeepAgents API", version="1.0.0")

class MessageRequest(BaseModel):
    # Request body for POST /api/agent/invoke.
    # (Comments are used instead of a docstring so the generated schema
    # description is unchanged.)

    # Conversation messages forwarded to the agent as-is.
    messages: list
    # Optional conversation thread. Declared as `str | None` so a client may
    # omit the field or send an explicit null; a plain `str` annotation with a
    # None default rejects the explicit null.
    thread_id: str | None = None

class MessageResponse(BaseModel):
    # Response body for POST /api/agent/invoke.

    # Content of the last message returned by the agent.
    response: str
    # Thread the invocation ran under.
    thread_id: str

@app.post("/api/agent/invoke", response_model=MessageResponse)
async def invoke_agent(request: MessageRequest):
    """Invoke the agent once and return its final message.

    Args:
        request: Messages to send and, optionally, the thread to continue.

    Returns:
        MessageResponse with the content of the agent's last message and the
        thread id the call ran under.

    Raises:
        HTTPException: 500 with the error text when the invocation or the
            response construction fails.
    """
    import uuid  # local import keeps this snippet self-contained

    # Always run under a thread id. MessageResponse.thread_id is a required
    # string, so answering with None (the old behaviour when the client sent
    # no thread id) failed response validation. Generating one also lets the
    # client continue the conversation later.
    thread_id = request.thread_id or str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}

    try:
        # Async variant: the blocking invoke() would stall the event loop for
        # every other request while the agent runs.
        result = await agent.ainvoke(
            {"messages": request.messages},
            config=config,
        )

        return MessageResponse(
            response=result["messages"][-1].content,
            thread_id=thread_id,
        )

    except Exception as e:
        # NOTE(review): str(e) exposes internal error text to clients;
        # consider logging it and returning a generic message instead.
        raise HTTPException(
            status_code=500,
            detail=str(e),
        ) from e

@app.websocket("/ws/agent/stream")
async def stream_agent(websocket: WebSocket):
    """Stream agent updates over a WebSocket.

    Each text frame received is sent to the agent as one user message, and
    every update event the agent produces is sent back as a JSON frame. The
    loop runs until the client disconnects or an error occurs.
    """
    await websocket.accept()

    try:
        while True:
            message = await websocket.receive_text()

            # Async streaming keeps the event loop free while the agent runs.
            async for event in agent.astream(
                {"messages": [{"role": "user", "content": message}]},
                stream_mode="updates",
            ):
                await websocket.send_json(serializable_event(event))

    except WebSocketDisconnect:
        # The client is gone: the socket is already closed, so sending an
        # error frame or calling close() would itself raise.
        return

    except Exception as e:
        await websocket.send_json({"type": "error", "message": str(e)})
        # 1011 = internal error, per the WebSocket close-code registry.
        await websocket.close(code=1011)

扩展性考虑

水平扩展

# docker-compose.yml example
version: '3'

services:
  agent-1:
    image: deepagents:latest
    environment:
      - AGENT_ID=agent-1
      - WORKER_ID=1
    ports:
      - "8001:8000"

  agent-2:
    image: deepagents:latest
    environment:
      - AGENT_ID=agent-2
      - WORKER_ID=2
    ports:
      - "8002:8000"

  load-balancer:
    image: nginx:latest
    ports:
      - "80:80"
    volumes:
      # Mounted read-only: the container never needs to modify its config.
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    # Start the upstream agents before the load balancer.
    depends_on:
      - agent-1
      - agent-2

速率限制

from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post("/api/agent/invoke")
@limiter.limit("10/minute")  # at most 10 requests per minute per client address
async def invoke_agent(request: Request, payload: MessageRequest):
    """Rate-limited variant of the invoke endpoint (body omitted here).

    slowapi requires the decorated endpoint to take the Starlette ``Request``
    as an explicit parameter named ``request``; it uses it to work out the
    rate-limit key. The JSON body is therefore received as ``payload``. The
    HTTP contract is unchanged: the body is still a single MessageRequest.
    """
    pass

灾难恢复

备份

import shutil
from datetime import datetime

def backup_memories(store, source_dir="/memories/", backup_dir="/backups"):
    """Archive the memories directory into a timestamped zip file.

    Args:
        store: Accepted for interface compatibility; not used by this
            function. NOTE(review): if memories live in a Store rather than
            on disk, a filesystem archive will not capture them -- confirm
            where the data actually resides.
        source_dir: Directory whose contents are archived.
        backup_dir: Directory that receives the archive.

    Returns:
        Path of the archive that was written.
    """
    import os  # local import keeps this snippet self-contained

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # make_archive() appends ".zip" itself, so the base name must not carry
    # the extension; otherwise the file is written as "...zip.zip" and the
    # reported path does not exist.
    archive_base = os.path.join(backup_dir, f"memories_{timestamp}")
    backup_path = shutil.make_archive(archive_base, 'zip', source_dir)

    print(f"备份完成:{backup_path}")
    return backup_path

# Run the backup every day at 02:00.
import schedule

schedule.every().day.at("02:00").do(backup_memories, store)

if __name__ == "__main__":
    import time

    # schedule only registers jobs; they run when run_pending() is called,
    # so a long-lived process has to poll. Guarded so that importing this
    # module does not block.
    while True:
        schedule.run_pending()
        time.sleep(60)

故障转移

class ResilientAgent:
    """Wrap a primary agent with an optional backup used when it fails."""

    def __init__(self, primary_agent, backup_agent=None):
        self.primary = primary_agent
        self.backup = backup_agent

    def invoke(self, messages, config=None):
        """Invoke the primary agent, falling back to the backup on error.

        Args:
            messages: Passed to the agent as ``{"messages": messages}``.
            config: Forwarded unchanged as the ``config`` keyword.

        Returns:
            The primary agent's result, or the backup's if the primary raised.

        Raises:
            Exception: The primary's exception when no backup is configured;
                the backup's own exception if it fails too.
        """
        try:
            return self.primary.invoke({"messages": messages}, config=config)
        except Exception as e:
            logging.error(f"主代理失败:{e}")

            # Guard clause: without a backup there is nothing to fall back to.
            if not self.backup:
                raise

            logging.info("尝试备用代理...")
            return self.backup.invoke({"messages": messages}, config=config)

安全性

API 密钥轮换

import hashlib
from datetime import datetime, timedelta

class APIKeyManager:
    """Decide when an API key is due for rotation and mint replacements."""

    def __init__(self, key_lifetime_days=90):
        """Store the maximum key age.

        Args:
            key_lifetime_days: Days a key may be used before it must rotate.
        """
        self.key_lifetime = timedelta(days=key_lifetime_days)

    def needs_rotation(self, last_rotated: datetime) -> bool:
        """Return True when the key is older than the configured lifetime.

        Args:
            last_rotated: When the key was last rotated. May be naive or
                timezone-aware.
        """
        # Take "now" in the same timezone-awareness as the input; subtracting
        # a naive datetime from an aware one raises TypeError.
        now = datetime.now(tz=last_rotated.tzinfo)
        return now - last_rotated > self.key_lifetime

    def rotate_key(self, old_key: str) -> str:
        """Generate a new key as 64 hexadecimal characters.

        Args:
            old_key: The key being replaced. Kept for interface
                compatibility; the new key is deliberately not derived
                from it.

        Returns:
            A freshly generated random key.
        """
        import secrets  # local import keeps this snippet self-contained

        # Hashing the old key with a timestamp is predictable to anyone who
        # knows the old key and roughly when rotation happened. Use the OS
        # CSPRNG instead; 32 random bytes give the same 64-hex-char shape as
        # a SHA-256 digest.
        return secrets.token_hex(32)

速率限制和 DDoS 防护

from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware

app.state.limiter = limiter
# Without this handler a RateLimitExceeded raised by a decorated route
# surfaces as an unhandled error instead of a 429 response.
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)

# IP blocklist
BLOCKED_IPS = {"192.168.1.100"}

@app.middleware("http")
async def check_blocked_ips(request, call_next):
    """Reject requests whose client address is in ``BLOCKED_IPS``.

    Args:
        request: Incoming request.
        call_next: Callable that forwards the request down the stack.

    Returns:
        A 403 JSON response for blocked addresses, otherwise the downstream
        response.
    """
    from fastapi.responses import JSONResponse  # local: snippet-scoped import

    # request.client can be None (e.g. some test clients or transports).
    # NOTE(review): behind a reverse proxy this is the proxy's address, not
    # the end user's -- confirm how the real client IP is forwarded.
    client_ip = request.client.host if request.client else None

    if client_ip in BLOCKED_IPS:
        # HTTP middleware must return a Response object; a plain dict is not
        # a valid ASGI response and makes the request fail.
        return JSONResponse(status_code=403, content={"detail": "Access denied"})

    return await call_next(request)

最佳实践清单

  • 使用 HTTPS/TLS 加密所有通信
  • 实施身份验证和授权
  • 启用审计日志
  • 配置 HITL 工作流用于敏感操作
  • 设置监控和告警
  • 实施速率限制
  • 定期备份和灾难恢复测试
  • 使用密钥管理服务(KMS)
  • 定期安全审计和渗透测试
  • 文档化所有配置和变更
  • 实施零停机部署策略
  • 监控模型成本和性能

常见问题排查

生产环境中的性能问题

症状:响应缓慢,特别是在高并发时

诊断

import time

def diagnose_performance():
    """Time one model call and return the elapsed seconds.

    Returns:
        Seconds taken by the ``model.invoke`` call.
    """
    # Model API latency. perf_counter() is monotonic, so it is the right
    # clock for measuring an interval.
    start = time.perf_counter()
    model.invoke(...)
    model_time = time.perf_counter() - start

    # TODO: also measure tool execution time, database query time and
    # network latency.

    # Returned so the measurement is actually usable; it was previously
    # computed and discarded.
    return model_time

解决方案

  • 增加工作进程数
  • 启用缓存
  • 优化工具实现
  • 考虑使用更快的模型

成本超支

症状:API 费用高于预期

诊断

# Analyze token usage
from langsmith import Client

client = Client()

# The SDK exposes runs through Client.list_runs(); there is no module-level
# get_runs(). Only LLM runs are fetched so that tokens are not counted a
# second time on their parent chain/tool runs. The project name matches the
# LANGSMITH_PROJECT value configured for tracing ("production-agents").
runs = client.list_runs(project_name="production-agents", run_type="llm")

# total_tokens is a field on the run itself and may be None.
total_tokens = sum(r.total_tokens or 0 for r in runs)

解决方案

  • 使用更小的模型
  • 实施 token 缓存
  • 优化提示词
  • 监控代理行为

下一步

  • 根据需要设置监控和告警
  • 启用 LangSmith 审计日志
  • 配置自动扩展策略
  • 创建灾难恢复流程