
Functional API in Depth

Functional API Overview

The Functional API lets you define workflows with ordinary Python functions and control flow instead of explicitly building a graph.

Core Concepts

The @entrypoint decorator

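Marks the entry point of a workflow. The decorated function becomes a workflow object that you run with invoke() rather than by calling it directly: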

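```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# Placeholder steps for illustration; replace them with your own logic
@task
def process_step_1(text: str) -> str:
    return text.strip()

@task
def process_step_2(text: str) -> str:
    return text.upper()

@entrypoint(checkpointer=InMemorySaver())
def my_workflow(input_data: str) -> str:
    """Entry point of the workflow."""
    # All of the workflow's logic lives inside this function
    result = process_step_1(input_data).result()
    result = process_step_2(result).result()
    return result
```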

The @task decorator

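Marks a discrete unit of work that is executed and tracked on its own. Calling a task inside an entrypoint returns a future; calling .result() on it waits for the value: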

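```python
import time

from langgraph.func import entrypoint, task

@task
def long_running_task(data: str) -> str:
    """A task: executed and tracked as its own unit of work."""
    time.sleep(1)
    return f"Processed: {data}"

# entrypoint is always applied with parentheses, even without arguments
@entrypoint()
def workflow(data: str) -> str:
    # Calling a task returns a future; .result() blocks until the task finishes
    result = long_running_task(data).result()
    return result

print(workflow.invoke("hello"))  # Processed: hello
```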

Defining Workflows

A simple linear workflow

```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def fetch_data(url: str) -> str:
    """Fetch data (stubbed)."""
    return f"data from {url}"

@task
def process_data(data: str) -> str:
    """Process the data."""
    return data.upper()

@task
def save_data(data: str) -> bool:
    """Save the data (stubbed)."""
    return True

@entrypoint(checkpointer=InMemorySaver())
def etl_workflow(url: str) -> bool:
    """ETL workflow: each step consumes the previous step's result."""
    data = fetch_data(url).result()
    processed = process_data(data).result()
    saved = save_data(processed).result()
    return saved

# Run it; with a checkpointer, the config must include a thread_id
config = {"configurable": {"thread_id": "etl-1"}}
result = etl_workflow.invoke("https://example.com/data", config)
print(result)  # True
```

Conditional logic

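```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def validate(data: str) -> bool:
    """Validate the data."""
    return len(data) > 0

@task
def process_valid(data: str) -> str:
    return f"Processed: {data}"

@task
def process_invalid(data: str) -> str:
    return "Invalid data"

@entrypoint(checkpointer=InMemorySaver())
def conditional_workflow(data: str) -> str:
    """A workflow with conditional logic: branching is a plain if statement."""
    is_valid = validate(data).result()

    if is_valid:
        return process_valid(data).result()
    else:
        return process_invalid(data).result()

# Run it
config = {"configurable": {"thread_id": "cond-1"}}
result = conditional_workflow.invoke("hello", config)
print(result)  # Processed: hello
```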

Loops

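```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def check_condition(counter: int) -> bool:
    """Decide whether to keep looping."""
    return counter < 5

@task
def process_item(item: int) -> int:
    """Process one item."""
    return item + 1

@entrypoint(checkpointer=InMemorySaver())
def loop_workflow(start: int) -> int:
    """A workflow with a loop; the entrypoint takes a single input argument."""
    counter = start

    while check_condition(counter).result():
        counter = process_item(counter).result()

    return counter

config = {"configurable": {"thread_id": "loop-1"}}
result = loop_workflow.invoke(0, config)
print(result)  # 5
```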

Interrupts and resumption

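Use interrupt() to pause a workflow and wait for user input. Pausing and resuming require a checkpointer, and the resumed run must use the same thread_id: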

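```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, interrupt

@task
def generate_report(data: str) -> str:
    """Generate a report."""
    return f"Report: {data}"

@entrypoint(checkpointer=InMemorySaver())
def approval_workflow(data: str) -> str:
    """A workflow that requires human approval."""
    report = generate_report(data).result()

    # Pause here; the payload is surfaced to the caller as an interrupt
    approval = interrupt({
        "report": report,
        "message": "Please approve this report",
    })

    # On resume, interrupt() returns the value sent with Command(resume=...)
    if approval:
        return f"Approved: {report}"
    else:
        return "Rejected"

config = {"configurable": {"thread_id": "approval-1"}}

# First run: executes up to the interrupt, then pauses (no exception is raised)
for chunk in approval_workflow.stream("important data", config, stream_mode="updates"):
    print(chunk)

# After the user reviews the report, resume the same thread with the decision.
# The entrypoint re-runs from the top, but generate_report's saved result is reused.
for chunk in approval_workflow.stream(Command(resume=True), config, stream_mode="updates"):
    print(chunk)
```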

Asynchronous tasks

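```python
import asyncio

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
async def async_fetch(url: str) -> str:
    """Fetch data asynchronously (simulated)."""
    await asyncio.sleep(1)
    return f"data from {url}"

@task
async def async_process(data: str) -> str:
    """Process data asynchronously (simulated)."""
    await asyncio.sleep(0.5)
    return data.upper()

@entrypoint(checkpointer=InMemorySaver())
async def async_workflow(url: str) -> str:
    """Async workflow: in an async entrypoint, await each task's future."""
    data = await async_fetch(url)
    result = await async_process(data)
    return result

# Run it with ainvoke inside an event loop
config = {"configurable": {"thread_id": "async-1"}}
result = asyncio.run(async_workflow.ainvoke("https://example.com", config))
print(result)  # DATA FROM HTTPS://EXAMPLE.COM
```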

Parallel tasks

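```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def task_a() -> str:
    return "result A"

@task
def task_b() -> str:
    return "result B"

@task
def task_c() -> str:
    return "result C"

@entrypoint(checkpointer=InMemorySaver())
def parallel_workflow(label: str) -> str:
    """Run tasks in parallel: start them all first, then collect the results."""
    # Calling a task schedules it and immediately returns a future
    future_a = task_a()
    future_b = task_b()
    future_c = task_c()

    # Block only after all three are scheduled, so they can run concurrently
    result_a = future_a.result()
    result_b = future_b.result()
    result_c = future_c.result()

    return f"{label}: {result_a}, {result_b}, {result_c}"

config = {"configurable": {"thread_id": "parallel-1"}}
print(parallel_workflow.invoke("results", config))  # results: result A, result B, result C
```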

Data flow

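```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def extract_data(source: str) -> dict:
    """Extract data."""
    return {"source": source, "count": 100}

@task
def transform_data(data: dict) -> dict:
    """Transform data, returning a new dict instead of mutating the input."""
    return {**data, "count": data["count"] * 2}

@task
def load_data(data: dict) -> bool:
    """Load data."""
    print(f"Loaded: {data}")
    return True

@entrypoint(checkpointer=InMemorySaver())
def etl_pipeline(source: str) -> bool:
    """ETL pipeline: each task's output becomes the next task's input."""
    data = extract_data(source).result()
    transformed = transform_data(data).result()
    loaded = load_data(transformed).result()
    return loaded

config = {"configurable": {"thread_id": "pipeline-1"}}
etl_pipeline.invoke("database", config)  # prints: Loaded: {'source': 'database', 'count': 200}
```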

Graph API vs Functional API

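| Aspect | Graph API | Functional API |
|---|---|---|
| Code style | Declarative | Imperative |
| Visualization | Supported | Not supported |
| Conditional logic | add_conditional_edges | Plain if statements |
| Parallelism | More involved (fan-out edges, Send) | Call tasks, then collect their futures |
| Learning curve | Moderate | Gentle |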

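When to use the Functional API

✅ The workflow is mostly linear
✅ It involves complex Python logic
✅ The team is comfortable with a function-based programming style
✅ You don't need to visualize the workflow

❌ You need complex branching and loops
❌ You need to visualize the workflow
❌ You need detailed execution tracing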

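Best practices

  • Keep each task simple, with a single responsibility
  • Use type hints
  • Add docstrings
  • Use a checkpointer where you need persistence and recovery
  • Handle exceptions
  • Test each task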

Next steps