跳到主要内容

异步子代理

异步子代理的核心区别

异步子代理与同步子代理的根本区别在于执行模型:

维度同步子代理异步子代理
执行模式主代理等待子代理完成主代理立即返回,后台执行
阻塞主代理被阻塞主代理继续响应用户
实时交互是 - 支持中途更新和取消
调用方式内进程调用(IPC)网络调用(HTTP/ASGI)
部署同一进程各自独立部署

代理间通信方式

异步子代理基于 Agent Protocol(开放标准),支持多种通信方式:

1. ASGI 传输(共部署模式)

当子代理与主代理在同一部署中时,使用高效的进程内通信:

from deepagents import AsyncSubAgent, create_deep_agent

async_subagents = [
AsyncSubAgent(
name="researcher",
description="后台研究任务",
graph_id="researcher",
# 无 URL → ASGI 传输
),
]

agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
subagents=async_subagents,
)

2. HTTP 传输(远程部署模式)

当子代理部署在不同服务器上时,使用 HTTP 通信:

async_subagents = [
AsyncSubAgent(
name="coder",
description="远程编码代理",
graph_id="coder",
url="https://coder-deployment.langsmith.dev", # 远程 URL
headers={"Authorization": "Bearer token"}, # 认证
),
]

agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
subagents=async_subagents,
)

容量并行执行

使用场景

异步子代理最适合以下场景:

  • 多个独立的长时间运行任务
  • 用户需要实时反馈和进度更新
  • 需要在高层协调多个专门工作流程
  • 支持中途修改任务或取消任务

实际例子:并行内容生成管道

from deepagents import AsyncSubAgent, create_deep_agent

async_subagents = [
AsyncSubAgent(
name="blog_writer",
description="撰写博客文章",
graph_id="blog_writer",
),
AsyncSubAgent(
name="social_media_manager",
description="创建社交媒体内容",
graph_id="social_media_manager",
),
AsyncSubAgent(
name="newsletter_creator",
description="编辑邮件通讯",
graph_id="newsletter_creator",
),
]

orchestrator = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
system_prompt="""你是内容 CEO。

当用户给出一个主题时,并行启动三个代理:
1. 让博客作者撰写深度文章
2. 让社交媒体经理创建相关帖子
3. 让通讯编辑创建相关内容

然后监控它们的进度,如需要可提供指导。""",
subagents=async_subagents,
)

# 为了支持长期任务和进度追踪
config = {"configurable": {"thread_id": "content-batch-001"}}

result = orchestrator.invoke(
{"messages": [{"role": "user", "content": "为'AI 日报'话题创建内容"}]},
config=config,
)

任务管理

启动异步任务

from langchain_core.utils.uuid import uuid7
from deepagents import create_deep_agent, AsyncSubAgent

agent = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
subagents=[
AsyncSubAgent(
name="data_processor",
description="处理大型数据集",
graph_id="processor",
),
],
)

# 启动任务(主代理立即返回)
config = {"configurable": {"thread_id": str(uuid7())}}

result = agent.invoke(
{"messages": [{"role": "user", "content": "处理'customers.csv'文件"}]},
config=config,
)

print(f"任务已启动!任务 ID:{config['configurable']['thread_id']}")

检查任务进度

# 稍后检查进度
progress_result = agent.invoke(
{"messages": [{"role": "user", "content": "进度如何?"}]},
config=config, # 使用相同的线程 ID
)

print(progress_result["messages"][-1].content)

向任务发送更新

from langgraph.types import Command

# 给运行中的任务发送新指令
update_result = agent.invoke(
Command(update=[
{
"name": "data_processor",
"value": "使用 CSV 格式导出结果到 /outputs/results.csv"
}
]),
config=config, # 同一线程
)

取消任务

# 停止运行中的任务
cancel_result = agent.invoke(
Command(cancel=["data_processor"]),
config=config,
)

print("任务已取消")

实际例子:研究和分析工作流

from deepagents import AsyncSubAgent, create_deep_agent

def log_progress(task: str, status: str):
"""记录任务进度"""
print(f"[{task}] {status}")

async_subagents = [
AsyncSubAgent(
name="literature_searcher",
description="在学术数据库中搜索论文",
graph_id="searcher",
),
AsyncSubAgent(
name="data_analyst",
description="分析收集的数据",
graph_id="analyzer",
),
AsyncSubAgent(
name="report_writer",
description="编写研究报告",
graph_id="writer",
),
]

