跳到主要内容

消息系统与对话

消息系统基础

消息是LangChain中的核心数据结构,所有代理与模型的交互都是通过消息进行的。

消息类型

1. HumanMessage(用户消息)

from langchain_core.messages import HumanMessage

# 简单消息
msg = HumanMessage(content="今天天气如何?")

# 可以包含其他元数据
msg = HumanMessage(
content="出去玩的天气如何?",
metadata={
"user_id": "user123",
"location": "旧金山"
}
)

2. AIMessage(模型消息)

from langchain_core.messages import AIMessage

# 模型生成的回答
msg = AIMessage(content="今天是晴天,气温25°C")

# 可以包含工具调用
msg = AIMessage(
content="", # 如果有工具调用,通常为空
tool_calls=[{
"id": "call_123",
"name": "get_weather",
"args": {"city": "旧金山"}
}]
)

3. ToolMessage(工具结果消息)

from langchain_core.messages import ToolMessage

# 工具执行的结果
msg = ToolMessage(
content="晴天,气温25°C",
tool_call_id="call_123",
name="get_weather"
)

4. SystemMessage(系统消息)

from langchain_core.messages import SystemMessage

# 系统级指导
msg = SystemMessage(content="""
你是一个专业的天气顾问。
总是提供详细的天气信息和相关建议。
使用摄氏度表示温度。
""")

5. 其他消息类型

from langchain_core.messages import FunctionMessage

# 函数调用结果(向后兼容)
msg = FunctionMessage(
content="结果内容",
name="function_name"
)

消息历史管理

1. 基本消息列表

messages = [
SystemMessage(content="你是一个有帮助的助手"),
HumanMessage(content="旧金山的天气如何?"),
AIMessage(content="我来查询一下..."),
ToolMessage(content="晴天25°C", tool_call_id="...", name="get_weather"),
AIMessage(content="旧金山目前是晴天...")
]

# 传递给代理
result = agent.invoke({"messages": messages})

2. 消息去重与清理

def clean_messages(messages):
"""清理重复或过时的消息"""
unique_messages = []
seen_ids = set()

for msg in reversed(messages): # 从最新开始
if hasattr(msg, "id") and msg.id in seen_ids:
continue
seen_ids.add(getattr(msg, "id", None))
unique_messages.append(msg)

return list(reversed(unique_messages))

messages = clean_messages(messages)

3. 消息摘要与压缩

def summarize_messages(messages, max_length: int = 10):
"""对旧消息进行摘要以缩小上下文"""
if len(messages) <= max_length:
return messages

# 保留最新的消息
recent = messages[-max_length:]
old_messages = messages[:-max_length]

# 对旧消息进行摘要
summary_content = "之前的对话摘要:" + "\n".join([
f"{type(msg).__name__}: {msg.content[:50]}..."
for msg in old_messages
])

from langchain_core.messages import SystemMessage
summary_msg = SystemMessage(content=summary_content)

return [summary_msg] + recent

多轮对话

1. 基本多轮对话

from langchain.agents import create_agent
from langchain_core.messages import HumanMessage, AIMessage

agent = create_agent(
model="anthropic:claude-sonnet-4",
tools=[get_weather]
)

# 第一轮对话
messages = [
HumanMessage(content="北京的天气如何?")
]
result = agent.invoke({"messages": messages})
messages.extend([
HumanMessage(content="北京的天气如何?"),
result["messages"][-1] # 添加代理的回答
])

# 第二轮对话
messages.append(HumanMessage(content="那上海呢?"))
result = agent.invoke({"messages": messages})

2. 会话管理

from datetime import datetime

class ConversationHistory:
def __init__(self, session_id: str):
self.session_id = session_id
self.messages = []
self.created_at = datetime.now()
self.last_updated = datetime.now()

def add_message(self, message):
"""添加消息到历史"""
self.messages.append(message)
self.last_updated = datetime.now()

def get_messages(self):
"""获取所有消息"""
return self.messages

def clear(self):
"""清空消息"""
self.messages = []

def get_summary(self):
"""获取对话摘要"""
return {
"session_id": self.session_id,
"message_count": len(self.messages),
"created_at": self.created_at,
"last_updated": self.last_updated
}

# 使用
history = ConversationHistory(session_id="user123_session")

# 添加消息
history.add_message(HumanMessage(content="你好"))
history.add_message(AIMessage(content="你好!有什么我可以帮助的吗?"))

# 获取历史用于下一次调用
messages = history.get_messages()

3. 对话状态跟踪

class ConversationContext:
def __init__(self):
self.messages = []
self.context_data = {}
self.iteration = 0

def update(self, messages: list, **context):
"""更新对话状态"""
self.messages = messages
self.context_data.update(context)
self.iteration += 1

def get_state(self):
return {
"messages": self.messages,
"context": self.context_data,
"iteration": self.iteration
}

# 在代理执行中使用
context = ConversationContext()

for turn in range(5):
result = agent.invoke({
"messages": context.messages + [HumanMessage(content=user_input)]
})

