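The Functional API in Detail
Functional API Overview
The Functional API lets you define a workflow as ordinary Python functions, without explicitly building a graph.
Core Concepts
The @entrypoint decorator
Marks the entry point of a workflow: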
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

@entrypoint(checkpointer=InMemorySaver())
def my_workflow(input_data: str) -> str:
    """Entry point of the workflow."""
    # The whole workflow body lives in this function.
    # process_step_1 and process_step_2 stand for helpers defined elsewhere.
    result = process_step_1(input_data)
    result = process_step_2(result)
    return result
The @task decorator
Marks a unit of work that can be executed and tracked on its own:
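import time

from langgraph.func import entrypoint, task

@task
def long_running_task(data: str) -> str:
    """A task: a unit of work that is executed and tracked on its own."""
    time.sleep(1)
    return f"Processed: {data}"

# entrypoint must be called, even with no arguments: @entrypoint()
@entrypoint()
def workflow(data: str) -> str:
    # Calling a task returns a future; .result() waits for its value
    result = long_running_task(data).result()
    return result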
Defining Workflows
A simple linear workflow
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def fetch_data(url: str) -> str:
    """Fetch the data."""
    return f"Data from {url}"

@task
def process_data(data: str) -> str:
    """Process the data."""
    return data.upper()

@task
def save_data(data: str) -> bool:
    """Save the data."""
    return True

@entrypoint(checkpointer=InMemorySaver())
def etl_workflow(url: str) -> bool:
    """ETL workflow."""
    data = fetch_data(url).result()
    processed = process_data(data).result()
    saved = save_data(processed).result()
    return saved

# Run it. The decorated object is invoked with .invoke(), and a
# checkpointer needs a thread_id in the config.
config = {"configurable": {"thread_id": "etl-1"}}
result = etl_workflow.invoke("https://example.com/data", config)
print(result)  # True
Conditional logic
@task
def validate(data: str) -> bool:
    """Validate the data."""
    return len(data) > 0

@task
def process_valid(data: str) -> str:
    return f"Processed: {data}"

@task
def process_invalid(data: str) -> str:
    return "Invalid data"

@entrypoint(checkpointer=InMemorySaver())
def conditional_workflow(data: str) -> str:
    """Workflow with conditional logic."""
    is_valid = validate(data).result()
    if is_valid:
        return process_valid(data).result()
    else:
        return process_invalid(data).result()

# Run it
config = {"configurable": {"thread_id": "conditional-1"}}
result = conditional_workflow.invoke("hello", config)
print(result)  # Processed: hello
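Loops
@task
def check_condition(counter: int) -> bool:
    """Check whether the loop should continue."""
    return counter < 5

@task
def process_item(item: int) -> int:
    """Process one item."""
    return item + 1

# An entrypoint function takes the workflow input as its first parameter
@entrypoint(checkpointer=InMemorySaver())
def loop_workflow(start: int) -> int:
    """Workflow with a loop."""
    counter = start
    while check_condition(counter).result():
        counter = process_item(counter).result()
    return counter

config = {"configurable": {"thread_id": "loop-1"}}
result = loop_workflow.invoke(0, config)
print(result)  # 5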
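Interrupting and Resuming
Call interrupt() to pause the workflow and wait for user input. This needs a checkpointer, because the paused state has to be saved somewhere: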
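from langgraph.types import Command, interrupt

@task
def generate_report(data: str) -> str:
    """Generate the report."""
    return f"Report: {data}"

@entrypoint(checkpointer=InMemorySaver())
def approval_workflow(data: str) -> str:
    """Workflow that needs human approval."""
    report = generate_report(data).result()
    # Pause here and ask for approval
    approval = interrupt({
        "report": report,
        "message": "Please approve this report",
    })
    if approval:
        return f"Approved: {report}"
    else:
        return "Rejected"

config = {"configurable": {"thread_id": "approval-1"}}
# First run: executes up to interrupt() and pauses there (no exception is raised)
approval_workflow.invoke("important data", config)
# After the user has reviewed the report, resume the same thread.
# The resume value becomes the return value of interrupt().
result = approval_workflow.invoke(Command(resume=True), config)
print(result)  # Approved: Report: important data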
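Resuming is easier to reason about with a small model of what happens on the second run. As far as the LangGraph documentation describes it, the entrypoint function is executed again from the top, results of tasks that already finished are read back from the checkpoint instead of being recomputed, and interrupt() returns the resume value. The sketch below imitates that behaviour with the standard library only. `ToyRun`, `Paused`, `set_resume_value` and `approval_flow` are hypothetical names invented for this illustration and are not part of LangGraph.

```python
from typing import Any, Callable


class Paused(Exception):
    """Raised by the toy interrupt() when no resume value is available yet."""

    def __init__(self, payload: Any) -> None:
        super().__init__("workflow paused")
        self.payload = payload


class ToyRun:
    """Hypothetical stand-in for one checkpointed thread (not a LangGraph class)."""

    def __init__(self) -> None:
        self.task_results: dict = {}   # saved task outputs, keyed by name and args
        self.task_executions = 0       # how many times a task body really ran
        self.resume_value: Any = None
        self.has_resume = False

    def run_task(self, fn: Callable[..., Any], *args: Any) -> Any:
        key = (fn.__name__, args)
        if key not in self.task_results:   # only run work that is not saved yet
            self.task_executions += 1
            self.task_results[key] = fn(*args)
        return self.task_results[key]

    def interrupt(self, payload: Any) -> Any:
        if not self.has_resume:
            raise Paused(payload)          # first pass: stop here
        return self.resume_value           # replay: hand back the user's answer

    def set_resume_value(self, value: Any) -> None:
        self.resume_value = value
        self.has_resume = True


def generate_report(data: str) -> str:
    return f"Report: {data}"


def approval_flow(run: ToyRun, data: str) -> str:
    report = run.run_task(generate_report, data)
    approved = run.interrupt({"report": report, "message": "Please approve"})
    return f"Approved: {report}" if approved else "Rejected"


run = ToyRun()
try:
    approval_flow(run, "important data")
    paused_payload = None
except Paused as pause:
    paused_payload = pause.payload         # what a reviewer would be shown

run.set_resume_value(True)
final = approval_flow(run, "important data")   # the body runs again from the top
print(final)                # Approved: Report: important data
print(run.task_executions)  # 1
```

One practical consequence of this replay model: code placed before interrupt() and outside a task runs again on resume, so side effects are better kept inside tasks.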
Async Tasks
import asyncio

@task
async def async_fetch(url: str) -> str:
    """Fetch data asynchronously."""
    await asyncio.sleep(1)
    return f"Data from {url}"

@task
async def async_process(data: str) -> str:
    """Process data asynchronously."""
    await asyncio.sleep(0.5)
    return data.upper()

@entrypoint(checkpointer=InMemorySaver())
async def async_workflow(url: str) -> str:
    """Async workflow."""
    data = await async_fetch(url)
    result = await async_process(data)
    return result

# Run it: async entrypoints are invoked with .ainvoke()
config = {"configurable": {"thread_id": "async-1"}}
result = asyncio.run(async_workflow.ainvoke("https://example.com", config))
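Parallel Tasks
@task
def task_a() -> str:
    return "Result A"

@task
def task_b() -> str:
    return "Result B"

@task
def task_c() -> str:
    return "Result C"

# An entrypoint function takes the workflow input as its first parameter;
# it is unused in this example.
@entrypoint(checkpointer=InMemorySaver())
def parallel_workflow(_inputs: dict) -> str:
    """Run tasks in parallel."""
    # Start all three tasks first; each call returns a future immediately
    future_a = task_a()
    future_b = task_b()
    future_c = task_c()
    # Then wait for all of them to finish
    result_a = future_a.result()
    result_b = future_b.result()
    result_c = future_c.result()
    return f"{result_a}, {result_b}, {result_c}"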
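The ordering in the workflow above matters: all futures are created before any .result() call. The same start-then-collect pattern can be tried with the standard library's `concurrent.futures`, which makes the difference measurable without LangGraph. This is only an analogy under that assumption; `slow_step` is a hypothetical stand-in for a task body that waits on I/O, and LangGraph's own scheduling may differ in detail.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_step(name: str, delay: float = 0.2) -> str:
    """Hypothetical stand-in for a task body that waits on I/O."""
    time.sleep(delay)
    return f"Result {name}"


with ThreadPoolExecutor(max_workers=3) as pool:
    # Pattern 1: start everything, then collect. The waits overlap.
    started = time.perf_counter()
    futures = [pool.submit(slow_step, name) for name in ("A", "B", "C")]
    overlapped = [future.result() for future in futures]
    overlapped_seconds = time.perf_counter() - started

    # Pattern 2: wait for each result before starting the next step.
    # The steps run one after another even though futures are used.
    started = time.perf_counter()
    serialized = [pool.submit(slow_step, name).result() for name in ("A", "B", "C")]
    serialized_seconds = time.perf_counter() - started

print(overlapped)  # ['Result A', 'Result B', 'Result C']
print(f"overlapped: {overlapped_seconds:.2f}s, serialized: {serialized_seconds:.2f}s")
```

Both patterns return the same values in the same order; only the elapsed time differs. Calling .result() right after starting each task gives up the parallelism.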
Data Flow
@task
def extract_data(source: str) -> dict:
    """Extract the data."""
    return {"source": source, "count": 100}

@task
def transform_data(data: dict) -> dict:
    """Transform the data."""
    # Return a new dict instead of mutating the task's input
    return {**data, "count": data["count"] * 2}

@task
def load_data(data: dict) -> bool:
    """Load the data."""
    print(f"Loaded: {data}")
    return True

@entrypoint(checkpointer=InMemorySaver())
def etl_pipeline(source: str) -> bool:
    """ETL pipeline: each task's output is the next task's input."""
    data = extract_data(source).result()
    transformed = transform_data(data).result()
    loaded = load_data(transformed).result()
    return loaded
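Graph API vs Functional API
| Aspect | Graph API | Functional API |
|---|---|---|
| Code style | Declarative | Imperative |
| Visualization | Supported | Not supported |
| Conditional logic | add_conditional_edges | if statements |
| Parallel processing | More involved | Futures returned by tasks |
| Learning curve | Moderate | Gentle |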
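When to Use the Functional API
✅ The workflow is mostly linear
✅ It involves complex Python logic
✅ The team is comfortable with function-based code
✅ Visualization is not needed
❌ You need complex branching and loops
❌ You need to visualize the workflow
❌ You need detailed execution tracing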
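Best Practices
- Keep each task small, with a single responsibility
- Use type hints
- Add docstrings
- Use a checkpointer where the workflow needs to be resumable
- Handle exceptions
- Test each task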
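Several of these practices can be combined in one small example. One way to make a task easy to test, sketched here as an assumption rather than a LangGraph requirement, is to keep its logic in a plain function with type hints and a docstring, and let the @task-decorated function be a thin wrapper around it. The plain function can then be tested without any workflow runtime. `normalize_record` and `safe_normalize` are hypothetical names used only for this illustration.

```python
def normalize_record(record: dict) -> dict:
    """Return a cleaned copy of record; raise ValueError if 'name' is missing or blank."""
    name = str(record.get("name", "")).strip()
    if not name:
        raise ValueError("record needs a non-empty 'name'")
    return {**record, "name": name.title()}


def safe_normalize(record: dict) -> dict:
    """Wrap normalize_record so one bad record yields an error entry instead of a crash."""
    try:
        return {"ok": True, "value": normalize_record(record)}
    except ValueError as exc:
        return {"ok": False, "error": str(exc)}


def test_normalize_record() -> None:
    original = {"name": "  ada lovelace "}
    assert normalize_record(original) == {"name": "Ada Lovelace"}
    assert original == {"name": "  ada lovelace "}  # the input is not mutated
    assert safe_normalize({"name": ""}) == {
        "ok": False,
        "error": "record needs a non-empty 'name'",
    }


test_normalize_record()
```

Whether a failure should be turned into an error entry, as `safe_normalize` does, or allowed to propagate so the workflow stops is a design choice that depends on how the workflow is expected to recover.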