人工在环工作流
人工在环(HITL)的概念
人工在环(Human in the loop)是指在关键决策点让人类审核或批准代理的操作。这对于需要高度准确性和可信度的应用至关重要。
审批工作流
1. 基本审批流程
代理执行
↓
检查是否需要审批
↓
是 → 暂停,等待人类审批
↓
人类决策:批准/拒绝/修改
↓
继续执行或回滚
↓
完成
2. 实现审批
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_tool_call
def require_approval_for_sensitive_tools(tool_call):
"""检查是否需要人工审批"""
sensitive_tools = ["delete_user", "transfer_funds", "send_email"]
if tool_call["name"] in sensitive_tools:
# 生成审批请求
approval_request = {
"tool": tool_call["name"],
"args": tool_call["args"],
"timestamp": datetime.now().isoformat(),
"reason": f"执行敏感操作:{tool_call['name']}"
}
# 发送给审批队列
approval_id = submit_for_approval(approval_request)
# 等待审批
approval = wait_for_approval(approval_id, timeout=300)
if not approval["approved"]:
raise PermissionError(
f"审批被拒绝: {approval.get('reason', '无理由')}"
)
# 如果有修改,应用修改
if approval.get("modifications"):
tool_call["args"].update(approval["modifications"])
return tool_call
# 在代理中使用
agent = create_agent(
model="anthropic:claude-sonnet-4",
tools=[delete_user, transfer_funds, send_email],
middleware=[wrap_tool_call(require_approval_for_sensitive_tools)]
)
中断与恢复
1. 设置中断点
from langchain.types import interrupt
def agent_with_interrupts():
"""在关键点设置中断"""
# 执行第一段逻辑
analysis = perform_analysis(data)
# 中断以获取用户确认
user_decision = interrupt({
"message": "分析完成,是否继续?",
"analysis_result": analysis,
"options": ["继续", "修改参数", "取消"]
})
if user_decision == "继续":
result = continue_execution()
elif user_decision == "修改参数":
new_params = get_user_input()
result = redo_analysis(data, new_params)
else:
return "已取消"
return result
2. 恢复执行
class InterruptibleAgent:
def __init__(self, agent):
self.agent = agent
self.checkpoints = {}
def save_checkpoint(self, checkpoint_id: str, state: dict):
"""保存检查点"""
self.checkpoints[checkpoint_id] = state
def resume_from_checkpoint(self, checkpoint_id: str, user_input: str):
"""从检查点恢复"""
state = self.checkpoints.get(checkpoint_id)
if not state:
raise ValueError(f"检查点 {checkpoint_id} 不存在")
# 使用保存的状态和用户输入继续
state["user_response"] = user_input
return self.agent.invoke(state)
# 使用
agent_with_interrupts = InterruptibleAgent(agent)
# 第一步:执行到中断点
result = agent_with_interrupts.agent.invoke({...})
checkpoint_id = result.get("checkpoint_id")
# 用户审核并做出决定
user_decision = get_user_approval(result)
# 第二步:恢复执行
final_result = agent_with_interrupts.resume_from_checkpoint(
checkpoint_id,
user_decision
)
审核队列
1. 审核任务管理
from dataclasses import dataclass
from datetime import datetime
import uuid
@dataclass
class ReviewTask:
id: str
task_type: str # "approval", "feedback", "verification"
content: dict
priority: str # "high", "normal", "low"
status: str # "pending", "approved", "rejected", "modified"
created_at: datetime
assigned_to: str # 审核者ID
completed_at: datetime = None
decision: dict = None
class ReviewQueue:
def __init__(self):
self.tasks = {}
self.assigned_tasks = {} # 用户 -> 任务列表
def submit_task(self, task_type: str, content: dict, priority: str = "normal"):
"""提交审核任务"""
task = ReviewTask(
id=str(uuid.uuid4()),
task_type=task_type,
content=content,
priority=priority,
status="pending",
created_at=datetime.now(),
assigned_to=""
)
self.tasks[task.id] = task
return task.id
def assign_task(self, task_id: str, reviewer_id: str):
"""分配任务给审核者"""
task = self.tasks[task_id]
task.assigned_to = reviewer_id
if reviewer_id not in self.assigned_tasks:
self.assigned_tasks[reviewer_id] = []
self.assigned_tasks[reviewer_id].append(task_id)
def complete_task(self, task_id: str, decision: dict):
"""完成审核任务"""
task = self.tasks[task_id]
task.status = decision.get("status", "approved")
task.decision = decision
task.completed_at = datetime.now()
def get_pending_tasks(self, reviewer_id: str):
"""获取待审核任务"""
return [
self.tasks[task_id]
for task_id in self.assigned_tasks.get(reviewer_id, [])
if self.tasks[task_id].status == "pending"
]
def get_task_stats(self):
"""获取统计信息"""
statuses = {}
for task in self.tasks.values():
statuses[task.status] = statuses.get(task.status, 0) + 1
return statuses
# 使用
review_queue = ReviewQueue()
# 提交任务
task_id = review_queue.submit_task(
task_type="approval",
content={"action": "delete_user", "user_id": "user123"},
priority="high"
)
# 分配给审核者
review_queue.assign_task(task_id, "reviewer001")
# 审核者查看任务
pending = review_queue.get_pending_tasks("reviewer001")
# 完成审核
review_queue.complete_task(task_id, {
"status": "approved",
"reason": "操作正当"
})
反馈循环
1. 收集用户反馈
class FeedbackCollector:
def __init__(self):
self.feedback_data = []
def collect_feedback(self, agent_output: str, user_id: str):
"""收集用户对代理输出的反馈"""
feedback = {
"output": agent_output,
"user_id": user_id,
"timestamp": datetime.now(),
"rating": None, # 1-5星
"comments": None,
"corrections": None,
"issues": []
}
# 向用户显示反馈界面,实际收集反馈
feedback.update(get_user_feedback_ui(agent_output))
self.feedback_data.append(feedback)
return feedback
def analyze_feedback(self):
"""分析反馈数据"""
total_feedbacks = len(self.feedback_data)
avg_rating = sum(
f["rating"] for f in self.feedback_data if f["rating"]
) / total_feedbacks if total_feedbacks > 0 else 0
common_issues = {}
for feedback in self.feedback_data:
for issue in feedback.get("issues", []):
common_issues[issue] = common_issues.get(issue, 0) + 1
return {
"total_feedbacks": total_feedbacks,
"average_rating": avg_rating,
"common_issues": common_issues,
"improvement_areas": [
issue for issue, count in common_issues.items()
if count > total_feedbacks * 0.2 # 超过20%的反馈
]
}
# 使用
feedback_collector = FeedbackCollector()
# 收集反馈
feedback_collector.collect_feedback(agent_output, user_id)
# 分析趋势
analysis = feedback_collector.analyze_feedback()
print(f"平均评分: {analysis['average_rating']}")
print(f"常见问题: {analysis['common_issues']}")
审计日志
1. 记录所有操作
import logging
from datetime import datetime
class AuditLogger:
def __init__(self, log_file: str):
self.logger = logging.getLogger("audit")
handler = logging.FileHandler(log_file)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_action(self, action: str, actor: str, resource: str, result: str, **extra):
"""记录操作"""
message = (
f"Action: {action} | Actor: {actor} | "
f"Resource: {resource} | Result: {result}"
)
if extra:
message += f" | Extra: {extra}"
self.logger.info(message)
def log_approval(self, approval_request_id: str, approver: str, decision: str, reason: str):
"""记录审批决定"""
self.logger.info(
f"Approval: {approval_request_id} | Approver: {approver} | "
f"Decision: {decision} | Reason: {reason}"
)
def generate_audit_report(self, start_date, end_date):
"""生成审计报告"""
# 从日志文件中提取指定日期范围的记录
pass
# 使用
audit_logger = AuditLogger("audit.log")
# 记录操作
audit_logger.log_action(
action="delete",
actor="user_123",
resource="user_456",
result="success"
)
# 记录审批
audit_logger.log_approval(
approval_request_id="apr_789",
approver="reviewer_001",
decision="approved",
reason="操作符合政策"
)
级联审批
class CascadingApprovalFlow:
"""多级审批流程"""
def __init__(self):
self.approval_levels = [
{"level": 1, "title": "处理者", "min_amount": 0},
{"level": 2, "title": "主管", "min_amount": 1000},
{"level": 3, "title": "经理", "min_amount": 5000},
{"level": 4, "title": "总监", "min_amount": 50000},
]
def get_required_approvals(self, amount: float):
"""根据金额确定需要的审批级别"""
required_levels = []
for level in self.approval_levels:
if amount >= level["min_amount"]:
required_levels.append(level)
return required_levels
def process_approval_flow(self, request: dict):
"""处理级联审批"""
amount = request.get("amount", 0)
required_levels = self.get_required_approvals(amount)
current_status = "pending"
for level in required_levels:
# 提交该级别的审批
approval_result = submit_to_level(request, level)
if not approval_result["approved"]:
current_status = "rejected"
break
current_status = "approved"
return {
"status": current_status,
"levels_required": len(required_levels),
"levels_approved": sum(1 for level in required_levels
if approval_result.get("approved"))
}
# 使用
cascade = CascadingApprovalFlow()
# 请求$3000的转账
request = {"action": "transfer", "amount": 3000}
required_approvals = cascade.get_required_approvals(request["amount"])
print(f"需要的审批级别: {len(required_approvals)}") # 2(主管和经理)
最佳实践
- ✅ 为敏感操作设置明确的审批点
- ✅ 保持详细的审计日志
- ✅ 提供清晰的用户界面进行审批
- ✅ 设置合理的审批超时时间
- ✅ 实现审批级联,以适应不同风险级别
- ✅ 定期分析反馈以改进系统
常见问题
Q: 审批流程太慢怎么办? A: 使用优先级队列,并行处理不相关的任务。
Q: 如何防止审批者的滥用权力? A: 实现审计日志,定期审查审批决定,配置权限约束。