生产部署指南
部署架构
1. 典型的生产架构
FastAPI集成
1. 基本应用
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain.agents import create_agent
# FastAPI application instance shared by every route handler below.
app = FastAPI()

# Create the agent once at import time so all requests reuse the same instance.
# NOTE(review): assumes get_weather / search_web tools are defined elsewhere — confirm.
agent = create_agent(
    model="anthropic:claude-sonnet-4",
    tools=[get_weather, search_web]
)
class Query(BaseModel):
    """Request payload for the query endpoints."""

    # Identifier of the requesting user; echoed back in the response.
    user_id: str
    # Natural-language question forwarded to the agent.
    question: str
@app.post("/query")
async def query_agent(query: Query):
    """Handle a user query by forwarding it to the agent.

    Returns the agent's final message text, or a 500 error if anything
    in the agent call (or result extraction) fails.
    """
    payload = {"messages": [{"role": "user", "content": query.question}]}
    try:
        result = agent.invoke(payload)
        answer = result["messages"][-1]["content"]
    except Exception as e:
        # Surface any failure as an HTTP 500 carrying the error text.
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "success", "answer": answer, "user_id": query.user_id}
if __name__ == "__main__":
    # Local development entry point; use a process manager in production.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
2. 异步处理
from fastapi import BackgroundTasks
import asyncio
@app.post("/query-async")
async def query_async(query: Query, background_tasks: BackgroundTasks):
    """Accept a query and process it in the background.

    Responds immediately with a task id that clients can poll via
    GET /result/{task_id}.
    """
    # Fix: the original snippet used uuid without importing it (NameError).
    import uuid

    task_id = str(uuid.uuid4())
    # Runs after the response has been sent; does not block this request.
    background_tasks.add_task(process_query, task_id, query.question)
    return {"task_id": task_id, "status": "processing"}
@app.get("/result/{task_id}")
async def get_result(task_id: str):
    """Poll for the result of a previously submitted async query."""
    cached = cache.get(f"result:{task_id}")
    if cached is None:
        # Not finished yet (an unknown task id looks the same to the cache).
        return {"status": "processing"}
    return {"status": "completed", "result": cached}
async def process_query(task_id: str, question: str):
    """Background worker: run the agent and cache the result under the task id.

    agent.invoke is a blocking call; the original body called it directly
    inside an async def, which would stall the event loop for the whole
    duration of the agent run. Run it in a worker thread instead.
    """
    result = await asyncio.to_thread(
        agent.invoke,
        {"messages": [{"role": "user", "content": question}]},
    )
    # Store the result so GET /result/{task_id} can pick it up.
    cache.set(f"result:{task_id}", result)
环境配置
1. 使用环境变量
# config.py
import os
from pydantic import BaseSettings
class Settings(BaseSettings):
    """Application configuration loaded from the environment / .env file.

    BaseSettings reads environment variables matching the field names
    (case-insensitive) by itself, so explicit os.getenv() defaults are
    redundant — and risky for the API keys, whose value would silently
    become None when the variable was missing. Required fields now fail
    fast at startup instead.
    """

    # API credentials (required — no safe default exists).
    anthropic_api_key: str
    openai_api_key: str

    # Storage backends.
    database_url: str = "sqlite:///app.db"
    redis_url: str = "redis://localhost:6379"

    # Application settings (pydantic coerces DEBUG/MAX_WORKERS from strings).
    debug: bool = False
    log_level: str = "INFO"
    max_workers: int = 4

    class Config:
        # Also load variables from a local .env file when present.
        env_file = ".env"

