跳到主要内容

生产部署指南

部署架构

1. 典型的生产架构

FastAPI集成

1. 基本应用

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain.agents import create_agent

app = FastAPI()

# Build the agent once at import time and reuse it for every request.
agent = create_agent(
    model="anthropic:claude-sonnet-4",
    tools=[get_weather, search_web]
)


class Query(BaseModel):
    # Request payload: who is asking, and what they asked.
    user_id: str
    question: str


@app.post("/query")
async def query_agent(query: Query):
    """Handle a user query and return the agent's final answer."""
    payload = {"messages": [{"role": "user", "content": query.question}]}
    try:
        result = agent.invoke(payload)
        answer = result["messages"][-1]["content"]
    except Exception as e:
        # Surface any agent failure as an HTTP 500 with the error text.
        raise HTTPException(status_code=500, detail=str(e))

    return {
        "status": "success",
        "answer": answer,
        "user_id": query.user_id,
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

2. 异步处理

import asyncio
import uuid  # was missing: uuid.uuid4() is used below

from fastapi import BackgroundTasks


@app.post("/query-async")
async def query_async(query: Query, background_tasks: BackgroundTasks):
    """Accept a query, process it in the background, return a task id."""
    task_id = str(uuid.uuid4())

    # Run the (slow) agent call after the response has been sent.
    background_tasks.add_task(process_query, task_id, query.question)

    return {"task_id": task_id, "status": "processing"}


@app.get("/result/{task_id}")
async def get_result(task_id: str):
    """Poll for the result of an asynchronously processed query."""
    # NOTE(review): `cache` is assumed to be a Redis-like client defined
    # elsewhere — confirm it is configured before deploying.
    result = cache.get(f"result:{task_id}")

    if result is None:
        return {"status": "processing"}

    return {"status": "completed", "result": result}


async def process_query(task_id: str, question: str):
    """Background worker: run the agent and store the result in the cache."""
    # agent.invoke is synchronous; run it in a worker thread so the
    # event loop is not blocked for the duration of the LLM call.
    result = await asyncio.to_thread(
        agent.invoke,
        {"messages": [{"role": "user", "content": question}]}
    )

    # Store the result for later retrieval via /result/{task_id}.
    cache.set(f"result:{task_id}", result)

环境配置

1. 使用环境变量

# config.py
import os
from typing import Optional

from pydantic import BaseSettings


class Settings(BaseSettings):
    """Application settings loaded from environment variables and .env.

    BaseSettings reads each field from the environment itself
    (case-insensitive match on the field name), so explicit os.getenv
    calls are redundant — and, being evaluated at class-definition time,
    they would bypass the env_file loading declared below.
    """

    # API keys — no default; remain None unless provided via env/.env
    anthropic_api_key: Optional[str] = None
    openai_api_key: Optional[str] = None

    # Data stores
    database_url: str = "sqlite:///app.db"
    redis_url: str = "redis://localhost:6379"

    # Application settings — pydantic coerces env strings ("true", "4")
    # into the declared types automatically.
    debug: bool = False
    log_level: str = "INFO"
    max_workers: int = 4

    class Config:
        env_file = ".env"


settings = Settings()

2. Docker部署

FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer stays cached until requirements change
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the application port
EXPOSE 8000

# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      # Points at the `redis` service below (compose DNS name)
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

缓存策略

from functools import wraps
import hashlib
import json
import time

# Sentinel distinguishing "no cache entry" from a cached falsy value.
_MISSING = object()


class CacheManager:
    """In-memory cache with per-entry TTL (use Redis in production)."""

    def __init__(self, ttl: int = 3600):
        # key -> (value, expires_at); a real deployment would use Redis
        self.cache = {}
        self.ttl = ttl  # default time-to-live in seconds

    def cache_key(self, *args, **kwargs) -> str:
        """Build a deterministic cache key from the call arguments."""
        key_data = json.dumps({
            "args": args,
            "kwargs": kwargs
        }, sort_keys=True)
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, key: str, default=None):
        """Return the cached value, or `default` when absent or expired."""
        entry = self.cache.get(key, _MISSING)
        if entry is _MISSING:
            return default
        value, expires_at = entry
        if time.time() >= expires_at:
            # Entry expired — drop it so the dict does not grow unboundedly.
            del self.cache[key]
            return default
        return value

    def set(self, key: str, value, ttl: int = None):
        """Store `value` under `key`, expiring after `ttl` (default: self.ttl)."""
        effective_ttl = self.ttl if ttl is None else ttl
        self.cache[key] = (value, time.time() + effective_ttl)


cache_manager = CacheManager()