context.update(
messages=result["messages"],
last_tool_used=get_last_tool_call(result),
user_satisfaction=None
)

内存管理策略

1. 有限窗口内存

class LimitedWindowMemory:
"""只保留最后N条消息"""

def __init__(self, window_size: int = 10):
self.window_size = window_size
self.messages = []

def add(self, message):
self.messages.append(message)
# 保持窗口大小
if len(self.messages) > self.window_size:
self.messages.pop(0)

def get_messages(self):
return self.messages

memory = LimitedWindowMemory(window_size=10)
memory.add(HumanMessage(content="..."))

2. 摘要内存

from langchain_core.messages import SystemMessage
from langchain_anthropic import ChatAnthropic

class SummaryMemory:
"""自动摘要旧消息"""

def __init__(self, max_messages: int = 20):
self.max_messages = max_messages
self.messages = []
self.summary = ""
self.model = ChatAnthropic(model="claude-sonnet-4")

def add(self, message):
self.messages.append(message)

if len(self.messages) > self.max_messages:
# 产生摘要
old_messages = self.messages[:-self.max_messages]
self.summary = self._summarize(old_messages)
self.messages = self.messages[-self.max_messages:]

def _summarize(self, messages):
"""使用模型生成摘要"""
content = "\n".join([
f"{type(m).__name__}: {m.content}"
for m in messages
])

response = self.model.invoke([
SystemMessage(content="请生成对话摘要"),
HumanMessage(content=content)
])

return response.content

def get_messages(self):
"""包含摘要的消息列表"""
if self.summary:
return [
SystemMessage(content=f"之前会话摘要: {self.summary}")
] + self.messages
return self.messages

memory = SummaryMemory()
memory.add(HumanMessage(content="..."))

3. 实体内存

from typing import Dict, List

class EntityMemory:
"""记住对话中提及的实体(人、地、物等)"""

def __init__(self):
self.entities: Dict[str, List[str]] = {}

def extract_and_store(self, content: str):
"""从内容中提取实体"""
# 这里可以使用NLP库提取实体
# 示例:
entities = {
"people": ["Alice", "Bob"],
"places": ["北京", "上海"],
"products": ["产品A", "产品B"]
}

for entity_type, entity_list in entities.items():
if entity_type not in self.entities:
self.entities[entity_type] = []
self.entities[entity_type].extend(entity_list)

def get_context(self):
"""获取实体上下文"""
context = "已知实体:\n"
for entity_type, entities in self.entities.items():
context += f"{entity_type}: {', '.join(set(entities))}\n"
return context

消息格式化

1. 显示对话历史

def format_conversation(messages):
"""格式化对话为可读形式"""
output = []

for msg in messages:
if isinstance(msg, HumanMessage):
output.append(f"👤 用户: {msg.content}")
elif isinstance(msg, AIMessage):
output.append(f"🤖 助手: {msg.content}")
elif isinstance(msg, ToolMessage):
output.append(f"🔧 工具: {msg.content}")
elif isinstance(msg, SystemMessage):
output.append(f"⚙️ 系统: {msg.content}")

return "\n".join(output)

# 使用
print(format_conversation(messages))

2. 导出对话

import json

def export_conversation(messages, format="json"):
"""导出对话为不同格式"""

if format == "json":
data = {
"messages": [
{
"type": type(msg).__name__,
"content": msg.content,
"metadata": getattr(msg, "metadata", {})
}
for msg in messages
]
}
return json.dumps(data, ensure_ascii=False, indent=2)

elif format == "markdown":
lines = []
for msg in messages:
if isinstance(msg, HumanMessage):
lines.append("## 用户")
elif isinstance(msg, AIMessage):
lines.append("## 助手")
lines.append(msg.content)
lines.append("")
return "\n".join(lines)

elif format == "csv":
import csv
from io import StringIO

output = StringIO()
writer = csv.writer(output)
writer.writerow(["Type", "Content", "Timestamp"])

for msg in messages:
writer.writerow([
type(msg).__name__,
msg.content,
getattr(msg, "metadata", {}).get("timestamp", "")
])

return output.getvalue()

性能优化

1. 消息批处理

def batch_messages(messages, batch_size=5):
"""将消息分批处理"""
for i in range(0, len(messages), batch_size):
yield messages[i:i + batch_size]

# 使用
for batch in batch_messages(messages):
process_batch(batch)

2. 异步消息处理

import asyncio

async def process_messages_async(messages):
"""异步处理消息"""
tasks = [process_message_async(msg) for msg in messages]
results = await asyncio.gather(*tasks)
return results

async def process_message_async(message):
# 异步处理
await asyncio.sleep(0.1)
return message

常见问题

Q: 如何限制消息历史的大小? A: 使用LimitedWindowMemory或SummaryMemory实现消息压缩。

Q: 消息会被永久保存吗? A: 默认不会。需要显式实现持久化逻辑。参考生产部署指南

Q: 如何处理敏感信息在消息中? A: 在存储前进行加密,使用人工在环工作流进行审核。