Agent 设计模式有哪些?
Agent 设计模式有哪些?
Agent 设计模式详细指南
1. Agentic Loop(代理循环)
模式描述
最基础的 Agent 架构,Agent 在感知-决策-执行-反馈的循环中不断运作。
核心流程
┌─────────────┐
│ 感知 │ 收集环境信息
└──────┬──────┘
│
┌──────▼──────┐
│ 决策 │ 分析信息,选择行动
└──────┬──────┘
│
┌──────▼──────┐
│ 执行 │ 执行选定的行动
└──────┬──────┘
│
┌──────▼──────┐
│ 反馈 │ 评估结果,更新状态
└──────┬──────┘
│
└─────────────┘(循环)
代码案例
Python 实现
class Agent:
def __init__(self, name):
self.name = name
self.state = {}
self.max_iterations = 10
def perceive(self, environment):
"""感知环境"""
observations = {
'time': environment.current_time,
'location': environment.agent_location,
'resources': environment.available_resources,
'threats': environment.detect_threats()
}
return observations
def decide(self, observations):
"""做出决策"""
if observations['threats']:
action = 'escape'
elif observations['resources'] < 20:
action = 'gather_resources'
else:
action = 'explore'
return action
def execute(self, action, environment):
"""执行行动"""
result = environment.perform_action(action)
return result
def feedback(self, result):
"""反馈和学习"""
self.state['last_action'] = result['action']
self.state['success'] = result['success']
if result['success']:
self.state['reward'] = result['reward']
def run(self, environment):
"""主循环"""
for iteration in range(self.max_iterations):
print(f"\n--- Iteration {iteration + 1} ---")
# 感知
observations = self.perceive(environment)
print(f"Observations: {observations}")
# 决策
action = self.decide(observations)
print(f"Decision: {action}")
# 执行
result = self.execute(action, environment)
print(f"Result: {result}")
# 反馈
self.feedback(result)
# 检查是否完成
if environment.is_goal_reached():
print(f"Goal reached!")
break
环境模拟
class Environment:
def __init__(self):
self.current_time = 0
self.agent_location = (0, 0)
self.available_resources = 100
self.threats = []
def perform_action(self, action):
self.current_time += 1
result = {'action': action, 'success': True, 'reward': 0}
if action == 'gather_resources':
self.available_resources += 30
result['reward'] = 1
elif action == 'explore':
self.agent_location = (self.agent_location[0] + 1,
self.agent_location[1] + 1)
result['reward'] = 0.5
elif action == 'escape':
self.threats = []
result['reward'] = 0.3
# 随机生成威胁
if self.current_time % 3 == 0:
self.threats.append('enemy')
return result
def detect_threats(self):
return len(self.threats)
def is_goal_reached(self):
return self.current_time >= 10
# 使用案例
env = Environment()
agent = Agent("MainAgent")
agent.run(env)
最佳实践
- 明确定义状态:确保状态能准确反映 Agent 的当前情况
- 灵活的循环控制:提供退出条件(目标达成、超时等)
- 性能监控:记录每个循环的指标便于调试
- 错误处理:在每个阶段添加异常处理
- 可观测性:记录日志方便追踪 Agent 行为
适用场景
- 实时游戏 AI
- 自主机器人控制
- 模拟系统
- 交互式系统
2. ReAct(推理+行动)
模式描述
Agent 在执行行动前先进行详细的推理,通过"思考→行动→观察"的循环来解决问题。这是大语言模型驱动的 Agent 的常用模式。
核心流程
┌──────────────────┐
│ 输入问题 │
└────────┬─────────┘
│
┌────────▼─────────┐
│ 思考/推理 │ LLM 分析问题
│ (Reasoning) │
└────────┬─────────┘
│
┌────────▼─────────┐
│ 计划行动 │ 决定采取什么行动
│ (Planning) │
└────────┬─────────┘
│
┌────────▼─────────┐
│ 执行行动 │ 调用工具/API
│ (Action) │
└────────┬─────────┘
│
┌────────▼─────────┐
│ 观察结果 │ 获取行动结果
│ (Observation) │
└────────┬─────────┘
│
├─→ 目标达成? ─是─→ 输出最终答案
│
└─否─→ 回到思考阶段(反复迭代)
代码案例
Python 实现(使用 LLM API)
import json
from typing import Any, Dict, List
class ReactAgent:
def __init__(self, model_api_client, tools: Dict[str, Any]):
self.client = model_api_client
self.tools = tools # 可用的工具字典
self.max_steps = 10
self.conversation_history = []
def get_available_tools_description(self) -> str:
"""获取可用工具的描述"""
tools_desc = "Available tools:\n"
for tool_name, tool_info in self.tools.items():
tools_desc += f"- {tool_name}: {tool_info['description']}\n"
return tools_desc
def think(self, question: str, observations: List[str]) -> str:
"""推理阶段:LLM 分析问题和观察"""
tools_desc = self.get_available_tools_description()
system_prompt = f"""You are a reasoning agent. Your task is to:
1. Analyze the problem carefully
2. Consider what you've observed so far
3. Decide what action to take next
4. Use tools available to you
{tools_desc}
Respond in this format:
Thought: [Your analysis]
Action: [tool_name]
Action Input: [input_for_tool or "None"]
"""
# 构建对话历史
messages = [{"role": "user", "content": question}]
if observations:
messages.append({
"role": "user",
"content": f"Previous observations:\n" + "\n".join(observations)
})
# 调用 LLM
response = self.client.call_api(system_prompt, messages)
return response
def parse_action(self, response: str) -> tuple:
"""解析 LLM 的响应提取行动"""
lines = response.split('\n')
action = None
action_input = None
for line in lines:
if line.startswith('Action:'):
action = line.split(':', 1)[1].strip()
elif line.startswith('Action Input:'):
action_input = line.split(':', 1)[1].strip()
return action, action_input
def execute_action(self, action: str, action_input: str) -> str:
"""执行选定的行动"""
if action not in self.tools:
return f"Error: Tool '{action}' not found"
tool = self.tools[action]
try:
result = tool['function'](action_input)
return f"Observation: {result}"
except Exception as e:
return f"Error executing {action}: {str(e)}"
def run(self, question: str) -> str:
"""主循环"""
print(f"\n{'='*60}")
print(f"Question: {question}")
print(f"{'='*60}\n")
observations = []
step = 0
while step < self.max_steps:
step += 1
print(f"--- Step {step} ---")
# 推理
thinking = self.think(question, observations)
print(f"Thinking:\n{thinking}\n")
# 解析行动
action, action_input = self.parse_action(thinking)
if action is None or action == "Final Answer":
# Agent 认为已有最终答案
final_answer = thinking.split("Final Answer:")[-1].strip()
print(f"\n✓ Final Answer: {final_answer}")
return final_answer
# 执行行动
print(f"Action: {action}")
print(f"Action Input: {action_input}")
observation = self.execute_action(action, action_input)
print(f"{observation}\n")
observations.append(observation)
return f"Failed to find answer after {self.max_steps} steps"
# 工具定义
def search_wikipedia(query: str) -> str:
"""模拟维基百科搜索"""
# 实际应用中会调用真实的 API
knowledge_base = {
"Python": "Python is a high-level programming language",
"AI": "AI stands for Artificial Intelligence",
"Machine Learning": "ML is a subset of AI"
}
return knowledge_base.get(query, "No results found")
def calculate(expression: str) -> str:
"""计算器"""
try:
result = eval(expression)
return f"Result: {result}"
except:
return "Invalid expression"
def get_current_date() -> str:
"""获取当前日期"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d")
# 定义工具字典
tools = {
"search_wikipedia": {
"description": "Search for information on Wikipedia",
"function": search_wikipedia
},
"calculate": {
"description": "Perform mathematical calculations",
"function": calculate
},
"get_date": {
"description": "Get the current date",
"function": get_current_date
}
}
# 模拟 LLM 客户端
class MockLLMClient:
def call_api(self, system_prompt, messages):
# 这里应该调用实际的 LLM API
# 为演示目的,返回预定义的响应
return """Thought: I need to search for information about Python
Action: search_wikipedia
Action Input: Python"""
# 使用示例
# client = MockLLMClient()
# agent = ReactAgent(client, tools)
# result = agent.run("What is Python?")
真实 API 实现示例(使用 Anthropic Claude)
import anthropic
import json
class ClaudeReactAgent:
def __init__(self):
self.client = anthropic.Anthropic()
self.tools = [
{
"name": "search_wikipedia",
"description": "Search Wikipedia for information",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
}
},
"required": ["query"]
}
},
{
"name": "calculate",
"description": "Perform mathematical calculations",
"input_schema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Mathematical expression"
}
},
"required": ["expression"]
}
}
]
self.messages = []
def process_tool_call(self, tool_name: str, tool_input: dict) -> str:
"""处理工具调用"""
if tool_name == "search_wikipedia":
return f"Found information about {tool_input['query']}"
elif tool_name == "calculate":
try:
result = eval(tool_input['expression'])
return f"Calculation result: {result}"
except Exception as e:
return f"Error: {str(e)}"
return "Unknown tool"
def run(self, user_query: str, max_iterations: int = 10):
"""运行 ReAct Agent"""
self.messages = [{"role": "user", "content": user_query}]
for iteration in range(max_iterations):
print(f"\n--- Iteration {iteration + 1} ---")
# 调用 Claude
response = self.client.messages.create(
model="claude-opus-4-20250514",
max_tokens=1024,
tools=self.tools,
messages=self.messages
)
print(f"Stop Reason: {response.stop_reason}")
# 检查是否需要调用工具
if response.stop_reason == "tool_use":
# 添加助手响应
self.messages.append({
"role": "assistant",
"content": response.content
})
# 处理工具调用
tool_results = []
for block in response.content:
if block.type == "tool_use":
print(f"Tool: {block.name}")
print(f"Input: {block.input}")
result = self.process_tool_call(
block.name,
block.input
)
print(f"Result: {result}")
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result
})
# 添加工具结果
self.messages.append({
"role": "user",
"content": tool_results
})
else:
# 最终答案
for block in response.content:
if hasattr(block, 'text'):
print(f"\n✓ Final Answer:\n{block.text}")
return block.text
return "Max iterations reached"
# 使用
# agent = ClaudeReactAgent()
# agent.run("What is the capital of France?")
最佳实践
- 清晰的思维过程:让 Agent 显式地说出推理步骤
- 工具设计:
- 工具应该专一且明确定义
- 提供详细的使用说明
- 返回结构化的结果
- 迭代限制:设置最大步数以防止无限循环
- 观察记录:保存所有观察用于上下文
- 错误处理:优雅地处理工具失败
- 性能优化:缓存常见查询结果
适用场景
- 问答系统 (QA)
- 数据分析
- 研究辅助
- 复杂问题求解
3. Planner-Executor(计划-执行)
模式描述
将复杂任务分解为两个阶段:
- Planner:根据目标制定详细的分步计划
- Executor:按照计划逐步执行每个子任务
这避免了 Agent 逐步决策导致的高延迟和错误积累。
架构图
┌─────────────────────────────────────┐
│ 用户目标 │
└────────────────┬────────────────────┘
│
┌──────▼──────┐
│ Planner │ 分解任务
│ 规划器 │ 制定计划
└──────┬──────┘
│
┌───────▼────────┐
│ Detailed Plan │
│ ┌─────────┐ │
│ │ Step 1 │ │
│ ├─────────┤ │
│ │ Step 2 │ │
│ ├─────────┤ │
│ │ Step N │ │
│ └─────────┘ │
└───────┬────────┘
│
┌──────▼──────────┐
│ Executor │ 执行计划
│ 执行器 │
└─────────────────┘
代码案例
from typing import List, Dict
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
class Task:
def __init__(self, task_id: str, description: str,
dependencies: List[str] = None, estimated_time: int = 0):
self.task_id = task_id
self.description = description
self.dependencies = dependencies or []
self.estimated_time = estimated_time
self.status = TaskStatus.PENDING
self.result = None
def __repr__(self):
return f"Task({self.task_id}, {self.status.value})"
class Plan:
def __init__(self, goal: str, tasks: List[Task]):
self.goal = goal
self.tasks = tasks
self.creation_time = None
def get_executable_tasks(self) -> List[Task]:
"""获取当前可执行的任务(依赖已满足)"""
completed_ids = {t.task_id for t in self.tasks
if t.status == TaskStatus.COMPLETED}
executable = []
for task in self.tasks:
if task.status == TaskStatus.PENDING:
# 检查所有依赖是否已完成
if all(dep in completed_ids for dep in task.dependencies):
executable.append(task)
return executable
def __str__(self):
result = f"Plan for: {self.goal}\n"
result += "=" * 60 + "\n"
for i, task in enumerate(self.tasks, 1):
result += f"{i}. [{task.status.value:12}] {task.task_id}: {task.description}\n"
if task.dependencies:
result += f" Dependencies: {', '.join(task.dependencies)}\n"
return result
class Planner:
"""规划器:制定计划"""
def plan(self, goal: str, context: Dict) -> Plan:
"""根据目标和上下文制定计划"""
print(f"\n[Planner] Planning for goal: {goal}")
print(f"[Planner] Context: {context}\n")
# 这里应该使用 LLM 或规则引擎分解任务
# 示例:构建一个网站
if "website" in goal.lower():
return self._plan_website(context)
elif "report" in goal.lower():
return self._plan_report(context)
else:
return self._plan_generic(goal, context)
def _plan_website(self, context: Dict) -> Plan:
"""网站构建计划"""
tasks = [
Task("design", "设计网站架构和UI", [], 480),
Task("frontend", "开发前端代码", ["design"], 720),
Task("backend", "开发后端API", ["design"], 840),
Task("database", "设计和实现数据库", [], 360),
Task("integrate", "前后端集成", ["frontend", "backend", "database"], 240),
Task("test", "测试整个系统", ["integrate"], 480),
Task("deploy", "部署到生产环境", ["test"], 120),
]
return Plan("Build a website", tasks)
def _plan_report(self, context: Dict) -> Plan:
"""报告生成计划"""
tasks = [
Task("collect_data", "收集数据", [], 120),
Task("analyze", "分析数据", ["collect_data"], 180),
Task("write_draft", "撰写初稿", ["analyze"], 240),
Task("review", "审阅和修改", ["write_draft"], 120),
Task("format", "格式化和排版", ["review"], 60),
]
return Plan("Generate a report", tasks)
def _plan_generic(self, goal: str, context: Dict) -> Plan:
"""通用计划"""
tasks = [
Task("understand", "理解需求", [], 60),
Task("plan", "制定策略", ["understand"], 120),
Task("execute", "执行", ["plan"], 300),
Task("review", "审查结果", ["execute"], 120),
]
return Plan(goal, tasks)
class Executor:
"""执行器:执行计划"""
def __init__(self, tools: Dict[str, callable]):
self.tools = tools
def execute(self, plan: Plan) -> bool:
"""执行计划"""
print(f"\n[Executor] Executing plan: {plan.goal}\n")
print(plan)
completed_count = 0
failed_count = 0
while True:
# 获取可执行的任务
executable_tasks = plan.get_executable_tasks()
if not executable_tasks:
# 检查是否所有任务都完成
all_completed = all(
t.status == TaskStatus.COMPLETED
for t in plan.tasks
)
if all_completed:
print(f"\n✓ Plan completed successfully!")
print(f" Completed: {completed_count}, Failed: {failed_count}")
return True
else:
# 有失败的任务
failed_tasks = [t for t in plan.tasks
if t.status == TaskStatus.FAILED]
print(f"\n✗ Plan failed!")
print(f" Failed tasks: {[t.task_id for t in failed_tasks]}")
return False
# 执行第一个可执行的任务
task = executable_tasks[0]
success = self._execute_task(task, plan)
if success:
completed_count += 1
else:
failed_count += 1
return failed_count == 0
def _execute_task(self, task: Task, plan: Plan) -> bool:
"""执行单个任务"""
print(f"\n[Task {task.task_id}] Executing: {task.description}")
print(f" Estimated time: {task.estimated_time} minutes")
task.status = TaskStatus.IN_PROGRESS
try:
# 获取对应的执行工具
if task.task_id in self.tools:
tool = self.tools[task.task_id]
result = tool()
task.result = result
task.status = TaskStatus.COMPLETED
print(f" ✓ Completed: {result}")
return True
else:
# 模拟执行
import random
success = random.random() > 0.2 # 80% 成功率
if success:
task.status = TaskStatus.COMPLETED
print(f" ✓ Completed")
return True
else:
task.status = TaskStatus.FAILED
print(f" ✗ Failed")
return False
except Exception as e:
task.status = TaskStatus.FAILED
print(f" ✗ Error: {str(e)}")
return False
class PlannerExecutor:
"""协调规划器和执行器"""
def __init__(self, tools: Dict[str, callable] = None):
self.planner = Planner()
self.executor = Executor(tools or {})
def run(self, goal: str, context: Dict = None) -> bool:
"""运行完整的计划-执行流程"""
context = context or {}
# 步骤 1: 规划
plan = self.planner.plan(goal, context)
# 步骤 2: 执行
success = self.executor.execute(plan)
return success
# 使用示例
if __name__ == "__main__":
# 定义工具
tools = {
"design": lambda: "UI designs completed",
"frontend": lambda: "Frontend code ready",
"backend": lambda: "Backend API ready",
"database": lambda: "Database schema created",
"integrate": lambda: "System integration complete",
"test": lambda: "All tests passed",
"deploy": lambda: "System deployed to production"
}
# 创建规划-执行器
pe = PlannerExecutor(tools)
# 执行计划
success = pe.run("Build a website", {"team_size": 5, "budget": 50000})
最佳实践
- 任务分解:
- 任务粒度要适当
- 避免过度分解导致执行开销大
- 避免任务过大导致难以执行
- 依赖管理:
- 清晰定义任务依赖关系
- 使用有向无环图 (DAG) 表示任务依赖
- 检查循环依赖
- 错误恢复:
- 定义失败任务的重试策略
- 支持断点恢复
- 记录执行日志
- 动态调整:
- 允许在执行过程中调整计划
- 支持暂停和恢复
- 处理运行时障碍
- 性能优化:
- 并行执行独立任务
- 优先级队列处理
- 缓存中间结果
适用场景
- 项目管理
- 数据处理管道
- 工作流自动化
- 复杂业务流程
4. Hierarchical Agent(分层代理)
模式描述
通过构建多层级的 Agent 架构,高层 Agent 负责战略决策,低层 Agent 执行具体操作。这提供了更好的可扩展性和可维护性。
架构图
┌────────────────────────────────────────┐
│ Level 1: Strategic Agent │ 战略决策
│ (目标设定、资源分配) │
└─────────────┬──────────────────────────┘
│
┌─────────┼─────────┐
│ │ │
┌───▼───┐ ┌──▼──┐ ┌───▼───┐
│ Agent │ │Agent │ │ Agent │ 具体执行
│ 2.1 │ │ 2.2 │ │ 2.3 │
└───┬───┘ └──┬──┘ └───┬───┘
│ │ │
┌─▼─┐ ┌─▼─┐ ┌─▼─┐
│3.1│ │3.2│ │3.3│ 最基层操作
└───┘ └───┘ └───┘
代码案例
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from enum import Enum
class AgentLevel(Enum):
STRATEGIC = 1
TACTICAL = 2
OPERATIONAL = 3
class HierarchicalAgent(ABC):
"""分层代理的基类"""
def __init__(self, agent_id: str, level: AgentLevel):
self.agent_id = agent_id
self.level = level
self.subordinates: List['HierarchicalAgent'] = []
self.superior: 'HierarchicalAgent' = None
self.state = {}
@abstractmethod
def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行任务"""
pass
def add_subordinate(self, agent: 'HierarchicalAgent'):
"""添加下级代理"""
agent.superior = self
self.subordinates.append(agent)
def delegate(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""委派任务给下级代理"""
if not self.subordinates:
print(f"[{self.agent_id}] No subordinates to delegate to")
return {"success": False, "error": "No subordinates"}
# 选择合适的下级代理
selected_agent = self._select_subordinate(task)
if selected_agent is None:
return {"success": False, "error": "No suitable subordinate"}
print(f"[{self.agent_id}] Delegating task to [{selected_agent.agent_id}]")
return selected_agent.execute(task)
def _select_subordinate(self, task: Dict[str, Any]) -> 'HierarchicalAgent':
"""选择合适的下级代理"""
# 简单的选择策略:选择第一个合适的
for agent in self.subordinates:
if agent.can_execute(task):
return agent
return None
def can_execute(self, task: Dict[str, Any]) -> bool:
"""检查是否能执行任务"""
return True
def report_to_superior(self, result: Dict[str, Any]) -> None:
"""向上级报告"""
if self.superior:
print(f"[{self.agent_id}] Reporting to [{self.superior.agent_id}]: {result}")
class StrategicAgent(HierarchicalAgent):
"""战略层代理"""
def __init__(self, agent_id: str):
super().__init__(agent_id, AgentLevel.STRATEGIC)
def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""战略层执行:分解目标、分配资源"""
print(f"\n[{self.agent_id}] Strategic Planning")
print(f" Task: {task.get('objective', 'Unknown')}")
# 分解目标
subtasks = self._decompose_objective(task)
print(f" Decomposed into {len(subtasks)} subtasks")
# 分配给下级代理
results = []
for subtask in subtasks:
result = self.delegate(subtask)
results.append(result)
# 汇总结果
return {
"level": "strategic",
"original_task": task,
"subtasks_results": results,
"success": all(r.get("success", False) for r in results)
}
def _decompose_objective(self, task: Dict[str, Any]) -> List[Dict]:
"""分解目标为子任务"""
objective = task.get("objective", "")
if "defend" in objective:
return [
{"type": "defense", "task": "patrol_borders", "priority": "high"},
{"type": "defense", "task": "fortify_positions", "priority": "high"},
{"type": "defense", "task": "monitor_threats", "priority": "medium"},
]
elif "expand" in objective:
return [
{"type": "expansion", "task": "explore_territory", "priority": "high"},
{"type": "expansion", "task": "secure_resources", "priority": "high"},
{"type": "expansion", "task": "establish_outposts", "priority": "medium"},
]
else:
return [
{"type": "general", "task": "maintain_order", "priority": "medium"},
{"type": "general", "task": "gather_intelligence", "priority": "medium"},
]
class TacticalAgent(HierarchicalAgent):
"""战术层代理"""
def __init__(self, agent_id: str, specialization: str = "general"):
super().__init__(agent_id, AgentLevel.TACTICAL)
self.specialization = specialization
def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""战术层执行:制定具体方案"""
task_type = task.get("type", "")
print(f"\n[{self.agent_id}] ({self.specialization}) Tactical Planning")
print(f" Task: {task.get('task', 'Unknown')}")
print(f" Priority: {task.get('priority', 'Unknown')}")
# 根据任务类型制定计划
plan = self._create_plan(task)
print(f" Plan: {plan}")
# 委派给下级代理执行
if self.subordinates:
result = self.delegate(task)
else:
# 如果没有下级,直接执行
result = {"success": True, "action": plan}
return {
"level": "tactical",
"task": task,
"plan": plan,
"execution_result": result
}
def _create_plan(self, task: Dict) -> str:
"""根据任务创建具体计划"""
task_name = task.get("task", "")
specialization = self.specialization
plans = {
"patrol_borders": f"{specialization}: Establish patrol routes",
"fortify_positions": f"{specialization}: Reinforce defenses",
"explore_territory": f"{specialization}: Scout new areas",
"secure_resources": f"{specialization}: Allocate gathering teams",
"defend": f"{specialization}: Deploy defensive positions",
}
return plans.get(task_name, f"{specialization}: Execute task")
def can_execute(self, task: Dict) -> bool:
"""检查能否执行"""
# 根据专项进行检查
if self.specialization == "defense":
return "defense" in task.get("type", "") or \
"patrol" in task.get("task", "") or \
"fortify" in task.get("task", "")
elif self.specialization == "logistics":
return "resource" in task.get("task", "") or \
"supply" in task.get("task", "")
return True
class OperationalAgent(HierarchicalAgent):
"""操作层代理"""
def __init__(self, agent_id: str, capability: str = "general"):
super().__init__(agent_id, AgentLevel.OPERATIONAL)
self.capability = capability
def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""操作层执行:具体的单位操作"""
print(f"\n[{self.agent_id}] ({self.capability}) Operational Execution")
print(f" Executing: {task.get('task', 'Unknown')}")
# 执行具体操作
result = self._perform_action(task)
return {
"level": "operational",
"unit": self.agent_id,
"capability": self.capability,
"task": task,
"result": result,
"success": True
}
def _perform_action(self, task: Dict) -> str:
"""执行具体动作"""
action = task.get("task", "unknown")
return f"Completed action: {action} using {self.capability}"
class HierarchicalCommandStructure:
"""分层指挥结构管理"""
def __init__(self):
self.root_agent = None
self.all_agents = {}
def build_structure(self):
"""构建示例的分层结构"""
# 创建战略层
self.root_agent = StrategicAgent("Commander")
self.all_agents["Commander"] = self.root_agent
# 创建战术层
tactical_1 = TacticalAgent("DefenseCommandant", "defense")
tactical_2 = TacticalAgent("LogisticsCommandant", "logistics")
self.root_agent.add_subordinate(tactical_1)
self.root_agent.add_subordinate(tactical_2)
self.all_agents["DefenseCommandant"] = tactical_1
self.all_agents["LogisticsCommandant"] = tactical_2
# 创建操作层
op_agents_defense = [
OperationalAgent("PatrolUnit1", "patrol"),
OperationalAgent("FortificationTeam", "fortification"),
OperationalAgent("IntelligenceUnit", "reconnaissance"),
]
for agent in op_agents_defense:
tactical_1.add_subordinate(agent)
self.all_agents[agent.agent_id] = agent
op_agents_logistics = [
OperationalAgent("SupplyTeam1", "supply"),
OperationalAgent("TransportUnit", "transport"),
]
for agent in op_agents_logistics:
tactical_2.add_subordinate(agent)
self.all_agents[agent.agent_id] = agent
def execute_mission(self, objective: str):
"""执行任务"""
if not self.root_agent:
self.build_structure()
print("=" * 70)
print(f"Mission: {objective}")
print("=" * 70)
task = {"objective": objective}
result = self.root_agent.execute(task)
print("\n" + "=" * 70)
print("Mission Summary")
print("=" * 70)
print(f"Overall Success: {result.get('success')}")
print(f"Number of Subtasks: {len(result.get('subtasks_results', []))}")
# 使用示例
if __name__ == "__main__":
structure = HierarchicalCommandStructure()
structure.execute_mission("defend the kingdom")
最佳实践
- 清晰的层级划分:
- 明确每一层的职责
- 避免层级过多(通常3-5层)
- 定义清晰的通信接口
- 代理通信:
- 定义标准的消息格式
- 支持双向通信(上报、下达)
- 实现消息队列避免阻塞
- 决策分权:
- 高层做战略决策
- 底层有自主权
- 中层协调和执行
- 扩展性:
- 支持动态增删代理
- 支持重组层级
- 支持代理的热更新
- 监控和日志:
- 记录每层的决策
- 追踪任务流转
- 性能指标收集
适用场景
- 组织管理系统
- 游戏 AI(怪物群体)
- 军事指挥系统
- 企业工作流
5. Multi-Agent Collaboration(多代理协作)
模式描述
多个 Agent 通过通信和协调来完成共同目标。支持三种主要的协作模式:
- Master-Slave:主代理指挥从代理
- Peer-to-Peer:平等的代理相互通信
- Publish-Subscribe:通过事件进行通信
架构对比
Master-Slave Model:
┌──────────────┐
│ Master │
│ Agent │
└──────┬───────┘
│
┌──┴──┬────┐
│ │ │
┌─▼──┐┌──▼─┐┌──▼─┐
│Slave││Slave││Slave│
│ A ││ B ││ C │
└────┘└────┘└────┘
Peer-to-Peer Model:
┌─────────────────┐
│ Agent A │
└────────┬────────┘
│
┌──────┼──────┐
│ │ │
┌──▼──┐──┘ ┌───▼──┐
│Agent│ │Agent │
│ B │─────┤ C │
└──┬──┘ └───┬──┘
│ │
└────────────┘
Publish-Subscribe Model:
┌─────────┐
│ Agent A │
└────┬────┘
│ publish
│
┌────▼────────────────┐
│ Event Bus / Broker │
└────┬─────────────────┘
│
┌────┴─────────┬──────────┐
│ subscribe │subscribe │
│ │ │
┌──▼──┐ ┌────▼───┐ ┌──▼──┐
│Agent│ │ Agent │ │Agent │
│ B │ │ C │ │ D │
└─────┘ └────────┘ └──────┘
代码案例
1. Master-Slave 模式
from typing import List, Dict, Any
from enum import Enum
from abc import ABC, abstractmethod
class TaskType(Enum):
IMAGE_PROCESSING = "image_processing"
DATA_ANALYSIS = "data_analysis"
TEXT_PROCESSING = "text_processing"
class SlaveAgent:
"""从代理"""
def __init__(self, agent_id: str,
capabilities: List[TaskType]):
self.agent_id = agent_id
self.capabilities = capabilities
self.is_busy = False
self.current_task = None
def can_handle(self, task_type: TaskType) -> bool:
"""检查是否能处理任务"""
return task_type in self.capabilities
def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行任务"""
self.is_busy = True
self.current_task = task
print(f"[{self.agent_id}] Executing task: {task['id']}")
# 模拟任务执行
result = {
"task_id": task['id'],
"agent_id": self.agent_id,
"status": "completed",
"result": f"Processed {task.get('data', 'unknown')}"
}
self.is_busy = False
self.current_task = None
return result
class MasterAgent:
"""主代理"""
def __init__(self):
self.slaves: List[SlaveAgent] = []
self.task_queue: List[Dict[str, Any]] = []
self.results: List[Dict[str, Any]] = []
def register_slave(self, slave: SlaveAgent):
"""注册从代理"""
self.slaves.append(slave)
print(f"[Master] Registered slave: {slave.agent_id}")
def submit_task(self, task: Dict[str, Any]):
"""提交任务"""
self.task_queue.append(task)
print(f"[Master] Task queued: {task['id']}")
def assign_tasks(self):
"""分配任务给从代理"""
while self.task_queue:
task = self.task_queue.pop(0)
task_type = TaskType[task['type'].upper()]
# 找一个能处理该任务的闲置从代理
slave = self._find_available_slave(task_type)
if slave is None:
# 没有可用的从代理,重新加入队列
self.task_queue.insert(0, task)
break
# 分配任务
result = slave.execute_task(task)
self.results.append(result)
print(f"[Master] Task {task['id']} assigned to {slave.agent_id}")
def _find_available_slave(self, task_type: TaskType) -> SlaveAgent:
"""找一个可用的从代理"""
for slave in self.slaves:
if not slave.is_busy and slave.can_handle(task_type):
return slave
return None
def get_results(self) -> List[Dict[str, Any]]:
"""获取所有结果"""
return self.results
# 使用示例
print("=" * 70)
print("Master-Slave Model Example")
print("=" * 70 + "\n")
master = MasterAgent()
# 创建并注册从代理
slave1 = SlaveAgent("ImageProcessor1", [TaskType.IMAGE_PROCESSING])
slave2 = SlaveAgent("DataAnalyzer1", [TaskType.DATA_ANALYSIS])
slave3 = SlaveAgent("TextProcessor1", [TaskType.TEXT_PROCESSING])
master.register_slave(slave1)
master.register_slave(slave2)
master.register_slave(slave3)
# 提交任务
tasks = [
{"id": "task_001", "type": "image_processing", "data": "image.jpg"},
{"id": "task_002", "type": "data_analysis", "data": "data.csv"},
{"id": "task_003", "type": "text_processing", "data": "document.txt"},
{"id": "task_004", "type": "image_processing", "data": "photo.png"},
]
print("\nSubmitting tasks...")
for task in tasks:
master.submit_task(task)
print("\nAssigning tasks...\n")
master.assign_tasks()
print("\nResults:")
for result in master.get_results():
print(f" {result['task_id']}: {result['status']} (by {result['agent_id']})")
2. Peer-to-Peer 模式
from typing import Callable, List, Dict, Any
import json
class PeerAgent:
"""对等代理"""
def __init__(self, agent_id: str, role: str):
self.agent_id = agent_id
self.role = role
self.peers: List['PeerAgent'] = []
self.message_queue = []
self.knowledge_base = {}
def connect_to_peer(self, peer: 'PeerAgent'):
"""连接到另一个对等体"""
if peer not in self.peers:
self.peers.append(peer)
peer.peers.append(self)
print(f"[{self.agent_id}] Connected to {peer.agent_id}")
def send_message(self, recipient_id: str, message: Dict[str, Any]):
"""发送消息给另一个代理"""
for peer in self.peers:
if peer.agent_id == recipient_id:
peer.receive_message(self.agent_id, message)
return
print(f"[{self.agent_id}] Error: Peer {recipient_id} not found")
def broadcast_message(self, message: Dict[str, Any]):
"""广播消息给所有连接的对等体"""
for peer in self.peers:
peer.receive_message(self.agent_id, message)
def receive_message(self, sender_id: str, message: Dict[str, Any]):
"""接收消息"""
self.message_queue.append({
"from": sender_id,
"message": message,
"timestamp": len(self.message_queue)
})
print(f"[{self.agent_id}] Received message from {sender_id}: "
f"{message.get('content', 'unknown')}")
def process_messages(self):
"""处理消息队列"""
while self.message_queue:
msg_item = self.message_queue.pop(0)
sender_id = msg_item['from']
message = msg_item['message']
self._handle_message(sender_id, message)
def _handle_message(self, sender_id: str, message: Dict[str, Any]):
"""处理消息"""
msg_type = message.get('type', 'unknown')
if msg_type == 'query':
response = self._answer_query(message.get('query', ''))
self.send_message(sender_id, {
"type": "response",
"query": message.get('query'),
"answer": response
})
elif msg_type == 'share_knowledge':
self._update_knowledge(message.get('knowledge', {}))
print(f"[{self.agent_id}] Knowledge updated from {sender_id}")
def _answer_query(self, query: str) -> str:
"""回答查询"""
if query in self.knowledge_base:
return self.knowledge_base[query]
return f"Unknown query: {query}"
def _update_knowledge(self, knowledge: Dict):
"""更新知识库"""
self.knowledge_base.update(knowledge)
def add_knowledge(self, key: str, value: Any):
"""添加知识"""
self.knowledge_base[key] = value
# 使用示例
print("\n" + "=" * 70)
print("Peer-to-Peer Model Example")
print("=" * 70 + "\n")
# 创建对等代理
agent_alice = PeerAgent("Alice", "researcher")
agent_bob = PeerAgent("Bob", "analyst")
agent_charlie = PeerAgent("Charlie", "engineer")
# 建立连接
agent_alice.connect_to_peer(agent_bob)
agent_bob.connect_to_peer(agent_charlie)
# 添加知识
agent_alice.add_knowledge("machine_learning", "ML is a subset of AI")
agent_bob.add_knowledge("data_science", "DS involves data analysis")
agent_charlie.add_knowledge("software_engineering", "SE is about building systems")
# 交互
print("Alice querying Bob about data science:")
agent_alice.send_message("Bob", {
"type": "query",
"query": "data_science"
})
print("\nBob processing messages:")
agent_bob.process_messages()
print("\nAlice sharing knowledge with all peers:")
agent_alice.broadcast_message({
"type": "share_knowledge",
"knowledge": {"research": "Latest findings on AI"}
})
print("\nBob and Charlie processing broadcast:")
agent_bob.process_messages()
agent_charlie.process_messages()
3. Publish-Subscribe 模式
from typing import Callable, List, Dict, Any, Set
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Event:
"""事件类"""
event_type: str
source: str
data: Dict[str, Any]
timestamp: float = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now().timestamp()
class EventBus:
"""事件总线"""
def __init__(self):
self.subscribers: Dict[str, List[Callable]] = {}
self.event_history: List[Event] = []
def subscribe(self, event_type: str, callback: Callable):
"""订阅事件"""
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(callback)
print(f"Subscribed to {event_type}")
def publish(self, event: Event):
"""发布事件"""
print(f"\n[EventBus] Publishing event: {event.event_type} from {event.source}")
print(f" Data: {event.data}")
self.event_history.append(event)
# 通知所有订阅者
if event.event_type in self.subscribers:
for callback in self.subscribers[event.event_type]:
callback(event)
else:
print(f" No subscribers for {event.event_type}")
def get_history(self, event_type: str = None) -> List[Event]:
"""获取事件历史"""
if event_type is None:
return self.event_history
return [e for e in self.event_history if e.event_type == event_type]
class PubSubAgent:
"""发布-订阅代理"""
def __init__(self, agent_id: str, event_bus: EventBus):
self.agent_id = agent_id
self.event_bus = event_bus
self.state = {}
def publish_event(self, event_type: str, data: Dict[str, Any]):
"""发布事件"""
event = Event(
event_type=event_type,
source=self.agent_id,
data=data
)
self.event_bus.publish(event)
def subscribe_to_event(self, event_type: str):
"""订阅事件"""
self.event_bus.subscribe(event_type, self._on_event)
def _on_event(self, event: Event):
"""处理事件"""
print(f" → [{self.agent_id}] Received event: {event.event_type}")
self._process_event(event)
def _process_event(self, event: Event):
"""处理事件的业务逻辑"""
pass
class SensorAgent(PubSubAgent):
"""传感器代理"""
def __init__(self, agent_id: str, event_bus: EventBus):
super().__init__(agent_id, event_bus)
def detect_event(self, event_type: str, data: Dict):
"""检测事件"""
print(f"[{self.agent_id}] Detecting: {event_type}")
self.publish_event(event_type, data)
def _process_event(self, event: Event):
"""处理其他代理发送的事件"""
if event.source != self.agent_id: # 不处理自己发送的事件
print(f" → [{self.agent_id}] Updating sensor based on {event.event_type}")
class AnalysisAgent(PubSubAgent):
"""分析代理"""
def __init__(self, agent_id: str, event_bus: EventBus):
super().__init__(agent_id, event_bus)
def _process_event(self, event: Event):
"""处理事件"""
print(f" → [{self.agent_id}] Analyzing: {event.data}")
# 发布分析结果
analysis_result = {
"original_event": event.event_type,
"source": event.source,
"analysis": f"Analyzed data from {event.source}",
"severity": "high" if event.data.get("value", 0) > 50 else "low"
}
self.publish_event("analysis_result", analysis_result)
class AlertAgent(PubSubAgent):
"""告警代理"""
def __init__(self, agent_id: str, event_bus: EventBus):
super().__init__(agent_id, event_bus)
def _process_event(self, event: Event):
"""处理事件"""
if event.event_type == "analysis_result":
severity = event.data.get("severity", "unknown")
if severity == "high":
print(f" → [{self.agent_id}] ⚠️ ALERT! High severity detected: "
f"{event.data.get('analysis')}")
else:
print(f" → [{self.agent_id}] ℹ️ Info: {event.data.get('analysis')}")
# 使用示例
print("\n" + "=" * 70)
print("Publish-Subscribe Model Example")
print("=" * 70)
# 创建事件总线
event_bus = EventBus()
# 创建代理
sensor = SensorAgent("TemperatureSensor", event_bus)
analyzer = AnalysisAgent("DataAnalyzer", event_bus)
alerter = AlertAgent("AlertSystem", event_bus)
# 订阅事件
sensor.subscribe_to_event("temperature_reading")
analyzer.subscribe_to_event("temperature_reading")
alerter.subscribe_to_event("analysis_result")
# 模拟传感器检测到温度升高
print("\n1. Sensor detects high temperature:")
sensor.detect_event("temperature_reading", {"value": 85, "unit": "celsius"})
print("\n2. Sensor detects normal temperature:")
sensor.detect_event("temperature_reading", {"value": 25, "unit": "celsius"})
最佳实践
- Master-Slave 模式:
- 适合有中央控制的场景
- 主代理应该是可靠且性能好的
- 支持故障转移和负载均衡
- 实现超时和重试机制
- Peer-to-Peer 模式:
- 避免循环通信
- 实现消息去重机制
- 定义清晰的通信协议
- 处理网络分区问题
- Publish-Subscribe 模式:
- 使用有序队列保证消息顺序
- 支持消息持久化
- 实现消息过滤和路由
- 处理事件溢出
- 通用最佳实践:
- 定义标准的消息格式
- 实现消息序列化/反序列化
- 添加日志和监控
- 处理网络延迟和故障
- 实现优雅关闭
适用场景
- 分布式系统
- 微服务架构
- 实时协作系统
- IoT 系统
- 游戏多人交互
6. Tool Use / Function Calling
模式描述
Agent 通过调用外部工具或函数来扩展自身的能力。这是现代 LLM-based Agent 的关键特性。
架构图
┌─────────────────┐
│ Agent │
│ (LLM-based) │
└────────┬────────┘
│
┌────▼─────────────────────┐
│ Analyze Task │
│ Decide which tools to use│
└────┬──────────────────────┘
│
┌────▼──────────┐
│ Tool Selection│
└────┬──────────┘
│
┌────┴────────────────────┐
│ │
┌───▼────┐ ┌──────┐ ┌──▼──┐
│ Tool 1 │ │Tool 2│ │Tool3│
│ API │ │ Web │ │ DB │
└────────┘ └──────┘ └─────┘
代码案例
import json
from typing import Any, Dict, List, Callable
from dataclasses import dataclass
@dataclass
class ToolDefinition:
"""工具定义"""
name: str
description: str
parameters: Dict[str, Any]
function: Callable
class ToolRegistry:
"""工具注册表"""
def __init__(self):
self.tools: Dict[str, ToolDefinition] = {}
def register_tool(self, tool_def: ToolDefinition):
"""注册工具"""
self.tools[tool_def.name] = tool_def
print(f"Tool registered: {tool_def.name}")
def get_tool(self, name: str) -> ToolDefinition:
"""获取工具"""
return self.tools.get(name)
def list_tools(self) -> List[str]:
"""列出所有工具"""
return list(self.tools.keys())
def get_tools_schema(self) -> List[Dict[str, Any]]:
"""获取工具的 JSON Schema"""
schema = []
for tool in self.tools.values():
schema.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters
}
})
return schema
class ToolUsingAgent:
"""使用工具的代理"""
def __init__(self, tool_registry: ToolRegistry):
self.tool_registry = tool_registry
self.execution_history = []
def select_and_execute_tool(self,
tool_name: str,
tool_args: Dict[str, Any]) -> Any:
"""选择并执行工具"""
tool = self.tool_registry.get_tool(tool_name)
if tool is None:
return {"error": f"Tool '{tool_name}' not found"}
print(f"\n[Agent] Executing tool: {tool_name}")
print(f" Arguments: {tool_args}")
try:
result = tool.function(**tool_args)
# 记录执行历史
self.execution_history.append({
"tool": tool_name,
"args": tool_args,
"result": result,
"status": "success"
})
print(f" Result: {result}")
return result
except Exception as e:
error_msg = str(e)
self.execution_history.append({
"tool": tool_name,
"args": tool_args,
"error": error_msg,
"status": "failed"
})
print(f" Error: {error_msg}")
return {"error": error_msg}
def get_execution_history(self) -> List[Dict]:
"""获取执行历史"""
return self.execution_history
# 定义具体的工具函数
def search_web(query: str) -> str:
"""Web 搜索"""
# 模拟 Web 搜索
results = {
"python": "Python is a high-level programming language",
"ai": "AI stands for Artificial Intelligence",
"machine learning": "ML is a subset of AI focused on learning from data"
}
return results.get(query.lower(), "No results found")
def calculate(expression: str) -> float:
"""数学计算"""
try:
result = eval(expression)
return float(result)
except Exception as e:
raise ValueError(f"Invalid expression: {expression}")
def get_current_weather(city: str) -> Dict[str, Any]:
"""获取天气"""
# 模拟天气 API
weather_data = {
"seattle": {"temp": 15, "condition": "rainy"},
"london": {"temp": 10, "condition": "cloudy"},
"beijing": {"temp": 25, "condition": "sunny"}
}
return weather_data.get(city.lower(), {"error": "City not found"})
def query_database(sql: str) -> List[Dict]:
"""查询数据库"""
# 模拟数据库查询
if "users" in sql.lower():
return [
{"id": 1, "name": "Alice", "email": "[email protected]"},
{"id": 2, "name": "Bob", "email": "[email protected]"}
]
return []
# 创建工具注册表
registry = ToolRegistry()
# 注册工具
registry.register_tool(ToolDefinition(
name="search_web",
description="Search the web for information",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query"
}
},
"required": ["query"]
},
function=search_web
))
registry.register_tool(ToolDefinition(
name="calculate",
description="Perform mathematical calculations",
parameters={
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Mathematical expression (e.g., '2+2' or 'sqrt(16)')"
}
},
"required": ["expression"]
},
function=calculate
))
registry.register_tool(ToolDefinition(
name="get_weather",
description="Get current weather for a city",
parameters={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "City name"
}
},
"required": ["city"]
},
function=get_current_weather
))
registry.register_tool(ToolDefinition(
name="query_db",
description="Query the database",
parameters={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL query"
}
},
"required": ["sql"]
},
function=query_database
))
# 使用示例
print("=" * 70)
print("Tool Use / Function Calling Example")
print("=" * 70)
agent = ToolUsingAgent(registry)
print("\nAvailable tools:")
for tool_name in agent.tool_registry.list_tools():
print(f" - {tool_name}")
print("\n" + "-" * 70)
print("Tool Invocations:")
print("-" * 70)
# 调用不同的工具
agent.select_and_execute_tool("search_web", {"query": "Python"})
agent.select_and_execute_tool("calculate", {"expression": "2**10"})
agent.select_and_execute_tool("get_weather", {"city": "Seattle"})
agent.select_and_execute_tool("query_db", {"sql": "SELECT * FROM users"})
print("\n" + "-" * 70)
print("Execution History:")
print("-" * 70)
for i, entry in enumerate(agent.get_execution_history(), 1):
print(f"{i}. Tool: {entry['tool']}")
print(f" Status: {entry['status']}")
if entry['status'] == 'success':
print(f" Result: {entry['result']}")
else:
print(f" Error: {entry['error']}")
最佳实践
- 工具设计:
- 功能单一明确
- 参数定义清晰
- 返回结构化数据
- 提供明确的错误信息
- 工具管理:
- 维护工具注册表
- 支持工具的热加载
- 版本管理
- 文档完整
- 调用管理:
- 实现重试机制
- 设置超时限制
- 记录调用历史
- 性能监控
- 安全性:
- 验证输入参数
- 限制工具权限
- 审计敏感操作
- 防止注入攻击
适用场景
- AI 助手
- 自动化系统
- 数据处理管道
- 集成系统
7. Memory Patterns(记忆模式)
模式描述
Agent 管理不同类型的记忆来保存和利用历史信息:
- 短期记忆:当前任务相关
- 长期记忆:历史经验和知识
- 情节记忆:重要事件记录
记忆层级
┌──────────────────────────────────┐
│ Working Memory (短期工作记忆) │
│ - 当前对话内容 │
│ - 当前任务信息 │
│ - 临时变量 │
│ (容量小,衰减快) │
└──────────────────────────────────┘
↓ 提炼
┌──────────────────────────────────┐
│ Episodic Memory (情节记忆) │
│ - 重要事件 │
│ - 成功/失败案例 │
│ - 用户交互历史 │
│ (中等容量,中等衰减) │
└──────────────────────────────────┘
↓ 归纳
┌──────────────────────────────────┐
│ Semantic Memory (语义记忆) │
│ - 事实和概念 │
│ - 规则和约束 │
│ - 领域知识 │
│ (大容量,不衰减) │
└──────────────────────────────────┘
代码案例
from typing import Any, Dict, List
from datetime import datetime
from collections import deque
class WorkingMemory:
"""短期工作记忆"""
def __init__(self, capacity: int = 10):
self.capacity = capacity
self.buffer = deque(maxlen=capacity)
self.variables = {}
def add(self, item: Dict[str, Any]):
"""添加项目"""
item['timestamp'] = datetime.now().isoformat()
self.buffer.append(item)
def get_recent(self, n: int = 5) -> List[Dict]:
"""获取最近的 n 项"""
return list(self.buffer)[-n:]
def set_variable(self, key: str, value: Any):
"""设置变量"""
self.variables[key] = value
def get_variable(self, key: str) -> Any:
"""获取变量"""
return self.variables.get(key)
def clear(self):
"""清空"""
self.buffer.clear()
self.variables.clear()
class EpisodicMemory:
"""情节记忆"""
def __init__(self):
self.episodes = []
def record_episode(self, event: str, details: Dict[str, Any]):
"""记录事件"""
episode = {
"event": event,
"details": details,
"timestamp": datetime.now().isoformat(),
"episode_id": len(self.episodes)
}
self.episodes.append(episode)
print(f"[EpisodicMemory] Recorded episode: {event}")
def recall_similar_episodes(self,
event_type: str,
limit: int = 5) -> List[Dict]:
"""回忆相似的事件"""
similar = [e for e in self.episodes
if event_type.lower() in e['event'].lower()]
return similar[-limit:]
def get_success_examples(self) -> List[Dict]:
"""获取成功的例子"""
return [e for e in self.episodes
if e.get('details', {}).get('success')]
def get_failure_examples(self) -> List[Dict]:
"""获取失败的例子"""
return [e for e in self.episodes
if not e.get('details', {}).get('success')]
class SemanticMemory:
"""语义记忆"""
def __init__(self):
self.facts = {}
self.rules = []
self.concepts = {}
def add_fact(self, subject: str, predicate: str, obj: str):
"""添加事实"""
key = f"{subject}:{predicate}"
self.facts[key] = obj
print(f"[SemanticMemory] Fact added: {subject} {predicate} {obj}")
def add_rule(self, condition: str, action: str):
"""添加规则"""
self.rules.append({"condition": condition, "action": action})
print(f"[SemanticMemory] Rule added: IF {condition} THEN {action}")
def add_concept(self, name: str, definition: str, properties: Dict):
"""添加概念"""
self.concepts[name] = {
"definition": definition,
"properties": properties
}
print(f"[SemanticMemory] Concept added: {name}")
def query_fact(self, subject: str, predicate: str) -> str:
"""查询事实"""
key = f"{subject}:{predicate}"
return self.facts.get(key, "Unknown")
def get_applicable_rules(self, condition: str) -> List[Dict]:
"""获取适用的规则"""
return [r for r in self.rules
if condition.lower() in r['condition'].lower()]
class MemoryManager:
"""内存管理器"""
def __init__(self):
self.working_memory = WorkingMemory()
self.episodic_memory = EpisodicMemory()
self.semantic_memory = SemanticMemory()
def update_context(self, context: Dict[str, Any]):
"""更新上下文"""
for key, value in context.items():
self.working_memory.set_variable(key, value)
def record_interaction(self, interaction_type: str,
details: Dict[str, Any]):
"""记录交互"""
self.working_memory.add({
"type": interaction_type,
"details": details
})
# 如果是重要事件,记录到情节记忆
if details.get('important', False):
self.episodic_memory.record_episode(interaction_type, details)
def get_context_summary(self) -> str:
"""获取上下文摘要"""
recent = self.working_memory.get_recent(3)
context = "Current context:\n"
for item in recent:
context += f" - {item['type']}: {item.get('details', {})}\n"
return context
def learn_from_experience(self, success: bool,
event_details: Dict[str, Any]):
"""从经验中学习"""
event_details['success'] = success
self.episodic_memory.record_episode(
"experience",
event_details
)
# 从成功的经验中提取规则
if success:
condition = event_details.get('condition', 'Unknown')
action = event_details.get('action', 'Unknown')
self.semantic_memory.add_rule(condition, action)
class MemoryAwareAgent:
"""具有记忆功能的代理"""
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.memory = MemoryManager()
def think_and_act(self, situation: Dict[str, Any]) -> str:
"""思考和行动"""
print(f"\n[{self.agent_id}] Thinking and acting...")
# 记录当前情况
self.memory.record_interaction("situation", situation)
# 从短期记忆获取上下文
context_summary = self.memory.get_context_summary()
print(context_summary)
# 从长期记忆回忆相似的情节
event_type = situation.get('type', 'unknown')
similar_episodes = self.memory.episodic_memory.recall_similar_episodes(
event_type
)
if similar_episodes:
print(f"\nRecalled {len(similar_episodes)} similar episodes:")
for ep in similar_episodes[-2:]: # 显示最近的 2 个
print(f" - {ep['event']}: {ep['details']}")
# 应用语义规则做决策
applicable_rules = self.memory.semantic_memory.get_applicable_rules(
event_type
)
if applicable_rules:
print(f"\nApplying {len(applicable_rules)} applicable rules:")
for rule in applicable_rules:
print(f" IF {rule['condition']} THEN {rule['action']}")
# 做出决策和行动
decision = self._make_decision(situation, similar_episodes, applicable_rules)
print(f"Decision: {decision}")
# 记录学习
success = True # 模拟成功
self.memory.learn_from_experience(success, {
"condition": event_type,
"action": decision
})
return decision
def _make_decision(self, situation, episodes, rules):
"""做出决策"""
# 简化的决策逻辑
if rules:
return f"Execute: {rules[0]['action']}"
elif episodes and episodes[-1].get('details', {}).get('success'):
return f"Repeat successful action from similar episode"
else:
return "Explore new action"
# 使用示例
print("=" * 70)
print("Memory Patterns Example")
print("=" * 70)
# 创建代理
agent = MemoryAwareAgent("LearningAgent")
# 初始化语义知识
agent.memory.semantic_memory.add_fact("Python", "is_language", "programming")
agent.memory.semantic_memory.add_fact("AI", "contains", "Machine Learning")
agent.memory.semantic_memory.add_concept(
"Problem Solving",
"Process of finding solutions",
{"steps": ["analyze", "plan", "execute", "review"]}
)
print("\n" + "-" * 70)
print("Scenario 1: Handle a problem")
print("-" * 70)
agent.think_and_act({
"type": "problem_solving",
"problem": "User asked about Python",
"urgency": "high"
})
print("\n" + "-" * 70)
print("Scenario 2: Handle similar situation again")
print("-" * 70)
agent.think_and_act({
"type": "problem_solving",
"problem": "User asked about AI",
"urgency": "medium"
})
print("\n" + "-" * 70)
print("Scenario 3: Handle different situation")
print("-" * 70)
agent.think_and_act({
"type": "customer_service",
"problem": "Customer complaint",
"urgency": "high"
})
print("\n" + "-" * 70)
print("Memory Summary")
print("-" * 70)
print("\nWorking Memory Variables:")
for key, value in agent.memory.working_memory.variables.items():
print(f" {key}: {value}")
print("\nEpisodic Memory (experiences):")
for ep in agent.memory.episodic_memory.episodes[-3:]:
print(f" - {ep['event']} (Success: {ep['details'].get('success')})")
print("\nSemantic Memory (rules):")
for rule in agent.memory.semantic_memory.rules:
print(f" IF {rule['condition']} THEN {rule['action']}")
最佳实践
- 短期记忆:
- 维持合理大小(避免过大导致干扰)
- 自动遗忘旧项目
- 快速访问
- 存储当前任务状态
- 情节记忆:
- 记录关键事件和结果
- 支持相似性搜索
- 成功/失败分类
- 定期清理过旧数据
- 语义记忆:
- 存储抽象知识和规则
- 支持快速查询
- 版本控制
- 定期审查和更新
- 集成策略:
- 定期从短期转入长期
- 支持遗忘和更新
- 优先级管理
- 一致性检查
适用场景
- 对话系统
- 个人助手
- 学习系统
- 长期交互
8. State Machine(状态机)
模式描述
Agent 在不同状态之间转换,每个状态有明确的进入条件、行为和退出条件。这提供了清晰的行为模式和容易的调试。
状态图示例
┌──────────────┐
│ IDLE │ (初始状态)
└──────┬───────┘
│ receive_task
│
┌──────▼──────────────┐
│ ANALYZING │ (分析阶段)
└──────┬──────────────┘
│ analysis_complete
│
┌──────▼──────────────┐
│ PLANNING │ (规划阶段)
└──────┬──────────────┘
│ plan_ready
│
┌──────▼──────────────┐
│ EXECUTING │ (执行阶段)
└──────┬──────────────┘
│ execution_complete
│
┌──────▼──────────────┐
│ REVIEWING │ (审查阶段)
└──────┬──────────────┘
│ review_complete
│
┌──────▼──────────────┐
│ COMPLETED │ (完成)
└─────────────────────┘
代码案例
from enum import Enum
from typing import Callable, Dict, Any, Optional
from dataclasses import dataclass
class State(Enum):
"""状态枚举"""
IDLE = "idle"
ANALYZING = "analyzing"
PLANNING = "planning"
EXECUTING = "executing"
REVIEWING = "reviewing"
COMPLETED = "completed"
ERROR = "error"
@dataclass
class Transition:
"""状态转换"""
from_state: State
to_state: State
trigger: str
condition: Optional[Callable] = None
action: Optional[Callable] = None
class StateMachine:
"""状态机"""
def __init__(self, initial_state: State):
self.current_state = initial_state
self.transitions: Dict[str, Transition] = {}
self.state_handlers: Dict[State, Dict[str, Callable]] = {}
self.history = [initial_state]
def add_transition(self, transition: Transition):
"""添加状态转换"""
key = f"{transition.from_state.value}:{transition.trigger}"
self.transitions[key] = transition
def add_state_handler(self, state: State,
handler_type: str,
handler: Callable):
"""添加状态处理器"""
if state not in self.state_handlers:
self.state_handlers[state] = {}
self.state_handlers[state][handler_type] = handler
def trigger_event(self, event: str) -> bool:
"""触发事件"""
key = f"{self.current_state.value}:{event}"
if key not in self.transitions:
print(f"[StateMachine] No transition for event '{event}' "
f"in state '{self.current_state.value}'")
return False
transition = self.transitions[key]
# 检查条件
if transition.condition and not transition.condition():
print(f"[StateMachine] Transition condition failed")
return False
# 执行退出处理
self._execute_state_handler(self.current_state, 'on_exit')
# 执行转换动作
if transition.action:
transition.action()
# 转换状态
old_state = self.current_state
self.current_state = transition.to_state
self.history.append(self.current_state)
print(f"[StateMachine] Transitioned from {old_state.value} "
f"to {self.current_state.value}")
# 执行进入处理
self._execute_state_handler(self.current_state, 'on_enter')
return True
def _execute_state_handler(self, state: State, handler_type: str):
"""执行状态处理器"""
if state in self.state_handlers:
handler = self.state_handlers[state].get(handler_type)
if handler:
handler()
def get_current_state(self) -> State:
"""获取当前状态"""
return self.current_state
def get_history(self) -> list:
"""获取历史"""
return [s.value for s in self.history]
class TaskProcessingAgent:
"""任务处理代理"""
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.state_machine = StateMachine(State.IDLE)
self.task = None
self.analysis_result = None
self.plan = None
self.execution_result = None
self.review_result = None
self._setup_state_machine()
def _setup_state_machine(self):
"""设置状态机"""
sm = self.state_machine
# 添加转换
sm.add_transition(Transition(
State.IDLE, State.ANALYZING,
"receive_task",
action=self._on_receive_task
))
sm.add_transition(Transition(
State.ANALYZING, State.PLANNING,
"analysis_complete",
action=self._on_analysis_complete
))
sm.add_transition(Transition(
State.ANALYZING, State.ERROR,
"analysis_failed",
action=self._on_analysis_failed
))
sm.add_transition(Transition(
State.PLANNING, State.EXECUTING,
"plan_ready",
action=self._on_plan_ready
))
sm.add_transition(Transition(
State.PLANNING, State.ERROR,
"planning_failed",
action=self._on_planning_failed
))
sm.add_transition(Transition(
State.EXECUTING, State.REVIEWING,
"execution_complete",
action=self._on_execution_complete
))
sm.add_transition(Transition(
State.EXECUTING, State.ERROR,
"execution_failed",
action=self._on_execution_failed
))
sm.add_transition(Transition(
State.REVIEWING, State.COMPLETED,
"review_complete",
action=self._on_review_complete
))
# 添加状态处理器
sm.add_state_handler(State.IDLE, 'on_enter',
lambda: print(f"[{self.agent_id}] Ready for new tasks"))
sm.add_state_handler(State.ANALYZING, 'on_enter',
lambda: print(f"[{self.agent_id}] Analyzing task..."))
sm.add_state_handler(State.PLANNING, 'on_enter',
lambda: print(f"[{self.agent_id}] Planning execution..."))
sm.add_state_handler(State.EXECUTING, 'on_enter',
lambda: print(f"[{self.agent_id}] Executing plan..."))
sm.add_state_handler(State.REVIEWING, 'on_enter',
lambda: print(f"[{self.agent_id}] Reviewing results..."))
sm.add_state_handler(State.COMPLETED, 'on_enter',
lambda: print(f"[{self.agent_id}] Task completed successfully"))
sm.add_state_handler(State.ERROR, 'on_enter',
lambda: print(f"[{self.agent_id}] Error occurred during processing"))
def receive_task(self, task: Dict[str, Any]):
"""接收任务"""
self.task = task
print(f"\n[{self.agent_id}] Received task: {task.get('name')}")
self.state_machine.trigger_event("receive_task")
def _on_receive_task(self):
"""处理接收任务"""
print(f" Task details: {self.task}")
def analyze_task(self):
"""分析任务"""
print(f" Analyzing... ")
self.analysis_result = {
"complexity": "high",
"dependencies": ["resource_a", "resource_b"],
"estimated_time": 100
}
print(f" Analysis result: {self.analysis_result}")
self.state_machine.trigger_event("analysis_complete")
def _on_analysis_complete(self):
"""分析完成后的处理"""
print(f" Analysis completed successfully")
def _on_analysis_failed(self):
"""分析失败处理"""
print(f" Analysis failed")
def plan_execution(self):
"""规划执行"""
print(f" Planning... ")
self.plan = [
"Step 1: Initialize resources",
"Step 2: Process data",
"Step 3: Generate results",
"Step 4: Cleanup"
]
print(f" Plan created: {len(self.plan)} steps")
self.state_machine.trigger_event("plan_ready")
def _on_plan_ready(self):
"""计划就绪处理"""
print(f" Plan is ready for execution")
def _on_planning_failed(self):
"""规划失败处理"""
print(f" Planning failed")
def execute_plan(self):
"""执行计划"""
print(f" Executing plan...")
self.execution_result = {
"steps_completed": len(self.plan),
"success": True,
"duration": 95
}
print(f" Execution result: {self.execution_result}")
self.state_machine.trigger_event("execution_complete")
def _on_execution_complete(self):
"""执行完成处理"""
print(f" Execution completed")
def _on_execution_failed(self):
"""执行失败处理"""
print(f" Execution failed")
def review_results(self):
"""审查结果"""
print(f" Reviewing results...")
self.review_result = {
"quality": "high",
"meets_requirements": True,
"issues": []
}
print(f" Review result: Quality {self.review_result['quality']}")
self.state_machine.trigger_event("review_complete")
def _on_review_complete(self):
"""审查完成处理"""
print(f" Review completed successfully")
def run_workflow(self, task: Dict[str, Any]):
"""运行完整工作流"""
self.receive_task(task)
self.analyze_task()
self.plan_execution()
self.execute_plan()
self.review_results()
print(f"\nState transitions history: {self.state_machine.get_history()}")
print(f"Final state: {self.state_machine.get_current_state().value}")
# 使用示例
print("=" * 70)
print("State Machine Pattern Example")
print("=" * 70)
agent = TaskProcessingAgent("ProcessingAgent1")
task = {
"name": "Data Processing Task",
"description": "Process customer data",
"priority": "high"
}
agent.run_workflow(task)
最佳实践
- 状态定义:
- 状态应该明确且互斥
- 避免过多状态(通常不超过 10 个)
- 使用枚举定义状态
- 转换管理:
- 明确定义所有可能的转换
- 提供保护条件
- 实现超时处理
- 错误处理:
- 添加 ERROR 状态
- 实现错误恢复机制
- 记录状态转换日志
- 监控和调试:
- 记录状态转换历史
- 添加状态转换验证
- 可视化状态图
- 性能优化:
- 缓存常用转换
- 避免状态中的长时间操作
- 使用异步转换
适用场景
- 工作流管理
- 用户交互管理
- 设备控制
- 游戏逻辑
- 会话管理
总结与对比
| 模式 | 复杂度 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|---|
| Agentic Loop | 低 | 实时系统 | 简单、直观 | 无法处理复杂任务 |
| ReAct | 中 | 问答系统 | 推理明确、可解释 | 需要 LLM 支持 |
| Planner-Executor | 中 | 项目管理 | 任务分解清晰 | 计划可能不适应变化 |
| Hierarchical | 高 | 大规模系统 | 可扩展、可维护 | 通信复杂 |
| Multi-Agent | 高 | 分布式系统 | 灵活、容错 | 同步困难 |
| Tool Use | 中 | 集成系统 | 扩展能力 | 工具管理复杂 |
| Memory | 中 | 长期交互 | 学习能力 | 内存管理复杂 |
| State Machine | 低 | 流程系统 | 清晰、易调试 | 扩展性有限 |
选择建议
- 简单任务:使用 Agentic Loop 或 State Machine
- 复杂推理:使用 ReAct
- 大规模任务:使用 Planner-Executor
- 分布式系统:使用 Hierarchical 或 Multi-Agent
- 需要学习:添加 Memory
- 需要集成:使用 Tool Use
通常实际应用会结合多个模式使用,形成混合架构。