异步子代理
异步子代理的核心区别
异步子代理与同步子代理的根本区别在于执行模型:
| 维度 | 同步子代理 | 异步子代理 |
|---|---|---|
| 执行模式 | 主代理等待子代理完成 | 主代理立即返回,后台执行 |
| 阻塞 | 主代理被阻塞 | 主代理继续响应用户 |
| 实时交互 | 否 | 是 - 支持中途更新和取消 |
| 调用方式 | 内进程调用(IPC) | 网络调用(HTTP/ASGI) |
| 部署 | 同一进程 | 各自独立部署 |
代理间通信方式
异步子代理基于 Agent Protocol(开放标准),支持多种通信方式:
1. ASGI 传输(共部署模式)
当子代理与主代理在同一部署中时,使用高效的进程内通信:
from deepagents import AsyncSubAgent, create_deep_agent
async_subagents = [
AsyncSubAgent(
name="researcher",
description="后台研究任务",
graph_id="researcher",
# 无 URL → ASGI 传输
),
]
agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
subagents=async_subagents,
)
2. HTTP 传输(远程部署模式)
当子代理部署在不同服务器上时,使用 HTTP 通信:
async_subagents = [
AsyncSubAgent(
name="coder",
description="远程编码代理",
graph_id="coder",
url="https://coder-deployment.langsmith.dev", # 远程 URL
headers={"Authorization": "Bearer token"}, # 认证
),
]
agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
subagents=async_subagents,
)
容量并行执行
使用场景
异步子代理最适合以下场景:
- 多个独立的长时间运行任务
- 用户需要实时反馈和进度更新
- 需要在高层协调多个专门工作流程
- 支持中途修改任务或取消任务
实际例子:并行内容生成管道
from deepagents import AsyncSubAgent, create_deep_agent
async_subagents = [
AsyncSubAgent(
name="blog_writer",
description="撰写博客文章",
graph_id="blog_writer",
),
AsyncSubAgent(
name="social_media_manager",
description="创建社交媒体内容",
graph_id="social_media_manager",
),
AsyncSubAgent(
name="newsletter_creator",
description="编辑邮件通讯",
graph_id="newsletter_creator",
),
]
orchestrator = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
system_prompt="""你是内容 CEO。
当用户给出一个主题时,并行启动三个代理:
1. 让博客作者撰写深度文章
2. 让社交媒体经理创建相关帖子
3. 让通讯编辑创建相关内容
然后监控它们的进度,如需要可提供指导。""",
subagents=async_subagents,
)
# 为了支持长期任务和进度追踪
config = {"configurable": {"thread_id": "content-batch-001"}}
result = orchestrator.invoke(
{"messages": [{"role": "user", "content": "为'AI 日报'话题创建内容"}]},
config=config,
)
任务管理
启动异步任务
from langchain_core.utils.uuid import uuid7
from deepagents import create_deep_agent, AsyncSubAgent
agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
subagents=[
AsyncSubAgent(
name="data_processor",
description="处理大型数据集",
graph_id="processor",
),
],
)
# 启动任务(主代理立即返回)
config = {"configurable": {"thread_id": str(uuid7())}}
result = agent.invoke(
{"messages": [{"role": "user", "content": "处理'customers.csv'文件"}]},
config=config,
)
print(f"任务已启动!任务 ID:{config['configurable']['thread_id']}")
检查任务进度
# 稍后检查进度
progress_result = agent.invoke(
{"messages": [{"role": "user", "content": "进度如何?"}]},
config=config, # 使用相同的线程 ID
)
print(progress_result["messages"][-1].content)
向任务发送更新
from langgraph.types import Command
# 给运行中的任务发送新指令
update_result = agent.invoke(
Command(update=[
{
"name": "data_processor",
"value": "使用 CSV 格式导出结果到 /outputs/results.csv"
}
]),
config=config, # 同一线程
)
取消任务
# 停止运行中的任务
cancel_result = agent.invoke(
Command(cancel=["data_processor"]),
config=config,
)
print("任务已取消")
实际例子:研究和分析工作流
from deepagents import AsyncSubAgent, create_deep_agent
def log_progress(task: str, status: str):
"""记录任务进度"""
print(f"[{task}] {status}")
async_subagents = [
AsyncSubAgent(
name="literature_searcher",
description="在学术数据库中搜索论文",
graph_id="searcher",
),
AsyncSubAgent(
name="data_analyst",
description="分析收集的数据",
graph_id="analyzer",
),
AsyncSubAgent(
name="report_writer",
description="编写研究报告",
graph_id="writer",
),
]
research_coordinator = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
system_prompt="""你是研究项目经理。
给定一个研究课题,启动三个并行工作流:
1. literature_searcher:搜索相关论 文和数据
2. data_analyst:开始分析已有的数据
3. report_writer:准备报告框架
监控进度:
- 每隔一段时间检查各个代理的进度
- 当任何代理遇到问题时,提供建议
- 最后综合所有输出生成最终报告
支持用户的中途修改和指导。""",
subagents=async_subagents,
)
# 启动研究项目
config = {"configurable": {"thread_id": "research-2024-quantum"}}
log_progress("project", "starting")
initial_result = research_coordinator.invoke(
{"messages": [{"role": "user", "content": "请研究量子纠缠的最新进展"}]},
config=config,
)
log_progress("project", "initialized")
# 等待一段时间(模拟用户操作)
import time
time.sleep(5)
# 用户检查进度
log_progress("project", "checking status")
status_result = research_coordinator.invoke(
{"messages": [{"role": "user", "content": "各个研究小组的进展如何?"}]},
config=config,
)
log_progress("project", "status checked")
# 用户提供新指令
from langgraph.types import Command
log_progress("project", "sending update")
update_result = research_coordinator.invoke(
Command(update=[
{
"name": "literature_searcher",
"value": "重点关注 2023-2024 年发表的论文"
},
{
"name": "data_analyst",
"value": "使用统计方差分析方法"
}
]),
config=config,
)
log_progress("project", "update sent")
# 最后收集结 果
final_result = research_coordinator.invoke(
{"messages": [{"role": "user", "content": "生成最终研究报告"}]},
config=config,
)
log_progress("project", "completed")
print(final_result["messages"][-1].content)
异步子代理与 LangSmith Deployments
对于生产环境,异步子代理通常与 LangSmith Deployments 集成:
# 部署架构
# ┌─────────────────────────────────────┐
# │ LangSmith Deployment │
# │ ┌──────────────────────────────┐ │
# │ │ Main Orchestrator Agent │ │
# │ │ (HTTP API 端点) │ │
# │ └──────────────────────────────┘ │
# │ ↓↓↓ │
# │ ┌──────────────────────────────┐ │
# │ │ Async SubAgent Runners │ │
# │ │ ├─ researcher │ │
# │ │ ├─ analyzer │ │
# │ │ └─ writer │ │
# │ └──── ──────────────────────────┘ │
# └─────────────────────────────────────┘
部署配置示例
# langgraph.json
{
"dependencies": ["."],
"graphs": {
"orchestrator": "./orchestrator.py:main_agent",
"researcher": "./researcher.py:researcher_agent",
"analyzer": "./analyzer.py:analyzer_agent",
"writer": "./writer.py:writer_agent"
},
"env": ".env"
}
性能考虑
1. 超时配置
长时间运行的任务需要适当的超时配置:
async_subagent = AsyncSubAgent(
name="long_running_task",
description="处理大型数据集",
graph_id="processor",
url="https://worker.langsmith.dev",
# 支持自定义超时(在部署配置中设置)
)