跳到主要内容

可观测性与调试

LangSmith集成

LangSmith是LangChain官方的监控和评估工具。

1. 基本配置

# Set environment variables (one LANGSMITH_ prefix throughout, matching the Python example)
export LANGSMITH_TRACING_V2=true
export LANGSMITH_API_KEY="your-api-key"
export LANGSMITH_PROJECT="project-name"

或在Python中配置:

import os
from langsmith import Client

# Configure LangSmith through environment variables before creating the client.
os.environ.update(
    {
        "LANGSMITH_API_KEY": "your-api-key",
        "LANGSMITH_TRACING_V2": "true",
        "LANGSMITH_PROJECT": "my-project",
    }
)

# Client() gets no explicit arguments; presumably it picks up the settings
# above from the environment -- confirm against the langsmith docs.
client = Client()

2. 追踪代理执行

from langsmith import traceable

@traceable(name="agent_execution")
def run_agent_with_tracing(query: str):
    """Send *query* to the agent as a single user message and return its result.

    The function is wrapped with ``traceable`` under the name
    "agent_execution", so calls made through it are traced.
    """
    user_message = {"role": "user", "content": query}
    return agent.invoke({"messages": [user_message]})


# Recorded to LangSmith automatically
result = run_agent_with_tracing("北京天气如何?")

3. 自定义追踪

from langsmith import trace

def process_with_trace():
    """Run two dependent steps inside a manually opened trace.

    Returns:
        Whatever ``perform_step_2`` returns for the output of ``perform_step_1``.
    """
    with trace(name="complex_operation"):
        # All LangChain calls made inside this block are traced.
        intermediate = perform_step_1()
        return perform_step_2(intermediate)

调试技巧

1. 启用详细日志

import logging

# Dedicated logger for the "langchain" namespace, set to DEBUG.
logger = logging.getLogger("langchain")
logger.setLevel(logging.DEBUG)

# Root logging configuration, also at DEBUG level.
logging.basicConfig(level=logging.DEBUG)

# Now run the agent to see the detailed logs ({...} is a placeholder for the real input)
result = agent.invoke({...})

2. 检查中间步骤

def debug_agent_execution(agent, input_data):
    """Invoke *agent* and print every message of the result, one step per message.

    Args:
        agent: Object exposing ``invoke(input_data)`` that returns a mapping
            with an optional ``"messages"`` list.
        input_data: Payload forwarded unchanged to ``agent.invoke``.

    For each message the type name and a preview of ``msg.content`` (at most
    100 characters) are printed, followed by ``msg.tool_calls`` when the
    message has a non-empty ``tool_calls`` attribute. Nothing is returned.
    """
    result = agent.invoke(input_data)
    messages = result.get("messages", [])

    print("=== 执行步骤 ===")
    for i, msg in enumerate(messages):
        content = msg.content
        # Content is not guaranteed to be a string; stringify it before
        # slicing so the preview is always text.
        text = content if isinstance(content, str) else str(content)
        # Only mark the preview with "..." when something was actually cut.
        preview = (text[:100] + "...") if len(text) > 100 else text

        print(f"\n步骤 {i}:")
        print(f" 类型: {type(msg).__name__}")
        print(f" 内容: {preview}")

        tool_calls = getattr(msg, "tool_calls", None)
        if tool_calls:
            print(f" 工具调用: {tool_calls}")

# Usage ([...] is a placeholder for the real message list)
debug_agent_execution(agent, {"messages": [...]})

3. 性能分析

import time
from functools import wraps