def cached_agent_call(ttl: int = 300):
    """Decorator memoizing agent calls by their arguments for `ttl` seconds."""
    def decorator(func):
        @wraps(func)
        def wrapper(query: str, *args, **kwargs):
            cache_key = cache_manager.cache_key(query, *args, **kwargs)

            # Sentinel check (not truthiness) so falsy results — empty
            # strings, 0, empty dicts — are also served from cache.
            cached_result = cache_manager.get(cache_key, _MISSING)
            if cached_result is not _MISSING:
                return cached_result

            # Cache miss: execute and store.
            result = func(query, *args, **kwargs)
            cache_manager.set(cache_key, result, ttl)

            return result

        return wrapper

    return decorator


@cached_agent_call(ttl=300)
def query_agent(question: str):
    """Ask the agent a question; identical questions are cached for 5 min."""
    return agent.invoke({
        "messages": [{"role": "user", "content": question}]
    })

速率限制

import time
from collections import defaultdict

from fastapi import Request
# Was missing: JSONResponse is used below for the 429 reply and would
# raise NameError on the first rate-limited request.
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware


class RateLimitMiddleware(BaseHTTPMiddleware):
    """Per-client-IP sliding-window rate limiter (in-memory, single-process)."""

    def __init__(self, app, requests_per_minute: int = 60):
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        # client ip -> timestamps of requests within the last minute
        self.requests = defaultdict(list)

    async def dispatch(self, request: Request, call_next):
        client_id = request.client.host
        current_time = time.time()

        # Drop request timestamps older than the 60-second window.
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if current_time - req_time < 60
        ]

        # Reject once the client has exhausted its quota for this window.
        if len(self.requests[client_id]) >= self.requests_per_minute:
            return JSONResponse(
                {"error": "Rate limit exceeded"},
                status_code=429
            )

        self.requests[client_id].append(current_time)

        return await call_next(request)


app.add_middleware(RateLimitMiddleware, requests_per_minute=60)

错误处理与恢复

1. 优雅降级

def get_agent_with_fallback():
    """Create the primary (Claude) agent; fall back to GPT-3.5 on failure."""
    tools = [get_weather]
    try:
        return create_agent(
            model="anthropic:claude-sonnet-4",
            tools=tools
        )
    except Exception as e:
        logger.warning(f"Claude不可用: {e}, 使用备择模型")
        return create_agent(
            model="openai:gpt-3.5-turbo",
            tools=tools
        )

def query_with_fallback(question: str):
    """Query via the fallback-capable agent, degrading gracefully on error."""
    try:
        return get_agent_with_fallback().invoke(
            {"messages": [{"role": "user", "content": question}]}
        )
    except Exception as e:
        logger.error(f"查询失败: {e}")
        return {
            "status": "error",
            "message": "服务暂时不可用,请稍后重试",
        }

2. 重试机制

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)


# Retry up to 3 attempts with exponential backoff (waits between 2s and 10s).
# NOTE(review): retrying on bare Exception also retries non-transient errors
# (bad requests, programming bugs) — consider narrowing to the provider's
# transient error types.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(Exception)
)
def query_with_retry(agent, question: str):
    """Invoke the agent with the question, retrying transient failures."""
    return agent.invoke({
        "messages": [{"role": "user", "content": question}]
    })

监控和日志

1. 结构化日志

import structlog
from datetime import datetime

# Configure structlog to emit one JSON object per log line,
# suitable for ingestion by log aggregators.
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),  # ISO-8601 timestamps
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()  # render final event dict as JSON
    ],
    context_class=dict,
)

logger = structlog.get_logger()

# Usage: event name first, then arbitrary key-value context fields.
logger.info("query_received", user_id="user123", question="天气?")
logger.error("query_failed", error="API超时", attempt=2)

2. 指标收集

import time  # was missing: time.time() is used below

from fastapi import Response
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    Counter,
    Histogram,
    generate_latest,
)

# Metric definitions — module-level so they are registered once.
query_count = Counter('agent_queries_total', 'Total queries')
query_duration = Histogram('agent_query_duration_seconds', 'Query duration')
query_errors = Counter('agent_query_errors_total', 'Total errors')


@app.post("/query")
async def query_endpoint(query: Query):
    """Handle a query while recording Prometheus metrics."""
    query_count.inc()

    start = time.time()
    try:
        # Original placeholder `{...}` was a set literal containing Ellipsis;
        # send the real message payload as in the other endpoints.
        result = agent.invoke({
            "messages": [{"role": "user", "content": query.question}]
        })
        query_duration.observe(time.time() - start)
        return result
    except Exception:
        query_errors.inc()
        raise


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint (exposition format)."""
    # generate_latest() returns bytes; set the exposition content type
    # explicitly so scrapers parse it correctly.
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

常见问题

Q: 如何处理长时间运行的查询? A: 使用异步处理或消息队列,向用户返回任务ID,提供查询结果的接口。

Q: 生产环境如何管理多个LLM提供商的API密钥? A: 使用密钥管理服务(AWS Secrets Manager、HashiCorp Vault等)。

Q: 如何扩展部署以处理高并发? A: 使用负载均衡、水平扩展多个应用实例、使用消息队列。