跳到主要内容

人工在环工作流

人工在环(HITL)的概念

人工在环(Human-In-The-Loop,HITL)工作流允许在代理执行敏感或关键操作前暂停执行,让人类审查和批准这些操作。这在许多生产场景中至关重要,特别是涉及删除、修改关键数据或调用外部 API 的情况。

HITL 工作流

配置敏感操作

基本配置

from deepagents import create_deep_agent

agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
tools=[delete_file, modify_database, send_email],
interrupt_on={
"delete_file": True, # 删除需要批准
"modify_database": True, # 数据库修改需要批准
"send_email": {"allowed_decisions": ["approve", "reject"]}, # 邮件:只能批准/拒绝
}
)

决策类型

三种可能的人工决策:

决策说明何时使用
approve使用原始参数执行低风险、简单操作
edit修改参数后执行需要微调的操作
reject完全跳过操作需要完全避免的操作

按操作的细粒度控制

agent = create_deep_agent(
interrupt_on={
# 删除操作:所有决策都允许
"delete_file": {
"allowed_decisions": ["approve", "edit", "reject"]
},

# 修改操作:允许批准或编辑,但不能拒绝
"modify_database": {
"allowed_decisions": ["approve", "edit"],
},

# 发送操作:只能批准,这是强制的
"send_critical_alert": {
"allowed_decisions": ["approve"],
},

# 低风险操作:无需审批
"read_file": False,
}
)

处理中断

与中断交互

from langchain_core.utils.uuid import uuid7
from langgraph.types import Command

# 创建一个配置来追踪线程
config = {"configurable": {"thread_id": str(uuid7())}}

# 第一次调用 - 代理遇到敏感操作并暂停
result = agent.invoke(
{"messages": [{"role": "user", "content": "删除临时文件 /tmp/cache/*"}]},
config=config,
version="v2", # 启用中断功能
)

# 检查是否有中断
if result.get("interrupts"):
print("⚠️ 代理需要人类审批")

interrupt_value = result["interrupts"][0].value
action_requests = interrupt_value["action_requests"]
review_configs = interrupt_value["review_configs"]

# 显示待处理的操作
for action in action_requests:
print(f"\n操作:{action['name']}")
print(f"参数:{action['args']}")
print(f"描述:{action.get('description', '')}")

# 获取人类决策
human_decisions = get_human_decisions(action_requests)

# 继续执行
final_result = agent.invoke(
Command(resume={"decisions": human_decisions}),
config=config,
version="v2",
)

print(f"\n执行结果:{final_result['messages'][-1].content}")

获取人类决策的实现

def get_human_decisions(action_requests: list) -> list[dict]:
"""从人类获取对每个操作的决策"""

decisions = []

for i, action in enumerate(action_requests):
print(f"\n【操作 {i+1}/{len(action_requests)}】")
print(f"工具:{action['name']}")
print(f"参数:{json.dumps(action['args'], indent=2, ensure_ascii=False)}")

while True:
choice = input("请选择(A=批准/E=编辑/R=拒绝):").upper()

if choice == "A":
decisions.append({"type": "approve"})
break
elif choice == "E":
edited_args = edit_arguments(action['args'])
decisions.append({
"type": "edit",
"args": edited_args
})
break
elif choice == "R":
decisions.append({"type": "reject"})
break
else:
print("无效选择,请重试")

return decisions

def edit_arguments(original_args: dict) -> dict:
"""允许用户编辑操作参数"""

edited_args = original_args.copy()

for key in edited_args:
current_value = edited_args[key]
print(f"\n编辑参数 '{key}'")
print(f"当前值:{current_value}")

new_value = input("新值(留空保持不变):").strip()
if new_value:
edited_args[key] = new_value

return edited_args

实际应用例子

例1:数据删除确认

from deepagents import create_deep_agent

def delete_records(query: str, limit: int) -> str:
"""删除匹配查询的数据库记录"""
# 实际的删除逻辑
return f"已删除 {limit} 条记录"

def list_records(query: str) -> list[dict]:
"""列出将被删除的记录预览"""
# 返回将被删除的记录
return [{"id": 1, "name": "record1"}, ...]

agent = create_deep_agent(
system_prompt="""你是数据管理助手。

当用户要求删除记录时:
1. 首先使用 list_records 显示将被删除的内容
2. 然后调用 delete_records""",
tools=[delete_records, list_records],
interrupt_on={
"delete_records": {
"allowed_decisions": ["approve", "edit", "reject"]
}
}
)

# 使用工作流
config = {"configurable": {"thread_id": "deletion-task-001"}}

result = agent.invoke(
{"messages": [{"role": "user", "content": "删除 2024 年之前的旧记录"}]},
config=config,
version="v2",
)