def measure_performance(func):
    """Decorator that prints how long each call to *func* takes.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same name and docstring as *func* (via
        ``functools.wraps``) that forwards all arguments, prints the elapsed
        time in seconds with two decimals, and returns *func*'s result.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is a monotonic, high-resolution clock intended for
        # interval timing; time.time() can jump when the system clock changes.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start

        print(f"{func.__name__} 耗时: {elapsed:.2f}秒")
        return result

    return wrapper

@measure_performance
def slow_operation():
    """Invoke the agent once and return its result ({...} is a placeholder input)."""
    outcome = agent.invoke({...})
    return outcome


slow_operation()  # prints the execution time

监控代理健康

1. 响应时间监控

from collections import defaultdict
import statistics

class PerformanceMonitor:
    """Collect per-agent response times and summarize them."""

    def __init__(self):
        # Maps agent name -> list of recorded durations.
        self.response_times = defaultdict(list)

    def record_response_time(self, agent_name: str, duration: float):
        """Append *duration* to the samples stored for *agent_name*."""
        self.response_times[agent_name].append(duration)

    def get_stats(self, agent_name: str):
        """Return summary statistics for *agent_name*.

        Returns:
            ``None`` when no samples were recorded for the agent; otherwise a
            dict with ``min``, ``max``, ``avg``, ``median``, ``stdev`` (0 when
            there is only one sample) and ``count``.
        """
        # .get() instead of indexing: indexing a defaultdict inserts an empty
        # list, so a read-only query would otherwise add an unknown agent.
        times = self.response_times.get(agent_name)

        if not times:
            return None

        return {
            "min": min(times),
            "max": max(times),
            "avg": statistics.mean(times),
            "median": statistics.median(times),
            # statistics.stdev() needs at least two data points.
            "stdev": statistics.stdev(times) if len(times) > 1 else 0,
            "count": len(times),
        }

# Usage
monitor = PerformanceMonitor()

for _ in range(100):
    # perf_counter() is a monotonic clock meant for measuring intervals;
    # time.time() can jump if the system clock is adjusted mid-call.
    start = time.perf_counter()
    agent.invoke({...})
    duration = time.perf_counter() - start
    monitor.record_response_time("weather_agent", duration)

stats = monitor.get_stats("weather_agent")
print(f"平均响应时间: {stats['avg']:.2f}s")

2. 错误率监控

class ErrorMonitor:
    """Count calls and failures and derive an error rate and health label."""

    def __init__(self):
        self.total_calls = 0
        self.errors = 0
        # Maps error type name -> number of failures recorded with that type.
        self.error_types = defaultdict(int)

    def record_call(self, success: bool, error_type: str = None):
        """Record one call; failures bump the error count and, if given, the per-type count."""
        self.total_calls += 1
        if success:
            return

        self.errors += 1
        if error_type:
            self.error_types[error_type] += 1

    def get_error_rate(self):
        """Return the share of failed calls as a percentage (0 when nothing was recorded)."""
        if not self.total_calls:
            return 0
        return (self.errors / self.total_calls) * 100

    def health_status(self):
        """Map the error rate to "healthy" (< 1), "degraded" (< 5) or "unhealthy"."""
        rate = self.get_error_rate()
        # Thresholds are checked in ascending order; the first one the rate
        # stays under decides the label.
        for limit, label in ((1, "healthy"), (5, "degraded")):
            if rate < limit:
                return label
        return "unhealthy"

# Usage
error_monitor = ErrorMonitor()

try:
    # Keep only the agent call inside the try so that a problem in the
    # bookkeeping below is never recorded as an agent failure.
    result = agent.invoke({...})
except Exception as e:
    # Monitoring boundary: every failure is recorded under its exception type name.
    error_monitor.record_call(success=False, error_type=type(e).__name__)
else:
    error_monitor.record_call(success=True)

print(f"错误率: {error_monitor.get_error_rate():.2f}%")
print(f"状态: {error_monitor.health_status()}")

评估代理性能

1. 自动评估

def evaluate_agent(test_cases: list):
    """Run the agent on each test case and report keyword-based accuracy.

    Args:
        test_cases: List of dicts with an ``"input"`` prompt and an
            ``"expected_keywords"`` list; an optional ``"expected"`` entry is
            copied into the per-case details when present.

    Returns:
        A dict with ``accuracy`` (fraction of passed cases, 0.0 for an empty
        list), ``total``, ``passed`` and ``details`` (one entry per case).
        A case passes when the final answer contains at least one expected
        keyword; a case that raises is recorded with its error text and
        counted as failed.
    """
    results = []

    for test_case in test_cases:
        try:
            output = agent.invoke({
                "messages": [{"role": "user", "content": test_case["input"]}]
            })

            # The final answer is the last message; accept both dict-style
            # messages and message objects exposing ``.content``.
            last_message = output["messages"][-1]
            if isinstance(last_message, dict):
                final_answer = last_message["content"]
            else:
                final_answer = last_message.content

            keywords = test_case["expected_keywords"]
            is_correct = any(keyword in final_answer for keyword in keywords)

            results.append({
                "input": test_case["input"],
                "output": final_answer,
                # "expected" is optional; fall back to the keywords so a case
                # without it no longer fails with KeyError.
                "expected": test_case.get("expected", keywords),
                "passed": is_correct,
            })

        except Exception as e:
            # Evaluation boundary: one failing case must not abort the run.
            results.append({
                "input": test_case["input"],
                "error": str(e),
                "passed": False,
            })

    passed = sum(1 for r in results if r["passed"])
    # Guard against an empty test suite (would otherwise divide by zero).
    accuracy = passed / len(results) if results else 0.0

    return {
        "accuracy": accuracy,
        "total": len(results),
        "passed": passed,
        "details": results,
    }

# Usage: each case supplies the prompt and the keywords looked for in the final answer
test_cases = [
{"input": "北京天气如何?", "expected_keywords": ["天气", "北京"]},
{"input": "今天几号?", "expected_keywords": ["号", "日期"]},
]

# The accuracy is printed as a percentage with two decimals
eval_results = evaluate_agent(test_cases)
print(f"准确率: {eval_results['accuracy']:.2%}")

2. LLM评估

from langchain.evaluation import load_evaluator

# Use an LLM to do the evaluation (evaluator loaded by the name "qa")
evaluator = load_evaluator("qa")

# Grade one prediction against a reference for the given input question
eval_results = evaluator.evaluate_strings(
prediction="北京今天是晴天,气温25°C",
reference="北京天气预报",
input="北京的天气如何?"
)

print(f"评分: {eval_results['score']}")

告警和通知

from enum import Enum
from abc import ABC, abstractmethod

class AlertLevel(Enum):
    """Alert levels attached to alert rules; members carry the integer values 1-3."""

    INFO = 1
    WARNING = 2
    CRITICAL = 3

class NotificationChannel(ABC):
    """Abstract interface for anything that can deliver an alert message."""

    @abstractmethod
    def send(self, message: str, level: AlertLevel):
        """Deliver *message* with the given *level*; concrete channels must override this."""
        return None

class EmailNotification(NotificationChannel):
    """Channel addressed to a list of e-mail recipients."""

    def __init__(self, recipients: list):
        self.recipients = recipients

    def send(self, message: str, level: AlertLevel):
        """Print the recipients and *message*; *level* is not used here."""
        # Placeholder for sending the e-mail: the visible code only prints.
        line = f"发送邮件给 {self.recipients}: {message}"
        print(line)

class SlackNotification(NotificationChannel):
    """Channel configured with a Slack webhook URL."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send(self, message: str, level: AlertLevel):
        """Print *message*; neither the webhook URL nor *level* is used here."""
        # Placeholder for sending the Slack message: the visible code only prints.
        line = f"发送Slack消息: {message}"
        print(line)