research_coordinator = create_deep_agent(
model="anthropic:claude-3-5-sonnet-20241022",
system_prompt="""你是研究项目经理。

给定一个研究课题,启动三个并行工作流:
1. literature_searcher:搜索相关论文和数据
2. data_analyst:开始分析已有的数据
3. report_writer:准备报告框架

监控进度:
- 每隔一段时间检查各个代理的进度
- 当任何代理遇到问题时,提供建议
- 最后综合所有输出生成最终报告

支持用户的中途修改和指导。""",
subagents=async_subagents,
)

# 启动研究项目
config = {"configurable": {"thread_id": "research-2024-quantum"}}

log_progress("project", "starting")

initial_result = research_coordinator.invoke(
{"messages": [{"role": "user", "content": "请研究量子纠缠的最新进展"}]},
config=config,
)

log_progress("project", "initialized")

# 等待一段时间(模拟用户操作)
import time
time.sleep(5)

# 用户检查进度
log_progress("project", "checking status")

status_result = research_coordinator.invoke(
{"messages": [{"role": "user", "content": "各个研究小组的进展如何?"}]},
config=config,
)

log_progress("project", "status checked")

# 用户提供新指令
from langgraph.types import Command

log_progress("project", "sending update")

update_result = research_coordinator.invoke(
Command(update=[
{
"name": "literature_searcher",
"value": "重点关注 2023-2024 年发表的论文"
},
{
"name": "data_analyst",
"value": "使用统计方差分析方法"
}
]),
config=config,
)

log_progress("project", "update sent")

# 最后收集结果
final_result = research_coordinator.invoke(
{"messages": [{"role": "user", "content": "生成最终研究报告"}]},
config=config,
)

log_progress("project", "completed")
print(final_result["messages"][-1].content)

异步子代理与 LangSmith Deployments

对于生产环境,异步子代理通常与 LangSmith Deployments 集成:

# 部署架构
# ┌─────────────────────────────────────┐
# │ LangSmith Deployment │
# │ ┌──────────────────────────────┐ │
# │ │ Main Orchestrator Agent │ │
# │ │ (HTTP API 端点) │ │
# │ └──────────────────────────────┘ │
# │ ↓↓↓ │
# │ ┌──────────────────────────────┐ │
# │ │ Async SubAgent Runners │ │
# │ │ ├─ researcher │ │
# │ │ ├─ analyzer │ │
# │ │ └─ writer │ │
# │ └──────────────────────────────┘ │
# └─────────────────────────────────────┘

部署配置示例

# langgraph.json

{
"dependencies": ["."],
"graphs": {
"orchestrator": "./orchestrator.py:main_agent",
"researcher": "./researcher.py:researcher_agent",
"analyzer": "./analyzer.py:analyzer_agent",
"writer": "./writer.py:writer_agent"
},
"env": ".env"
}

性能考虑

1. 超时配置

长时间运行的任务需要适当的超时配置:

async_subagent = AsyncSubAgent(
name="long_running_task",
description="处理大型数据集",
graph_id="processor",
url="https://worker.langsmith.dev",
# 支持自定义超时(在部署配置中设置)
)

2. 资源管理

# 限制并行任务数量,防止资源耗尽
async_subagents = [
AsyncSubAgent(name=f"worker_{i}", ...)
for i in range(3) # 最多 3 个并行任务
]

3. 失败恢复

from langgraph.types import Command

def handle_failed_task(task_name: str, error: str, config: dict):
"""处理失败的任务"""

# 重试逻辑
retry_result = agent.invoke(
Command(update=[
{"name": task_name, "value": f"重新开始。原错误:{error}"}
]),
config=config,
)

return retry_result

与同步子代理的比较

同步子代理:顺序处理

# 同步方式(阻塞)
researcher → analyzer → writer # 串联处理,耗时

异步子代理:并行处理

# 异步方式(非阻塞)
researcher ─┐
analyzer ├─→ 并行执行 # 平行处理,更快
writer ─┘

最佳实践

1. 任务大小适当

# ❌ 太重的子代理
AsyncSubAgent( # 试图做太多事情
name="everything",
...
)

# ✅ 合理的粒度
AsyncSubAgent(name="searcher", ...)
AsyncSubAgent(name="analyzer", ...)
AsyncSubAgent(name="writer", ...)

2. 清晰的状态管理

# 使用一致的线程 ID 跟踪工作流
config = {"configurable": {"thread_id": f"research-{topic}-{timestamp}"}}

# 所有检查点都使用相同的配置
first_step = agent.invoke(input1, config)
second_step = agent.invoke(input2, config) # 同一线程

3. 错误处理

try:
result = agent.invoke(
{"messages": [...]},
config=config,
)
except Exception as e:
print(f"代理执行失败:{e}")

# 重试或降级处理
fallback_result = handle_failure(e)

下一步