if result.get("interrupts"):
# 显示预览和等待人类决策...
pass

例2:财务交易审批

def transfer_funds(
from_account: str,
to_account: str,
amount: float
) -> str:
"""执行账户转账"""
return f"已从 {from_account} 转账 {amount}{to_account}"

def verify_accounts(from_account: str, to_account: str) -> dict:
"""验证账户信息"""
return {
"from_account": {"balance": 10000, "name": "Main Account"},
"to_account": {"name": "External Account", "risk": "high"},
}

agent = create_deep_agent(
system_prompt="""你是财务处理助手。

处理转账时:
1. 使用 verify_accounts 验证账户
2. 提供账户摘要
3. 调用 transfer_funds""",
tools=[transfer_funds, verify_accounts],
interrupt_on={
"transfer_funds": {
"allowed_decisions": ["approve", "reject"] # 财务:编辑可能不安全
}
}
)

例3:API 调用审批

def call_external_api(
endpoint: str,
method: str,
data: dict
) -> str:
"""调用外部 API"""
# 实际的 API 调用
return f"API 响应:..."

agent = create_deep_agent(
system_prompt="""你是 API 集成助手。

在调用第三方 API 前:
1. 验证端点和参数
2. 评估潜在风险
3. 记录意图""",
tools=[call_external_api],
interrupt_on={
"call_external_api": {
"allowed_decisions": ["approve", "edit", "reject"]
}
}
)

与子代理结合

子代理可以继承或覆盖主代理的 HITL 设置:

from deepagents import create_deep_agent

# 定义子代理的附加控制
finance_subagent = {
"name": "finance_handler",
"description": "处理财务操作",
"system_prompt": "你是财务处理员",
"tools": [transfer_funds, record_transaction],
"interrupt_on": {
"transfer_funds": {"allowed_decisions": ["approve"]}, # 子代理级别的控制
"record_transaction": False, # 不需要审批
}
}

# 主代理
main_agent = create_deep_agent(
system_prompt="你是主协调员",
subagents=[finance_subagent],
interrupt_on={
"some_tool": True, # 主代理的控制
}
)

日志和审计

审计日志

import json
from datetime import datetime

class AuditLogger:
def __init__(self, log_file: str):
self.log_file = log_file

def log_operation(
self,
operation_name: str,
args: dict,
decision: str,
approved_by: str,
timestamp: str = None,
):
"""记录审批操作"""

timestamp = timestamp or datetime.now().isoformat()

log_entry = {
"timestamp": timestamp,
"operation": operation_name,
"arguments": args,
"decision": decision,
"reviewed_by": approved_by,
}

with open(self.log_file, "a") as f:
f.write(json.dumps(log_entry) + "\n")

def get_audit_trail(self, start_date=None, end_date=None):
"""查询审计日志"""
entries = []

with open(self.log_file, "r") as f:
for line in f:
entry = json.loads(line)
entries.append(entry)

return entries

# 使用
audit_logger = AuditLogger("/var/log/agent-audit.log")

# 在决策后记录
audit_logger.log_operation(
operation_name="delete_file",
args={"path": "/tmp/cache/*"},
decision="approve",
approved_by="admin@example.com",
)

最佳实践

1. 明确的操作描述

# ❌ 不清楚
agent = create_deep_agent(
interrupt_on={"process": True}
)

# ✅ 清晰明确
agent = create_deep_agent(
system_prompt="""
关键操作会请求人工批准:
- 删除超过 1000 条记录的操作
- 转账超过 $10,000
- 修改系统配置
""",
interrupt_on={
"bulk_delete": {"allowed_decisions": ["approve", "edit", "reject"]},
"large_transfer": {"allowed_decisions": ["approve", "reject"]},
"modify_config": {"allowed_decisions": ["approve"]},
}
)

2. 合理的粒度

# ❌ 过度审批(每个操作都需要)
interrupt_on={
key: True for key in all_tools
}

# ✅ 只审批真正敏感的操作
interrupt_on={
"delete_user": True,
"modify_payment": True,
"send_notification": False, # 不需要
"read_document": False, # 安全操作
}

3. 超时和限制

# 设置审批超时(在生产环境中重要)
def agent_invoke_with_timeout(agent, messages, timeout_minutes=5):
"""带超时的代理调用"""

import threading

result = {"value": None, "timed_out": False}

def invoke():
result["value"] = agent.invoke(messages, version="v2")

thread = threading.Thread(target=invoke)
thread.daemon = True
thread.start()
thread.join(timeout=timeout_minutes * 60)

if thread.is_alive():
result["timed_out"] = True
# 处理超时情况

return result

下一步