class AlertManager:
    """Evaluate alert rules against metrics and send matching alerts to every channel."""

    def __init__(self):
        self.channels = []
        self.alert_rules = []

    def add_channel(self, channel: "NotificationChannel"):
        """Register *channel*; it receives every alert that fires."""
        self.channels.append(channel)

    def add_rule(self, condition_func, level: "AlertLevel", message: str):
        """Add an alert rule.

        Args:
            condition_func: Callable invoked with metrics as keyword
                arguments; a truthy result fires the alert.
            level: Level passed to each channel when the rule fires.
            message: Text passed to each channel when the rule fires.
        """
        self.alert_rules.append({
            "condition": condition_func,
            "level": level,
            "message": message,
        })

    @staticmethod
    def _metrics_for(condition, metrics):
        """Return the subset of *metrics* that *condition* accepts as keyword arguments."""
        import inspect

        try:
            params = list(inspect.signature(condition).parameters.values())
        except (TypeError, ValueError):
            # Signature not introspectable: pass everything through.
            return metrics

        if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
            return metrics

        keyword_kinds = (
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.KEYWORD_ONLY,
        )
        accepted = {p.name for p in params if p.kind in keyword_kinds}
        return {name: value for name, value in metrics.items() if name in accepted}

    def check_alerts(self, **metrics):
        """Evaluate every rule against *metrics* and notify all channels for each rule that fires.

        Each rule only receives the metrics its condition declares, so rules
        over different metrics can share one call. A rule whose required
        metric is missing still raises ``TypeError``.
        """
        for rule in self.alert_rules:
            condition = rule["condition"]
            if condition(**self._metrics_for(condition, metrics)):
                for channel in self.channels:
                    channel.send(rule["message"], rule["level"])

# Usage
alert_manager = AlertManager()
alert_manager.add_channel(SlackNotification("https://hooks.slack.com/..."))

# Add a rule: fires when the error_rate metric is greater than 5
alert_manager.add_rule(
condition_func=lambda error_rate: error_rate > 5,
level=AlertLevel.CRITICAL,
message="错误率超过5%,请立即检查!"
)

# Check: 6.5 is above the threshold, so the rule fires
alert_manager.check_alerts(error_rate=6.5)

常见问题

Q: LangSmith收费吗? A: 提供免费层和付费层,费用按追踪(trace)调用的数量计算。

Q: 如何在本地调试而不发送数据到LangSmith? A: 将环境变量 LANGSMITH_TRACING_V2 设置为 false(或不设置该变量)即可关闭追踪;也可以不设置API密钥。

Q: 如何追踪自定义代码? A: 使用 @traceable 装饰器或 trace() 上下文管理器。