settings = Settings()
2. Docker部署
FROM python:3.11-slim
WORKDIR /app
# Install dependencies first so this layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application source.
COPY . .
# Expose the application port.
EXPOSE 8000
# Run the app with uvicorn.
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      # Forward the host's API key into the container.
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      # Point the app at the redis service below (service name == hostname).
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
缓存策略
import hashlib
import json
import time
from functools import wraps
class CacheManager:
    """Minimal in-process TTL cache (use Redis in a real deployment).

    Fixes the original implementation, which accepted a ttl argument on
    set() but ignored it and never expired anything. Entries are stored
    as (value, expires_at) pairs and evicted lazily on lookup.
    """

    def __init__(self, ttl: int = 3600):
        # key -> (value, expires_at unix timestamp)
        self.cache = {}
        # Default time-to-live in seconds, used when set() gets no ttl.
        self.ttl = ttl

    def cache_key(self, *args, **kwargs):
        """Build a deterministic key from the call arguments."""
        key_data = json.dumps({
            "args": args,
            "kwargs": kwargs
        }, sort_keys=True)
        # md5 is fine here: the key only needs to be short and stable,
        # not cryptographically secure.
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, key: str):
        """Return the cached value, or None if absent or expired."""
        entry = self.cache.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.time() >= expires_at:
            # Expired: evict lazily and report a miss.
            del self.cache[key]
            return None
        return value

    def set(self, key: str, value, ttl: int = None):
        """Store a value with an expiry; falls back to the default ttl."""
        effective_ttl = self.ttl if ttl is None else ttl
        self.cache[key] = (value, time.time() + effective_ttl)

cache_manager = CacheManager()
def cached_agent_call(ttl: int = 300):
    """Decorator that memoizes calls through the module-level cache_manager.

    Args:
        ttl: Cache lifetime in seconds for each stored result.

    Fix: the original used `if cached_result:` — a truthiness test — so
    valid falsy results ({}, "", 0) were recomputed on every call.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(query: str, *args, **kwargs):
            key = cache_manager.cache_key(query, *args, **kwargs)
            cached_result = cache_manager.get(key)
            # `is not None` so falsy-but-valid cached values are returned.
            if cached_result is not None:
                return cached_result
            result = func(query, *args, **kwargs)
            cache_manager.set(key, result, ttl)
            return result
        return wrapper
    return decorator
@cached_agent_call(ttl=300)
def query_agent(question: str):
    """Answer a question via the agent; results are cached for 5 minutes."""
    message = {"role": "user", "content": question}
    return agent.invoke({"messages": [message]})
速率限制
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
import time
from collections import defaultdict
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Per-client-IP sliding-window (60 s) rate limiter.

    NOTE(review): state lives in this process only — with multiple
    workers each process enforces its own limit; use a shared store
    (e.g. Redis) for a global limit. The per-client lists also grow by
    one entry per client IP ever seen; fine for demos, not unbounded use.
    """

    def __init__(self, app, requests_per_minute: int = 60):
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        # client ip -> timestamps of requests within the last minute
        self.requests = defaultdict(list)

    async def dispatch(self, request: Request, call_next):
        # Fix: JSONResponse was referenced but never imported in the
        # original snippet (NameError on the first rejected request).
        from starlette.responses import JSONResponse

        client_id = request.client.host
        current_time = time.time()
        # Drop timestamps older than the 60-second window.
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if current_time - req_time < 60
        ]
        # Reject once the client has used up its budget for this window.
        if len(self.requests[client_id]) >= self.requests_per_minute:
            return JSONResponse(
                {"error": "Rate limit exceeded"},
                status_code=429
            )
        self.requests[client_id].append(current_time)
        return await call_next(request)

app.add_middleware(RateLimitMiddleware, requests_per_minute=60)
错误处理与恢复
1. 优雅降级
def get_agent_with_fallback():
    """Return the primary (Claude) agent, or a GPT-3.5 agent if creation fails."""
    primary = {"model": "anthropic:claude-sonnet-4", "tools": [get_weather]}
    fallback = {"model": "openai:gpt-3.5-turbo", "tools": [get_weather]}
    try:
        return create_agent(**primary)
    except Exception as e:
        # Creating the primary agent failed — log and fall back.
        logger.warning(f"Claude不可用: {e}, 使用备择模型")
        return create_agent(**fallback)
def query_with_fallback(question: str):
    """Query via the fallback-capable agent; degrade gracefully on errors."""
    try:
        fallback_agent = get_agent_with_fallback()
        message = {"role": "user", "content": question}
        return fallback_agent.invoke({"messages": [message]})
    except Exception as e:
        # Last line of defense: never propagate, return a friendly error.
        logger.error(f"查询失败: {e}")
        return {
            "status": "error",
            "message": "服务暂时不可用,请稍后重试"
        }