35 min read

Agent 设计模式有哪些?

Agent 设计模式有哪些?
Agent 设计模式有哪些?

Agent 设计模式详细指南

1. Agentic Loop(代理循环)

模式描述

最基础的 Agent 架构,Agent 在感知-决策-执行-反馈的循环中不断运作。

核心流程

┌─────────────┐
│   感知      │  收集环境信息
└──────┬──────┘
       │
┌──────▼──────┐
│   决策      │  分析信息,选择行动
└──────┬──────┘
       │
┌──────▼──────┐
│   执行      │  执行选定的行动
└──────┬──────┘
       │
┌──────▼──────┐
│   反馈      │  评估结果,更新状态
└──────┬──────┘
       │
       └─────────────┘(循环)

代码案例

Python 实现

class Agent:
    def __init__(self, name):
        self.name = name
        self.state = {}
        self.max_iterations = 10
    
    def perceive(self, environment):
        """感知环境"""
        observations = {
            'time': environment.current_time,
            'location': environment.agent_location,
            'resources': environment.available_resources,
            'threats': environment.detect_threats()
        }
        return observations
    
    def decide(self, observations):
        """做出决策"""
        if observations['threats']:
            action = 'escape'
        elif observations['resources'] < 20:
            action = 'gather_resources'
        else:
            action = 'explore'
        return action
    
    def execute(self, action, environment):
        """执行行动"""
        result = environment.perform_action(action)
        return result
    
    def feedback(self, result):
        """反馈和学习"""
        self.state['last_action'] = result['action']
        self.state['success'] = result['success']
        if result['success']:
            self.state['reward'] = result['reward']
    
    def run(self, environment):
        """主循环"""
        for iteration in range(self.max_iterations):
            print(f"\n--- Iteration {iteration + 1} ---")
            
            # 感知
            observations = self.perceive(environment)
            print(f"Observations: {observations}")
            
            # 决策
            action = self.decide(observations)
            print(f"Decision: {action}")
            
            # 执行
            result = self.execute(action, environment)
            print(f"Result: {result}")
            
            # 反馈
            self.feedback(result)
            
            # 检查是否完成
            if environment.is_goal_reached():
                print(f"Goal reached!")
                break

环境模拟

class Environment:
    def __init__(self):
        self.current_time = 0
        self.agent_location = (0, 0)
        self.available_resources = 100
        self.threats = []
    
    def perform_action(self, action):
        self.current_time += 1
        result = {'action': action, 'success': True, 'reward': 0}
        
        if action == 'gather_resources':
            self.available_resources += 30
            result['reward'] = 1
        elif action == 'explore':
            self.agent_location = (self.agent_location[0] + 1, 
                                   self.agent_location[1] + 1)
            result['reward'] = 0.5
        elif action == 'escape':
            self.threats = []
            result['reward'] = 0.3
        
        # 随机生成威胁
        if self.current_time % 3 == 0:
            self.threats.append('enemy')
        
        return result
    
    def detect_threats(self):
        return len(self.threats)
    
    def is_goal_reached(self):
        return self.current_time >= 10

# 使用案例
env = Environment()
agent = Agent("MainAgent")
agent.run(env)

最佳实践

  1. 明确定义状态:确保状态能准确反映 Agent 的当前情况
  2. 灵活的循环控制:提供退出条件(目标达成、超时等)
  3. 性能监控:记录每个循环的指标便于调试
  4. 错误处理:在每个阶段添加异常处理
  5. 可观测性:记录日志方便追踪 Agent 行为

适用场景

  • 实时游戏 AI
  • 自主机器人控制
  • 模拟系统
  • 交互式系统

2. ReAct(推理+行动)

模式描述

Agent 在执行行动前先进行详细的推理,通过"思考→行动→观察"的循环来解决问题。这是大语言模型驱动的 Agent 的常用模式。

核心流程

┌──────────────────┐
│   输入问题        │
└────────┬─────────┘
         │
┌────────▼─────────┐
│  思考/推理        │  LLM 分析问题
│  (Reasoning)     │
└────────┬─────────┘
         │
┌────────▼─────────┐
│  计划行动         │  决定采取什么行动
│  (Planning)      │
└────────┬─────────┘
         │
┌────────▼─────────┐
│  执行行动         │  调用工具/API
│  (Action)        │
└────────┬─────────┘
         │
┌────────▼─────────┐
│  观察结果         │  获取行动结果
│  (Observation)   │
└────────┬─────────┘
         │
         ├─→ 目标达成? ─是─→ 输出最终答案
         │
         └─否─→ 回到思考阶段(反复迭代)

代码案例

Python 实现(使用 LLM API)

import json
from typing import Any, Dict, List

class ReactAgent:
    def __init__(self, model_api_client, tools: Dict[str, Any]):
        self.client = model_api_client
        self.tools = tools  # 可用的工具字典
        self.max_steps = 10
        self.conversation_history = []
    
    def get_available_tools_description(self) -> str:
        """获取可用工具的描述"""
        tools_desc = "Available tools:\n"
        for tool_name, tool_info in self.tools.items():
            tools_desc += f"- {tool_name}: {tool_info['description']}\n"
        return tools_desc
    
    def think(self, question: str, observations: List[str]) -> str:
        """推理阶段:LLM 分析问题和观察"""
        tools_desc = self.get_available_tools_description()
        
        system_prompt = f"""You are a reasoning agent. Your task is to:
1. Analyze the problem carefully
2. Consider what you've observed so far
3. Decide what action to take next
4. Use tools available to you

{tools_desc}

Respond in this format:
Thought: [Your analysis]
Action: [tool_name]
Action Input: [input_for_tool or "None"]
"""
        
        # 构建对话历史
        messages = [{"role": "user", "content": question}]
        if observations:
            messages.append({
                "role": "user", 
                "content": f"Previous observations:\n" + "\n".join(observations)
            })
        
        # 调用 LLM
        response = self.client.call_api(system_prompt, messages)
        return response
    
    def parse_action(self, response: str) -> tuple:
        """解析 LLM 的响应提取行动"""
        lines = response.split('\n')
        action = None
        action_input = None
        
        for line in lines:
            if line.startswith('Action:'):
                action = line.split(':', 1)[1].strip()
            elif line.startswith('Action Input:'):
                action_input = line.split(':', 1)[1].strip()
        
        return action, action_input
    
    def execute_action(self, action: str, action_input: str) -> str:
        """执行选定的行动"""
        if action not in self.tools:
            return f"Error: Tool '{action}' not found"
        
        tool = self.tools[action]
        try:
            result = tool['function'](action_input)
            return f"Observation: {result}"
        except Exception as e:
            return f"Error executing {action}: {str(e)}"
    
    def run(self, question: str) -> str:
        """主循环"""
        print(f"\n{'='*60}")
        print(f"Question: {question}")
        print(f"{'='*60}\n")
        
        observations = []
        step = 0
        
        while step < self.max_steps:
            step += 1
            print(f"--- Step {step} ---")
            
            # 推理
            thinking = self.think(question, observations)
            print(f"Thinking:\n{thinking}\n")
            
            # 解析行动
            action, action_input = self.parse_action(thinking)
            
            if action is None or action == "Final Answer":
                # Agent 认为已有最终答案
                final_answer = thinking.split("Final Answer:")[-1].strip()
                print(f"\n✓ Final Answer: {final_answer}")
                return final_answer
            
            # 执行行动
            print(f"Action: {action}")
            print(f"Action Input: {action_input}")
            observation = self.execute_action(action, action_input)
            print(f"{observation}\n")
            
            observations.append(observation)
        
        return f"Failed to find answer after {self.max_steps} steps"


# 工具定义
def search_wikipedia(query: str) -> str:
    """模拟维基百科搜索"""
    # 实际应用中会调用真实的 API
    knowledge_base = {
        "Python": "Python is a high-level programming language",
        "AI": "AI stands for Artificial Intelligence",
        "Machine Learning": "ML is a subset of AI"
    }
    return knowledge_base.get(query, "No results found")

def calculate(expression: str) -> str:
    """计算器"""
    try:
        result = eval(expression)
        return f"Result: {result}"
    except:
        return "Invalid expression"

def get_current_date() -> str:
    """获取当前日期"""
    from datetime import datetime
    return datetime.now().strftime("%Y-%m-%d")

# 定义工具字典
tools = {
    "search_wikipedia": {
        "description": "Search for information on Wikipedia",
        "function": search_wikipedia
    },
    "calculate": {
        "description": "Perform mathematical calculations",
        "function": calculate
    },
    "get_date": {
        "description": "Get the current date",
        "function": get_current_date
    }
}

# 模拟 LLM 客户端
class MockLLMClient:
    def call_api(self, system_prompt, messages):
        # 这里应该调用实际的 LLM API
        # 为演示目的,返回预定义的响应
        return """Thought: I need to search for information about Python
Action: search_wikipedia
Action Input: Python"""

# 使用示例
# client = MockLLMClient()
# agent = ReactAgent(client, tools)
# result = agent.run("What is Python?")

真实 API 实现示例(使用 Anthropic Claude)

import anthropic
import json

class ClaudeReactAgent:
    def __init__(self):
        self.client = anthropic.Anthropic()
        self.tools = [
            {
                "name": "search_wikipedia",
                "description": "Search Wikipedia for information",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query"
                        }
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "calculate",
                "description": "Perform mathematical calculations",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "expression": {
                            "type": "string",
                            "description": "Mathematical expression"
                        }
                    },
                    "required": ["expression"]
                }
            }
        ]
        self.messages = []
    
    def process_tool_call(self, tool_name: str, tool_input: dict) -> str:
        """处理工具调用"""
        if tool_name == "search_wikipedia":
            return f"Found information about {tool_input['query']}"
        elif tool_name == "calculate":
            try:
                result = eval(tool_input['expression'])
                return f"Calculation result: {result}"
            except Exception as e:
                return f"Error: {str(e)}"
        return "Unknown tool"
    
    def run(self, user_query: str, max_iterations: int = 10):
        """运行 ReAct Agent"""
        self.messages = [{"role": "user", "content": user_query}]
        
        for iteration in range(max_iterations):
            print(f"\n--- Iteration {iteration + 1} ---")
            
            # 调用 Claude
            response = self.client.messages.create(
                model="claude-opus-4-20250514",
                max_tokens=1024,
                tools=self.tools,
                messages=self.messages
            )
            
            print(f"Stop Reason: {response.stop_reason}")
            
            # 检查是否需要调用工具
            if response.stop_reason == "tool_use":
                # 添加助手响应
                self.messages.append({
                    "role": "assistant",
                    "content": response.content
                })
                
                # 处理工具调用
                tool_results = []
                for block in response.content:
                    if block.type == "tool_use":
                        print(f"Tool: {block.name}")
                        print(f"Input: {block.input}")
                        
                        result = self.process_tool_call(
                            block.name, 
                            block.input
                        )
                        print(f"Result: {result}")
                        
                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": result
                        })
                
                # 添加工具结果
                self.messages.append({
                    "role": "user",
                    "content": tool_results
                })
            
            else:
                # 最终答案
                for block in response.content:
                    if hasattr(block, 'text'):
                        print(f"\n✓ Final Answer:\n{block.text}")
                        return block.text
        
        return "Max iterations reached"

# 使用
# agent = ClaudeReactAgent()
# agent.run("What is the capital of France?")

最佳实践

  1. 清晰的思维过程:让 Agent 显式地说出推理步骤
  2. 工具设计
    • 工具应该专一且明确定义
    • 提供详细的使用说明
    • 返回结构化的结果
  3. 迭代限制:设置最大步数以防止无限循环
  4. 观察记录:保存所有观察用于上下文
  5. 错误处理:优雅地处理工具失败
  6. 性能优化:缓存常见查询结果

适用场景

  • 问答系统 (QA)
  • 数据分析
  • 研究辅助
  • 复杂问题求解

3. Planner-Executor(计划-执行)

模式描述

将复杂任务分解为两个阶段:

  1. Planner:根据目标制定详细的分步计划
  2. Executor:按照计划逐步执行每个子任务

这避免了 Agent 逐步决策导致的高延迟和错误积累。

架构图

┌─────────────────────────────────────┐
│          用户目标                     │
└────────────────┬────────────────────┘
                 │
          ┌──────▼──────┐
          │   Planner   │  分解任务
          │   规划器    │  制定计划
          └──────┬──────┘
                 │
         ┌───────▼────────┐
         │  Detailed Plan  │
         │  ┌─────────┐   │
         │  │ Step 1  │   │
         │  ├─────────┤   │
         │  │ Step 2  │   │
         │  ├─────────┤   │
         │  │ Step N  │   │
         │  └─────────┘   │
         └───────┬────────┘
                 │
          ┌──────▼──────────┐
          │    Executor     │  执行计划
          │    执行器       │
          └─────────────────┘

代码案例

from typing import List, Dict
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

class Task:
    def __init__(self, task_id: str, description: str, 
                 dependencies: List[str] = None, estimated_time: int = 0):
        self.task_id = task_id
        self.description = description
        self.dependencies = dependencies or []
        self.estimated_time = estimated_time
        self.status = TaskStatus.PENDING
        self.result = None
    
    def __repr__(self):
        return f"Task({self.task_id}, {self.status.value})"

class Plan:
    def __init__(self, goal: str, tasks: List[Task]):
        self.goal = goal
        self.tasks = tasks
        self.creation_time = None
    
    def get_executable_tasks(self) -> List[Task]:
        """获取当前可执行的任务(依赖已满足)"""
        completed_ids = {t.task_id for t in self.tasks 
                        if t.status == TaskStatus.COMPLETED}
        
        executable = []
        for task in self.tasks:
            if task.status == TaskStatus.PENDING:
                # 检查所有依赖是否已完成
                if all(dep in completed_ids for dep in task.dependencies):
                    executable.append(task)
        
        return executable
    
    def __str__(self):
        result = f"Plan for: {self.goal}\n"
        result += "=" * 60 + "\n"
        for i, task in enumerate(self.tasks, 1):
            result += f"{i}. [{task.status.value:12}] {task.task_id}: {task.description}\n"
            if task.dependencies:
                result += f"   Dependencies: {', '.join(task.dependencies)}\n"
        return result

class Planner:
    """规划器:制定计划"""
    
    def plan(self, goal: str, context: Dict) -> Plan:
        """根据目标和上下文制定计划"""
        print(f"\n[Planner] Planning for goal: {goal}")
        print(f"[Planner] Context: {context}\n")
        
        # 这里应该使用 LLM 或规则引擎分解任务
        # 示例:构建一个网站
        if "website" in goal.lower():
            return self._plan_website(context)
        elif "report" in goal.lower():
            return self._plan_report(context)
        else:
            return self._plan_generic(goal, context)
    
    def _plan_website(self, context: Dict) -> Plan:
        """网站构建计划"""
        tasks = [
            Task("design", "设计网站架构和UI", [], 480),
            Task("frontend", "开发前端代码", ["design"], 720),
            Task("backend", "开发后端API", ["design"], 840),
            Task("database", "设计和实现数据库", [], 360),
            Task("integrate", "前后端集成", ["frontend", "backend", "database"], 240),
            Task("test", "测试整个系统", ["integrate"], 480),
            Task("deploy", "部署到生产环境", ["test"], 120),
        ]
        return Plan("Build a website", tasks)
    
    def _plan_report(self, context: Dict) -> Plan:
        """报告生成计划"""
        tasks = [
            Task("collect_data", "收集数据", [], 120),
            Task("analyze", "分析数据", ["collect_data"], 180),
            Task("write_draft", "撰写初稿", ["analyze"], 240),
            Task("review", "审阅和修改", ["write_draft"], 120),
            Task("format", "格式化和排版", ["review"], 60),
        ]
        return Plan("Generate a report", tasks)
    
    def _plan_generic(self, goal: str, context: Dict) -> Plan:
        """通用计划"""
        tasks = [
            Task("understand", "理解需求", [], 60),
            Task("plan", "制定策略", ["understand"], 120),
            Task("execute", "执行", ["plan"], 300),
            Task("review", "审查结果", ["execute"], 120),
        ]
        return Plan(goal, tasks)

class Executor:
    """执行器:执行计划"""
    
    def __init__(self, tools: Dict[str, callable]):
        self.tools = tools
    
    def execute(self, plan: Plan) -> bool:
        """执行计划"""
        print(f"\n[Executor] Executing plan: {plan.goal}\n")
        print(plan)
        
        completed_count = 0
        failed_count = 0
        
        while True:
            # 获取可执行的任务
            executable_tasks = plan.get_executable_tasks()
            
            if not executable_tasks:
                # 检查是否所有任务都完成
                all_completed = all(
                    t.status == TaskStatus.COMPLETED 
                    for t in plan.tasks
                )
                if all_completed:
                    print(f"\n✓ Plan completed successfully!")
                    print(f"  Completed: {completed_count}, Failed: {failed_count}")
                    return True
                else:
                    # 有失败的任务
                    failed_tasks = [t for t in plan.tasks 
                                   if t.status == TaskStatus.FAILED]
                    print(f"\n✗ Plan failed!")
                    print(f"  Failed tasks: {[t.task_id for t in failed_tasks]}")
                    return False
            
            # 执行第一个可执行的任务
            task = executable_tasks[0]
            success = self._execute_task(task, plan)
            
            if success:
                completed_count += 1
            else:
                failed_count += 1
        
        return failed_count == 0
    
    def _execute_task(self, task: Task, plan: Plan) -> bool:
        """执行单个任务"""
        print(f"\n[Task {task.task_id}] Executing: {task.description}")
        print(f"  Estimated time: {task.estimated_time} minutes")
        
        task.status = TaskStatus.IN_PROGRESS
        
        try:
            # 获取对应的执行工具
            if task.task_id in self.tools:
                tool = self.tools[task.task_id]
                result = tool()
                task.result = result
                task.status = TaskStatus.COMPLETED
                print(f"  ✓ Completed: {result}")
                return True
            else:
                # 模拟执行
                import random
                success = random.random() > 0.2  # 80% 成功率
                
                if success:
                    task.status = TaskStatus.COMPLETED
                    print(f"  ✓ Completed")
                    return True
                else:
                    task.status = TaskStatus.FAILED
                    print(f"  ✗ Failed")
                    return False
        
        except Exception as e:
            task.status = TaskStatus.FAILED
            print(f"  ✗ Error: {str(e)}")
            return False

class PlannerExecutor:
    """协调规划器和执行器"""
    
    def __init__(self, tools: Dict[str, callable] = None):
        self.planner = Planner()
        self.executor = Executor(tools or {})
    
    def run(self, goal: str, context: Dict = None) -> bool:
        """运行完整的计划-执行流程"""
        context = context or {}
        
        # 步骤 1: 规划
        plan = self.planner.plan(goal, context)
        
        # 步骤 2: 执行
        success = self.executor.execute(plan)
        
        return success

# 使用示例
if __name__ == "__main__":
    # 定义工具
    tools = {
        "design": lambda: "UI designs completed",
        "frontend": lambda: "Frontend code ready",
        "backend": lambda: "Backend API ready",
        "database": lambda: "Database schema created",
        "integrate": lambda: "System integration complete",
        "test": lambda: "All tests passed",
        "deploy": lambda: "System deployed to production"
    }
    
    # 创建规划-执行器
    pe = PlannerExecutor(tools)
    
    # 执行计划
    success = pe.run("Build a website", {"team_size": 5, "budget": 50000})

最佳实践

  1. 任务分解
    • 任务粒度要适当
    • 避免过度分解导致执行开销大
    • 避免任务过大导致难以执行
  2. 依赖管理
    • 清晰定义任务依赖关系
    • 使用有向无环图 (DAG) 表示任务依赖
    • 检查循环依赖
  3. 错误恢复
    • 定义失败任务的重试策略
    • 支持断点恢复
    • 记录执行日志
  4. 动态调整
    • 允许在执行过程中调整计划
    • 支持暂停和恢复
    • 处理运行时障碍
  5. 性能优化
    • 并行执行独立任务
    • 优先级队列处理
    • 缓存中间结果

适用场景

  • 项目管理
  • 数据处理管道
  • 工作流自动化
  • 复杂业务流程

4. Hierarchical Agent(分层代理)

模式描述

通过构建多层级的 Agent 架构,高层 Agent 负责战略决策,低层 Agent 执行具体操作。这提供了更好的可扩展性和可维护性。

架构图

┌────────────────────────────────────────┐
│    Level 1: Strategic Agent             │  战略决策
│    (目标设定、资源分配)                  │
└─────────────┬──────────────────────────┘
              │
    ┌─────────┼─────────┐
    │         │         │
┌───▼───┐ ┌──▼──┐ ┌───▼───┐
│ Agent │ │Agent │ │ Agent │  具体执行
│  2.1  │ │ 2.2 │ │ 2.3   │
└───┬───┘ └──┬──┘ └───┬───┘
    │        │        │
  ┌─▼─┐    ┌─▼─┐    ┌─▼─┐
  │3.1│    │3.2│    │3.3│    最基层操作
  └───┘    └───┘    └───┘

代码案例

from abc import ABC, abstractmethod
from typing import List, Dict, Any
from enum import Enum

class AgentLevel(Enum):
    STRATEGIC = 1
    TACTICAL = 2
    OPERATIONAL = 3

class HierarchicalAgent(ABC):
    """分层代理的基类"""
    
    def __init__(self, agent_id: str, level: AgentLevel):
        self.agent_id = agent_id
        self.level = level
        self.subordinates: List['HierarchicalAgent'] = []
        self.superior: 'HierarchicalAgent' = None
        self.state = {}
    
    @abstractmethod
    def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """执行任务"""
        pass
    
    def add_subordinate(self, agent: 'HierarchicalAgent'):
        """添加下级代理"""
        agent.superior = self
        self.subordinates.append(agent)
    
    def delegate(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """委派任务给下级代理"""
        if not self.subordinates:
            print(f"[{self.agent_id}] No subordinates to delegate to")
            return {"success": False, "error": "No subordinates"}
        
        # 选择合适的下级代理
        selected_agent = self._select_subordinate(task)
        
        if selected_agent is None:
            return {"success": False, "error": "No suitable subordinate"}
        
        print(f"[{self.agent_id}] Delegating task to [{selected_agent.agent_id}]")
        return selected_agent.execute(task)
    
    def _select_subordinate(self, task: Dict[str, Any]) -> 'HierarchicalAgent':
        """选择合适的下级代理"""
        # 简单的选择策略:选择第一个合适的
        for agent in self.subordinates:
            if agent.can_execute(task):
                return agent
        return None
    
    def can_execute(self, task: Dict[str, Any]) -> bool:
        """检查是否能执行任务"""
        return True
    
    def report_to_superior(self, result: Dict[str, Any]) -> None:
        """向上级报告"""
        if self.superior:
            print(f"[{self.agent_id}] Reporting to [{self.superior.agent_id}]: {result}")

class StrategicAgent(HierarchicalAgent):
    """战略层代理"""
    
    def __init__(self, agent_id: str):
        super().__init__(agent_id, AgentLevel.STRATEGIC)
    
    def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """战略层执行:分解目标、分配资源"""
        print(f"\n[{self.agent_id}] Strategic Planning")
        print(f"  Task: {task.get('objective', 'Unknown')}")
        
        # 分解目标
        subtasks = self._decompose_objective(task)
        print(f"  Decomposed into {len(subtasks)} subtasks")
        
        # 分配给下级代理
        results = []
        for subtask in subtasks:
            result = self.delegate(subtask)
            results.append(result)
        
        # 汇总结果
        return {
            "level": "strategic",
            "original_task": task,
            "subtasks_results": results,
            "success": all(r.get("success", False) for r in results)
        }
    
    def _decompose_objective(self, task: Dict[str, Any]) -> List[Dict]:
        """分解目标为子任务"""
        objective = task.get("objective", "")
        
        if "defend" in objective:
            return [
                {"type": "defense", "task": "patrol_borders", "priority": "high"},
                {"type": "defense", "task": "fortify_positions", "priority": "high"},
                {"type": "defense", "task": "monitor_threats", "priority": "medium"},
            ]
        elif "expand" in objective:
            return [
                {"type": "expansion", "task": "explore_territory", "priority": "high"},
                {"type": "expansion", "task": "secure_resources", "priority": "high"},
                {"type": "expansion", "task": "establish_outposts", "priority": "medium"},
            ]
        else:
            return [
                {"type": "general", "task": "maintain_order", "priority": "medium"},
                {"type": "general", "task": "gather_intelligence", "priority": "medium"},
            ]

class TacticalAgent(HierarchicalAgent):
    """战术层代理"""
    
    def __init__(self, agent_id: str, specialization: str = "general"):
        super().__init__(agent_id, AgentLevel.TACTICAL)
        self.specialization = specialization
    
    def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """战术层执行:制定具体方案"""
        task_type = task.get("type", "")
        
        print(f"\n[{self.agent_id}] ({self.specialization}) Tactical Planning")
        print(f"  Task: {task.get('task', 'Unknown')}")
        print(f"  Priority: {task.get('priority', 'Unknown')}")
        
        # 根据任务类型制定计划
        plan = self._create_plan(task)
        print(f"  Plan: {plan}")
        
        # 委派给下级代理执行
        if self.subordinates:
            result = self.delegate(task)
        else:
            # 如果没有下级,直接执行
            result = {"success": True, "action": plan}
        
        return {
            "level": "tactical",
            "task": task,
            "plan": plan,
            "execution_result": result
        }
    
    def _create_plan(self, task: Dict) -> str:
        """根据任务创建具体计划"""
        task_name = task.get("task", "")
        specialization = self.specialization
        
        plans = {
            "patrol_borders": f"{specialization}: Establish patrol routes",
            "fortify_positions": f"{specialization}: Reinforce defenses",
            "explore_territory": f"{specialization}: Scout new areas",
            "secure_resources": f"{specialization}: Allocate gathering teams",
            "defend": f"{specialization}: Deploy defensive positions",
        }
        
        return plans.get(task_name, f"{specialization}: Execute task")
    
    def can_execute(self, task: Dict) -> bool:
        """检查能否执行"""
        # 根据专项进行检查
        if self.specialization == "defense":
            return "defense" in task.get("type", "") or \
                   "patrol" in task.get("task", "") or \
                   "fortify" in task.get("task", "")
        elif self.specialization == "logistics":
            return "resource" in task.get("task", "") or \
                   "supply" in task.get("task", "")
        return True

class OperationalAgent(HierarchicalAgent):
    """操作层代理"""
    
    def __init__(self, agent_id: str, capability: str = "general"):
        super().__init__(agent_id, AgentLevel.OPERATIONAL)
        self.capability = capability
    
    def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """操作层执行:具体的单位操作"""
        print(f"\n[{self.agent_id}] ({self.capability}) Operational Execution")
        print(f"  Executing: {task.get('task', 'Unknown')}")
        
        # 执行具体操作
        result = self._perform_action(task)
        
        return {
            "level": "operational",
            "unit": self.agent_id,
            "capability": self.capability,
            "task": task,
            "result": result,
            "success": True
        }
    
    def _perform_action(self, task: Dict) -> str:
        """执行具体动作"""
        action = task.get("task", "unknown")
        return f"Completed action: {action} using {self.capability}"

class HierarchicalCommandStructure:
    """分层指挥结构管理"""
    
    def __init__(self):
        self.root_agent = None
        self.all_agents = {}
    
    def build_structure(self):
        """构建示例的分层结构"""
        # 创建战略层
        self.root_agent = StrategicAgent("Commander")
        self.all_agents["Commander"] = self.root_agent
        
        # 创建战术层
        tactical_1 = TacticalAgent("DefenseCommandant", "defense")
        tactical_2 = TacticalAgent("LogisticsCommandant", "logistics")
        
        self.root_agent.add_subordinate(tactical_1)
        self.root_agent.add_subordinate(tactical_2)
        
        self.all_agents["DefenseCommandant"] = tactical_1
        self.all_agents["LogisticsCommandant"] = tactical_2
        
        # 创建操作层
        op_agents_defense = [
            OperationalAgent("PatrolUnit1", "patrol"),
            OperationalAgent("FortificationTeam", "fortification"),
            OperationalAgent("IntelligenceUnit", "reconnaissance"),
        ]
        
        for agent in op_agents_defense:
            tactical_1.add_subordinate(agent)
            self.all_agents[agent.agent_id] = agent
        
        op_agents_logistics = [
            OperationalAgent("SupplyTeam1", "supply"),
            OperationalAgent("TransportUnit", "transport"),
        ]
        
        for agent in op_agents_logistics:
            tactical_2.add_subordinate(agent)
            self.all_agents[agent.agent_id] = agent
    
    def execute_mission(self, objective: str):
        """执行任务"""
        if not self.root_agent:
            self.build_structure()
        
        print("=" * 70)
        print(f"Mission: {objective}")
        print("=" * 70)
        
        task = {"objective": objective}
        result = self.root_agent.execute(task)
        
        print("\n" + "=" * 70)
        print("Mission Summary")
        print("=" * 70)
        print(f"Overall Success: {result.get('success')}")
        print(f"Number of Subtasks: {len(result.get('subtasks_results', []))}")

# 使用示例
if __name__ == "__main__":
    structure = HierarchicalCommandStructure()
    structure.execute_mission("defend the kingdom")

最佳实践

  1. 清晰的层级划分
    • 明确每一层的职责
    • 避免层级过多(通常3-5层)
    • 定义清晰的通信接口
  2. 代理通信
    • 定义标准的消息格式
    • 支持双向通信(上报、下达)
    • 实现消息队列避免阻塞
  3. 决策分权
    • 高层做战略决策
    • 底层有自主权
    • 中层协调和执行
  4. 扩展性
    • 支持动态增删代理
    • 支持重组层级
    • 支持代理的热更新
  5. 监控和日志
    • 记录每层的决策
    • 追踪任务流转
    • 性能指标收集

适用场景

  • 组织管理系统
  • 游戏 AI(怪物群体)
  • 军事指挥系统
  • 企业工作流

5. Multi-Agent Collaboration(多代理协作)

模式描述

多个 Agent 通过通信和协调来完成共同目标。支持三种主要的协作模式:

  • Master-Slave:主代理指挥从代理
  • Peer-to-Peer:平等的代理相互通信
  • Publish-Subscribe:通过事件进行通信

架构对比

Master-Slave Model:
┌──────────────┐
│  Master      │
│  Agent       │
└──────┬───────┘
       │
    ┌──┴──┬────┐
    │     │    │
  ┌─▼──┐┌──▼─┐┌──▼─┐
  │Slave││Slave││Slave│
  │  A  ││  B  ││  C  │
  └────┘└────┘└────┘

Peer-to-Peer Model:
    ┌─────────────────┐
    │   Agent A       │
    └────────┬────────┘
             │
      ┌──────┼──────┐
      │      │      │
   ┌──▼──┐──┘  ┌───▼──┐
   │Agent│     │Agent  │
   │  B  │─────┤  C    │
   └──┬──┘     └───┬──┘
      │            │
      └────────────┘

Publish-Subscribe Model:
    ┌─────────┐
    │ Agent A │
    └────┬────┘
         │ publish
         │
    ┌────▼────────────────┐
    │  Event Bus / Broker  │
    └────┬─────────────────┘
         │
    ┌────┴─────────┬──────────┐
    │ subscribe    │subscribe │
    │              │          │
 ┌──▼──┐      ┌────▼───┐  ┌──▼──┐
 │Agent│      │ Agent  │  │Agent │
 │  B  │      │  C     │  │  D   │
 └─────┘      └────────┘  └──────┘

代码案例

1. Master-Slave 模式

from typing import List, Dict, Any
from enum import Enum
from abc import ABC, abstractmethod

class TaskType(Enum):
    IMAGE_PROCESSING = "image_processing"
    DATA_ANALYSIS = "data_analysis"
    TEXT_PROCESSING = "text_processing"

class SlaveAgent:
    """从代理"""
    
    def __init__(self, agent_id: str, 
                 capabilities: List[TaskType]):
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.is_busy = False
        self.current_task = None
    
    def can_handle(self, task_type: TaskType) -> bool:
        """检查是否能处理任务"""
        return task_type in self.capabilities
    
    def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """执行任务"""
        self.is_busy = True
        self.current_task = task
        
        print(f"[{self.agent_id}] Executing task: {task['id']}")
        
        # 模拟任务执行
        result = {
            "task_id": task['id'],
            "agent_id": self.agent_id,
            "status": "completed",
            "result": f"Processed {task.get('data', 'unknown')}"
        }
        
        self.is_busy = False
        self.current_task = None
        return result

class MasterAgent:
    """主代理"""
    
    def __init__(self):
        self.slaves: List[SlaveAgent] = []
        self.task_queue: List[Dict[str, Any]] = []
        self.results: List[Dict[str, Any]] = []
    
    def register_slave(self, slave: SlaveAgent):
        """注册从代理"""
        self.slaves.append(slave)
        print(f"[Master] Registered slave: {slave.agent_id}")
    
    def submit_task(self, task: Dict[str, Any]):
        """提交任务"""
        self.task_queue.append(task)
        print(f"[Master] Task queued: {task['id']}")
    
    def assign_tasks(self):
        """分配任务给从代理"""
        while self.task_queue:
            task = self.task_queue.pop(0)
            task_type = TaskType[task['type'].upper()]
            
            # 找一个能处理该任务的闲置从代理
            slave = self._find_available_slave(task_type)
            
            if slave is None:
                # 没有可用的从代理,重新加入队列
                self.task_queue.insert(0, task)
                break
            
            # 分配任务
            result = slave.execute_task(task)
            self.results.append(result)
            
            print(f"[Master] Task {task['id']} assigned to {slave.agent_id}")
    
    def _find_available_slave(self, task_type: TaskType) -> SlaveAgent:
        """找一个可用的从代理"""
        for slave in self.slaves:
            if not slave.is_busy and slave.can_handle(task_type):
                return slave
        return None
    
    def get_results(self) -> List[Dict[str, Any]]:
        """获取所有结果"""
        return self.results

# 使用示例
print("=" * 70)
print("Master-Slave Model Example")
print("=" * 70 + "\n")

master = MasterAgent()

# 创建并注册从代理
slave1 = SlaveAgent("ImageProcessor1", [TaskType.IMAGE_PROCESSING])
slave2 = SlaveAgent("DataAnalyzer1", [TaskType.DATA_ANALYSIS])
slave3 = SlaveAgent("TextProcessor1", [TaskType.TEXT_PROCESSING])

master.register_slave(slave1)
master.register_slave(slave2)
master.register_slave(slave3)

# 提交任务
tasks = [
    {"id": "task_001", "type": "image_processing", "data": "image.jpg"},
    {"id": "task_002", "type": "data_analysis", "data": "data.csv"},
    {"id": "task_003", "type": "text_processing", "data": "document.txt"},
    {"id": "task_004", "type": "image_processing", "data": "photo.png"},
]

print("\nSubmitting tasks...")
for task in tasks:
    master.submit_task(task)

print("\nAssigning tasks...\n")
master.assign_tasks()

print("\nResults:")
for result in master.get_results():
    print(f"  {result['task_id']}: {result['status']} (by {result['agent_id']})")

2. Peer-to-Peer 模式

from typing import Callable, List, Dict, Any
import json

class PeerAgent:
    """对等代理"""
    
    def __init__(self, agent_id: str, role: str):
        self.agent_id = agent_id
        self.role = role
        self.peers: List['PeerAgent'] = []
        self.message_queue = []
        self.knowledge_base = {}
    
    def connect_to_peer(self, peer: 'PeerAgent'):
        """连接到另一个对等体"""
        if peer not in self.peers:
            self.peers.append(peer)
            peer.peers.append(self)
            print(f"[{self.agent_id}] Connected to {peer.agent_id}")
    
    def send_message(self, recipient_id: str, message: Dict[str, Any]):
        """发送消息给另一个代理"""
        for peer in self.peers:
            if peer.agent_id == recipient_id:
                peer.receive_message(self.agent_id, message)
                return
        print(f"[{self.agent_id}] Error: Peer {recipient_id} not found")
    
    def broadcast_message(self, message: Dict[str, Any]):
        """广播消息给所有连接的对等体"""
        for peer in self.peers:
            peer.receive_message(self.agent_id, message)
    
    def receive_message(self, sender_id: str, message: Dict[str, Any]):
        """接收消息"""
        self.message_queue.append({
            "from": sender_id,
            "message": message,
            "timestamp": len(self.message_queue)
        })
        print(f"[{self.agent_id}] Received message from {sender_id}: "
              f"{message.get('content', 'unknown')}")
    
    def process_messages(self):
        """处理消息队列"""
        while self.message_queue:
            msg_item = self.message_queue.pop(0)
            sender_id = msg_item['from']
            message = msg_item['message']
            
            self._handle_message(sender_id, message)
    
    def _handle_message(self, sender_id: str, message: Dict[str, Any]):
        """处理消息"""
        msg_type = message.get('type', 'unknown')
        
        if msg_type == 'query':
            response = self._answer_query(message.get('query', ''))
            self.send_message(sender_id, {
                "type": "response",
                "query": message.get('query'),
                "answer": response
            })
        elif msg_type == 'share_knowledge':
            self._update_knowledge(message.get('knowledge', {}))
            print(f"[{self.agent_id}] Knowledge updated from {sender_id}")
    
    def _answer_query(self, query: str) -> str:
        """回答查询"""
        if query in self.knowledge_base:
            return self.knowledge_base[query]
        return f"Unknown query: {query}"
    
    def _update_knowledge(self, knowledge: Dict):
        """更新知识库"""
        self.knowledge_base.update(knowledge)
    
    def add_knowledge(self, key: str, value: Any):
        """添加知识"""
        self.knowledge_base[key] = value

# 使用示例
print("\n" + "=" * 70)
print("Peer-to-Peer Model Example")
print("=" * 70 + "\n")

# 创建对等代理
agent_alice = PeerAgent("Alice", "researcher")
agent_bob = PeerAgent("Bob", "analyst")
agent_charlie = PeerAgent("Charlie", "engineer")

# 建立连接
agent_alice.connect_to_peer(agent_bob)
agent_bob.connect_to_peer(agent_charlie)

# 添加知识
agent_alice.add_knowledge("machine_learning", "ML is a subset of AI")
agent_bob.add_knowledge("data_science", "DS involves data analysis")
agent_charlie.add_knowledge("software_engineering", "SE is about building systems")

# 交互
print("Alice querying Bob about data science:")
agent_alice.send_message("Bob", {
    "type": "query",
    "query": "data_science"
})

print("\nBob processing messages:")
agent_bob.process_messages()

print("\nAlice sharing knowledge with all peers:")
agent_alice.broadcast_message({
    "type": "share_knowledge",
    "knowledge": {"research": "Latest findings on AI"}
})

print("\nBob and Charlie processing broadcast:")
agent_bob.process_messages()
agent_charlie.process_messages()

3. Publish-Subscribe 模式

from typing import Callable, List, Dict, Any, Set
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Event:
    """事件类"""
    event_type: str
    source: str
    data: Dict[str, Any]
    timestamp: float = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now().timestamp()

class EventBus:
    """事件总线"""
    
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}
        self.event_history: List[Event] = []
    
    def subscribe(self, event_type: str, callback: Callable):
        """订阅事件"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        
        self.subscribers[event_type].append(callback)
        print(f"Subscribed to {event_type}")
    
    def publish(self, event: Event):
        """发布事件"""
        print(f"\n[EventBus] Publishing event: {event.event_type} from {event.source}")
        print(f"  Data: {event.data}")
        
        self.event_history.append(event)
        
        # 通知所有订阅者
        if event.event_type in self.subscribers:
            for callback in self.subscribers[event.event_type]:
                callback(event)
        else:
            print(f"  No subscribers for {event.event_type}")
    
    def get_history(self, event_type: str = None) -> List[Event]:
        """获取事件历史"""
        if event_type is None:
            return self.event_history
        return [e for e in self.event_history if e.event_type == event_type]

class PubSubAgent:
    """发布-订阅代理"""
    
    def __init__(self, agent_id: str, event_bus: EventBus):
        self.agent_id = agent_id
        self.event_bus = event_bus
        self.state = {}
    
    def publish_event(self, event_type: str, data: Dict[str, Any]):
        """发布事件"""
        event = Event(
            event_type=event_type,
            source=self.agent_id,
            data=data
        )
        self.event_bus.publish(event)
    
    def subscribe_to_event(self, event_type: str):
        """订阅事件"""
        self.event_bus.subscribe(event_type, self._on_event)
    
    def _on_event(self, event: Event):
        """处理事件"""
        print(f"  → [{self.agent_id}] Received event: {event.event_type}")
        self._process_event(event)
    
    def _process_event(self, event: Event):
        """处理事件的业务逻辑"""
        pass

class SensorAgent(PubSubAgent):
    """传感器代理"""
    
    def __init__(self, agent_id: str, event_bus: EventBus):
        super().__init__(agent_id, event_bus)
    
    def detect_event(self, event_type: str, data: Dict):
        """检测事件"""
        print(f"[{self.agent_id}] Detecting: {event_type}")
        self.publish_event(event_type, data)
    
    def _process_event(self, event: Event):
        """处理其他代理发送的事件"""
        if event.source != self.agent_id:  # 不处理自己发送的事件
            print(f"  → [{self.agent_id}] Updating sensor based on {event.event_type}")

class AnalysisAgent(PubSubAgent):
    """分析代理"""
    
    def __init__(self, agent_id: str, event_bus: EventBus):
        super().__init__(agent_id, event_bus)
    
    def _process_event(self, event: Event):
        """处理事件"""
        print(f"  → [{self.agent_id}] Analyzing: {event.data}")
        
        # 发布分析结果
        analysis_result = {
            "original_event": event.event_type,
            "source": event.source,
            "analysis": f"Analyzed data from {event.source}",
            "severity": "high" if event.data.get("value", 0) > 50 else "low"
        }
        self.publish_event("analysis_result", analysis_result)

class AlertAgent(PubSubAgent):
    """告警代理"""
    
    def __init__(self, agent_id: str, event_bus: EventBus):
        super().__init__(agent_id, event_bus)
    
    def _process_event(self, event: Event):
        """处理事件"""
        if event.event_type == "analysis_result":
            severity = event.data.get("severity", "unknown")
            if severity == "high":
                print(f"  → [{self.agent_id}] ⚠️  ALERT! High severity detected: "
                      f"{event.data.get('analysis')}")
            else:
                print(f"  → [{self.agent_id}] ℹ️  Info: {event.data.get('analysis')}")

# 使用示例
print("\n" + "=" * 70)
print("Publish-Subscribe Model Example")
print("=" * 70)

# 创建事件总线
event_bus = EventBus()

# 创建代理
sensor = SensorAgent("TemperatureSensor", event_bus)
analyzer = AnalysisAgent("DataAnalyzer", event_bus)
alerter = AlertAgent("AlertSystem", event_bus)

# 订阅事件
sensor.subscribe_to_event("temperature_reading")
analyzer.subscribe_to_event("temperature_reading")
alerter.subscribe_to_event("analysis_result")

# 模拟传感器检测到温度升高
print("\n1. Sensor detects high temperature:")
sensor.detect_event("temperature_reading", {"value": 85, "unit": "celsius"})

print("\n2. Sensor detects normal temperature:")
sensor.detect_event("temperature_reading", {"value": 25, "unit": "celsius"})

最佳实践

  1. Master-Slave 模式
    • 适合有中央控制的场景
    • 主代理应该是可靠且性能好的
    • 支持故障转移和负载均衡
    • 实现超时和重试机制
  2. Peer-to-Peer 模式
    • 避免循环通信
    • 实现消息去重机制
    • 定义清晰的通信协议
    • 处理网络分区问题
  3. Publish-Subscribe 模式
    • 使用有序队列保证消息顺序
    • 支持消息持久化
    • 实现消息过滤和路由
    • 处理事件溢出
  4. 通用最佳实践
    • 定义标准的消息格式
    • 实现消息序列化/反序列化
    • 添加日志和监控
    • 处理网络延迟和故障
    • 实现优雅关闭

适用场景

  • 分布式系统
  • 微服务架构
  • 实时协作系统
  • IoT 系统
  • 游戏多人交互

6. Tool Use / Function Calling

模式描述

Agent 通过调用外部工具或函数来扩展自身的能力。这是现代 LLM-based Agent 的关键特性。

架构图

┌─────────────────┐
│    Agent        │
│  (LLM-based)    │
└────────┬────────┘
         │
    ┌────▼─────────────────────┐
    │ Analyze Task             │
    │ Decide which tools to use│
    └────┬──────────────────────┘
         │
    ┌────▼──────────┐
    │ Tool Selection│
    └────┬──────────┘
         │
    ┌────┴────────────────────┐
    │                         │
┌───▼────┐    ┌──────┐   ┌──▼──┐
│ Tool 1  │    │Tool 2│   │Tool3│
│  API   │    │ Web  │   │ DB  │
└────────┘    └──────┘   └─────┘

代码案例

import json
from typing import Any, Dict, List, Callable
from dataclasses import dataclass

@dataclass
class ToolDefinition:
    """工具定义"""
    name: str
    description: str
    parameters: Dict[str, Any]
    function: Callable

class ToolRegistry:
    """工具注册表"""
    
    def __init__(self):
        self.tools: Dict[str, ToolDefinition] = {}
    
    def register_tool(self, tool_def: ToolDefinition):
        """注册工具"""
        self.tools[tool_def.name] = tool_def
        print(f"Tool registered: {tool_def.name}")
    
    def get_tool(self, name: str) -> ToolDefinition:
        """获取工具"""
        return self.tools.get(name)
    
    def list_tools(self) -> List[str]:
        """列出所有工具"""
        return list(self.tools.keys())
    
    def get_tools_schema(self) -> List[Dict[str, Any]]:
        """获取工具的 JSON Schema"""
        schema = []
        for tool in self.tools.values():
            schema.append({
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters
                }
            })
        return schema

class ToolUsingAgent:
    """使用工具的代理"""
    
    def __init__(self, tool_registry: ToolRegistry):
        self.tool_registry = tool_registry
        self.execution_history = []
    
    def select_and_execute_tool(self, 
                                tool_name: str, 
                                tool_args: Dict[str, Any]) -> Any:
        """选择并执行工具"""
        tool = self.tool_registry.get_tool(tool_name)
        
        if tool is None:
            return {"error": f"Tool '{tool_name}' not found"}
        
        print(f"\n[Agent] Executing tool: {tool_name}")
        print(f"  Arguments: {tool_args}")
        
        try:
            result = tool.function(**tool_args)
            
            # 记录执行历史
            self.execution_history.append({
                "tool": tool_name,
                "args": tool_args,
                "result": result,
                "status": "success"
            })
            
            print(f"  Result: {result}")
            return result
        
        except Exception as e:
            error_msg = str(e)
            self.execution_history.append({
                "tool": tool_name,
                "args": tool_args,
                "error": error_msg,
                "status": "failed"
            })
            print(f"  Error: {error_msg}")
            return {"error": error_msg}
    
    def get_execution_history(self) -> List[Dict]:
        """获取执行历史"""
        return self.execution_history

# 定义具体的工具函数
def search_web(query: str) -> str:
    """Web 搜索"""
    # 模拟 Web 搜索
    results = {
        "python": "Python is a high-level programming language",
        "ai": "AI stands for Artificial Intelligence",
        "machine learning": "ML is a subset of AI focused on learning from data"
    }
    return results.get(query.lower(), "No results found")

def calculate(expression: str) -> float:
    """数学计算"""
    try:
        result = eval(expression)
        return float(result)
    except Exception as e:
        raise ValueError(f"Invalid expression: {expression}")

def get_current_weather(city: str) -> Dict[str, Any]:
    """获取天气"""
    # 模拟天气 API
    weather_data = {
        "seattle": {"temp": 15, "condition": "rainy"},
        "london": {"temp": 10, "condition": "cloudy"},
        "beijing": {"temp": 25, "condition": "sunny"}
    }
    return weather_data.get(city.lower(), {"error": "City not found"})

def query_database(sql: str) -> List[Dict]:
    """查询数据库"""
    # 模拟数据库查询
    if "users" in sql.lower():
        return [
            {"id": 1, "name": "Alice", "email": "[email protected]"},
            {"id": 2, "name": "Bob", "email": "[email protected]"}
        ]
    return []

# 创建工具注册表
registry = ToolRegistry()

# 注册工具
registry.register_tool(ToolDefinition(
    name="search_web",
    description="Search the web for information",
    parameters={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "The search query"
            }
        },
        "required": ["query"]
    },
    function=search_web
))

registry.register_tool(ToolDefinition(
    name="calculate",
    description="Perform mathematical calculations",
    parameters={
        "type": "object",
        "properties": {
            "expression": {
                "type": "string",
                "description": "Mathematical expression (e.g., '2+2' or 'sqrt(16)')"
            }
        },
        "required": ["expression"]
    },
    function=calculate
))

registry.register_tool(ToolDefinition(
    name="get_weather",
    description="Get current weather for a city",
    parameters={
        "type": "object",
        "properties": {
            "city": {
                "type": "string",
                "description": "City name"
            }
        },
        "required": ["city"]
    },
    function=get_current_weather
))

registry.register_tool(ToolDefinition(
    name="query_db",
    description="Query the database",
    parameters={
        "type": "object",
        "properties": {
            "sql": {
                "type": "string",
                "description": "SQL query"
            }
        },
        "required": ["sql"]
    },
    function=query_database
))

# 使用示例
print("=" * 70)
print("Tool Use / Function Calling Example")
print("=" * 70)

agent = ToolUsingAgent(registry)

print("\nAvailable tools:")
for tool_name in agent.tool_registry.list_tools():
    print(f"  - {tool_name}")

print("\n" + "-" * 70)
print("Tool Invocations:")
print("-" * 70)

# 调用不同的工具
agent.select_and_execute_tool("search_web", {"query": "Python"})
agent.select_and_execute_tool("calculate", {"expression": "2**10"})
agent.select_and_execute_tool("get_weather", {"city": "Seattle"})
agent.select_and_execute_tool("query_db", {"sql": "SELECT * FROM users"})

print("\n" + "-" * 70)
print("Execution History:")
print("-" * 70)

for i, entry in enumerate(agent.get_execution_history(), 1):
    print(f"{i}. Tool: {entry['tool']}")
    print(f"   Status: {entry['status']}")
    if entry['status'] == 'success':
        print(f"   Result: {entry['result']}")
    else:
        print(f"   Error: {entry['error']}")

最佳实践

  1. 工具设计
    • 功能单一明确
    • 参数定义清晰
    • 返回结构化数据
    • 提供明确的错误信息
  2. 工具管理
    • 维护工具注册表
    • 支持工具的热加载
    • 版本管理
    • 文档完整
  3. 调用管理
    • 实现重试机制
    • 设置超时限制
    • 记录调用历史
    • 性能监控
  4. 安全性
    • 验证输入参数
    • 限制工具权限
    • 审计敏感操作
    • 防止注入攻击

适用场景

  • AI 助手
  • 自动化系统
  • 数据处理管道
  • 集成系统

7. Memory Patterns(记忆模式)

模式描述

Agent 管理不同类型的记忆来保存和利用历史信息:

  • 短期记忆:当前任务相关
  • 长期记忆:历史经验和知识
  • 情节记忆:重要事件记录

记忆层级

┌──────────────────────────────────┐
│  Working Memory (短期工作记忆)    │
│  - 当前对话内容                  │
│  - 当前任务信息                  │
│  - 临时变量                      │
│  (容量小,衰减快)                │
└──────────────────────────────────┘
            ↓ 提炼
┌──────────────────────────────────┐
│  Episodic Memory (情节记忆)       │
│  - 重要事件                      │
│  - 成功/失败案例                 │
│  - 用户交互历史                  │
│  (中等容量,中等衰减)            │
└──────────────────────────────────┘
            ↓ 归纳
┌──────────────────────────────────┐
│  Semantic Memory (语义记忆)       │
│  - 事实和概念                    │
│  - 规则和约束                    │
│  - 领域知识                      │
│  (大容量,不衰减)                │
└──────────────────────────────────┘

代码案例

from typing import Any, Dict, List
from datetime import datetime
from collections import deque

class WorkingMemory:
    """短期工作记忆"""
    
    def __init__(self, capacity: int = 10):
        self.capacity = capacity
        self.buffer = deque(maxlen=capacity)
        self.variables = {}
    
    def add(self, item: Dict[str, Any]):
        """添加项目"""
        item['timestamp'] = datetime.now().isoformat()
        self.buffer.append(item)
    
    def get_recent(self, n: int = 5) -> List[Dict]:
        """获取最近的 n 项"""
        return list(self.buffer)[-n:]
    
    def set_variable(self, key: str, value: Any):
        """设置变量"""
        self.variables[key] = value
    
    def get_variable(self, key: str) -> Any:
        """获取变量"""
        return self.variables.get(key)
    
    def clear(self):
        """清空"""
        self.buffer.clear()
        self.variables.clear()

class EpisodicMemory:
    """情节记忆"""
    
    def __init__(self):
        self.episodes = []
    
    def record_episode(self, event: str, details: Dict[str, Any]):
        """记录事件"""
        episode = {
            "event": event,
            "details": details,
            "timestamp": datetime.now().isoformat(),
            "episode_id": len(self.episodes)
        }
        self.episodes.append(episode)
        print(f"[EpisodicMemory] Recorded episode: {event}")
    
    def recall_similar_episodes(self, 
                               event_type: str, 
                               limit: int = 5) -> List[Dict]:
        """回忆相似的事件"""
        similar = [e for e in self.episodes 
                  if event_type.lower() in e['event'].lower()]
        return similar[-limit:]
    
    def get_success_examples(self) -> List[Dict]:
        """获取成功的例子"""
        return [e for e in self.episodes 
               if e.get('details', {}).get('success')]
    
    def get_failure_examples(self) -> List[Dict]:
        """获取失败的例子"""
        return [e for e in self.episodes 
               if not e.get('details', {}).get('success')]

class SemanticMemory:
    """语义记忆"""
    
    def __init__(self):
        self.facts = {}
        self.rules = []
        self.concepts = {}
    
    def add_fact(self, subject: str, predicate: str, obj: str):
        """添加事实"""
        key = f"{subject}:{predicate}"
        self.facts[key] = obj
        print(f"[SemanticMemory] Fact added: {subject} {predicate} {obj}")
    
    def add_rule(self, condition: str, action: str):
        """添加规则"""
        self.rules.append({"condition": condition, "action": action})
        print(f"[SemanticMemory] Rule added: IF {condition} THEN {action}")
    
    def add_concept(self, name: str, definition: str, properties: Dict):
        """添加概念"""
        self.concepts[name] = {
            "definition": definition,
            "properties": properties
        }
        print(f"[SemanticMemory] Concept added: {name}")
    
    def query_fact(self, subject: str, predicate: str) -> str:
        """查询事实"""
        key = f"{subject}:{predicate}"
        return self.facts.get(key, "Unknown")
    
    def get_applicable_rules(self, condition: str) -> List[Dict]:
        """获取适用的规则"""
        return [r for r in self.rules 
               if condition.lower() in r['condition'].lower()]

class MemoryManager:
    """内存管理器"""
    
    def __init__(self):
        self.working_memory = WorkingMemory()
        self.episodic_memory = EpisodicMemory()
        self.semantic_memory = SemanticMemory()
    
    def update_context(self, context: Dict[str, Any]):
        """更新上下文"""
        for key, value in context.items():
            self.working_memory.set_variable(key, value)
    
    def record_interaction(self, interaction_type: str, 
                          details: Dict[str, Any]):
        """记录交互"""
        self.working_memory.add({
            "type": interaction_type,
            "details": details
        })
        
        # 如果是重要事件,记录到情节记忆
        if details.get('important', False):
            self.episodic_memory.record_episode(interaction_type, details)
    
    def get_context_summary(self) -> str:
        """获取上下文摘要"""
        recent = self.working_memory.get_recent(3)
        context = "Current context:\n"
        for item in recent:
            context += f"  - {item['type']}: {item.get('details', {})}\n"
        return context
    
    def learn_from_experience(self, success: bool, 
                             event_details: Dict[str, Any]):
        """从经验中学习"""
        event_details['success'] = success
        self.episodic_memory.record_episode(
            "experience",
            event_details
        )
        
        # 从成功的经验中提取规则
        if success:
            condition = event_details.get('condition', 'Unknown')
            action = event_details.get('action', 'Unknown')
            self.semantic_memory.add_rule(condition, action)

class MemoryAwareAgent:
    """具有记忆功能的代理"""
    
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.memory = MemoryManager()
    
    def think_and_act(self, situation: Dict[str, Any]) -> str:
        """思考和行动"""
        print(f"\n[{self.agent_id}] Thinking and acting...")
        
        # 记录当前情况
        self.memory.record_interaction("situation", situation)
        
        # 从短期记忆获取上下文
        context_summary = self.memory.get_context_summary()
        print(context_summary)
        
        # 从长期记忆回忆相似的情节
        event_type = situation.get('type', 'unknown')
        similar_episodes = self.memory.episodic_memory.recall_similar_episodes(
            event_type
        )
        
        if similar_episodes:
            print(f"\nRecalled {len(similar_episodes)} similar episodes:")
            for ep in similar_episodes[-2:]:  # 显示最近的 2 个
                print(f"  - {ep['event']}: {ep['details']}")
        
        # 应用语义规则做决策
        applicable_rules = self.memory.semantic_memory.get_applicable_rules(
            event_type
        )
        
        if applicable_rules:
            print(f"\nApplying {len(applicable_rules)} applicable rules:")
            for rule in applicable_rules:
                print(f"  IF {rule['condition']} THEN {rule['action']}")
        
        # 做出决策和行动
        decision = self._make_decision(situation, similar_episodes, applicable_rules)
        print(f"Decision: {decision}")
        
        # 记录学习
        success = True  # 模拟成功
        self.memory.learn_from_experience(success, {
            "condition": event_type,
            "action": decision
        })
        
        return decision
    
    def _make_decision(self, situation, episodes, rules):
        """做出决策"""
        # 简化的决策逻辑
        if rules:
            return f"Execute: {rules[0]['action']}"
        elif episodes and episodes[-1].get('details', {}).get('success'):
            return f"Repeat successful action from similar episode"
        else:
            return "Explore new action"

# 使用示例
print("=" * 70)
print("Memory Patterns Example")
print("=" * 70)

# 创建代理
agent = MemoryAwareAgent("LearningAgent")

# 初始化语义知识
agent.memory.semantic_memory.add_fact("Python", "is_language", "programming")
agent.memory.semantic_memory.add_fact("AI", "contains", "Machine Learning")
agent.memory.semantic_memory.add_concept(
    "Problem Solving",
    "Process of finding solutions",
    {"steps": ["analyze", "plan", "execute", "review"]}
)

print("\n" + "-" * 70)
print("Scenario 1: Handle a problem")
print("-" * 70)

agent.think_and_act({
    "type": "problem_solving",
    "problem": "User asked about Python",
    "urgency": "high"
})

print("\n" + "-" * 70)
print("Scenario 2: Handle similar situation again")
print("-" * 70)

agent.think_and_act({
    "type": "problem_solving",
    "problem": "User asked about AI",
    "urgency": "medium"
})

print("\n" + "-" * 70)
print("Scenario 3: Handle different situation")
print("-" * 70)

agent.think_and_act({
    "type": "customer_service",
    "problem": "Customer complaint",
    "urgency": "high"
})

print("\n" + "-" * 70)
print("Memory Summary")
print("-" * 70)

print("\nWorking Memory Variables:")
for key, value in agent.memory.working_memory.variables.items():
    print(f"  {key}: {value}")

print("\nEpisodic Memory (experiences):")
for ep in agent.memory.episodic_memory.episodes[-3:]:
    print(f"  - {ep['event']} (Success: {ep['details'].get('success')})")

print("\nSemantic Memory (rules):")
for rule in agent.memory.semantic_memory.rules:
    print(f"  IF {rule['condition']} THEN {rule['action']}")

最佳实践

  1. 短期记忆
    • 维持合理大小(避免过大导致干扰)
    • 自动遗忘旧项目
    • 快速访问
    • 存储当前任务状态
  2. 情节记忆
    • 记录关键事件和结果
    • 支持相似性搜索
    • 成功/失败分类
    • 定期清理过旧数据
  3. 语义记忆
    • 存储抽象知识和规则
    • 支持快速查询
    • 版本控制
    • 定期审查和更新
  4. 集成策略
    • 定期从短期转入长期
    • 支持遗忘和更新
    • 优先级管理
    • 一致性检查

适用场景

  • 对话系统
  • 个人助手
  • 学习系统
  • 长期交互

8. State Machine(状态机)

模式描述

Agent 在不同状态之间转换,每个状态有明确的进入条件、行为和退出条件。这提供了清晰的行为模式和容易的调试。

状态图示例

┌──────────────┐
│   IDLE       │ (初始状态)
└──────┬───────┘
       │ receive_task
       │
┌──────▼──────────────┐
│   ANALYZING         │ (分析阶段)
└──────┬──────────────┘
       │ analysis_complete
       │
┌──────▼──────────────┐
│   PLANNING          │ (规划阶段)
└──────┬──────────────┘
       │ plan_ready
       │
┌──────▼──────────────┐
│   EXECUTING         │ (执行阶段)
└──────┬──────────────┘
       │ execution_complete
       │
┌──────▼──────────────┐
│   REVIEWING         │ (审查阶段)
└──────┬──────────────┘
       │ review_complete
       │
┌──────▼──────────────┐
│   COMPLETED         │ (完成)
└─────────────────────┘

代码案例

from enum import Enum
from typing import Callable, Dict, Any, Optional
from dataclasses import dataclass

class State(Enum):
    """状态枚举"""
    IDLE = "idle"
    ANALYZING = "analyzing"
    PLANNING = "planning"
    EXECUTING = "executing"
    REVIEWING = "reviewing"
    COMPLETED = "completed"
    ERROR = "error"

@dataclass
class Transition:
    """状态转换"""
    from_state: State
    to_state: State
    trigger: str
    condition: Optional[Callable] = None
    action: Optional[Callable] = None

class StateMachine:
    """状态机"""
    
    def __init__(self, initial_state: State):
        self.current_state = initial_state
        self.transitions: Dict[str, Transition] = {}
        self.state_handlers: Dict[State, Dict[str, Callable]] = {}
        self.history = [initial_state]
    
    def add_transition(self, transition: Transition):
        """添加状态转换"""
        key = f"{transition.from_state.value}:{transition.trigger}"
        self.transitions[key] = transition
    
    def add_state_handler(self, state: State, 
                         handler_type: str, 
                         handler: Callable):
        """添加状态处理器"""
        if state not in self.state_handlers:
            self.state_handlers[state] = {}
        self.state_handlers[state][handler_type] = handler
    
    def trigger_event(self, event: str) -> bool:
        """触发事件"""
        key = f"{self.current_state.value}:{event}"
        
        if key not in self.transitions:
            print(f"[StateMachine] No transition for event '{event}' "
                  f"in state '{self.current_state.value}'")
            return False
        
        transition = self.transitions[key]
        
        # 检查条件
        if transition.condition and not transition.condition():
            print(f"[StateMachine] Transition condition failed")
            return False
        
        # 执行退出处理
        self._execute_state_handler(self.current_state, 'on_exit')
        
        # 执行转换动作
        if transition.action:
            transition.action()
        
        # 转换状态
        old_state = self.current_state
        self.current_state = transition.to_state
        self.history.append(self.current_state)
        
        print(f"[StateMachine] Transitioned from {old_state.value} "
              f"to {self.current_state.value}")
        
        # 执行进入处理
        self._execute_state_handler(self.current_state, 'on_enter')
        
        return True
    
    def _execute_state_handler(self, state: State, handler_type: str):
        """执行状态处理器"""
        if state in self.state_handlers:
            handler = self.state_handlers[state].get(handler_type)
            if handler:
                handler()
    
    def get_current_state(self) -> State:
        """获取当前状态"""
        return self.current_state
    
    def get_history(self) -> list:
        """获取历史"""
        return [s.value for s in self.history]

class TaskProcessingAgent:
    """任务处理代理"""
    
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.state_machine = StateMachine(State.IDLE)
        self.task = None
        self.analysis_result = None
        self.plan = None
        self.execution_result = None
        self.review_result = None
        
        self._setup_state_machine()
    
    def _setup_state_machine(self):
        """设置状态机"""
        sm = self.state_machine
        
        # 添加转换
        sm.add_transition(Transition(
            State.IDLE, State.ANALYZING,
            "receive_task",
            action=self._on_receive_task
        ))
        
        sm.add_transition(Transition(
            State.ANALYZING, State.PLANNING,
            "analysis_complete",
            action=self._on_analysis_complete
        ))
        
        sm.add_transition(Transition(
            State.ANALYZING, State.ERROR,
            "analysis_failed",
            action=self._on_analysis_failed
        ))
        
        sm.add_transition(Transition(
            State.PLANNING, State.EXECUTING,
            "plan_ready",
            action=self._on_plan_ready
        ))
        
        sm.add_transition(Transition(
            State.PLANNING, State.ERROR,
            "planning_failed",
            action=self._on_planning_failed
        ))
        
        sm.add_transition(Transition(
            State.EXECUTING, State.REVIEWING,
            "execution_complete",
            action=self._on_execution_complete
        ))
        
        sm.add_transition(Transition(
            State.EXECUTING, State.ERROR,
            "execution_failed",
            action=self._on_execution_failed
        ))
        
        sm.add_transition(Transition(
            State.REVIEWING, State.COMPLETED,
            "review_complete",
            action=self._on_review_complete
        ))
        
        # 添加状态处理器
        sm.add_state_handler(State.IDLE, 'on_enter', 
                            lambda: print(f"[{self.agent_id}] Ready for new tasks"))
        
        sm.add_state_handler(State.ANALYZING, 'on_enter',
                            lambda: print(f"[{self.agent_id}] Analyzing task..."))
        
        sm.add_state_handler(State.PLANNING, 'on_enter',
                            lambda: print(f"[{self.agent_id}] Planning execution..."))
        
        sm.add_state_handler(State.EXECUTING, 'on_enter',
                            lambda: print(f"[{self.agent_id}] Executing plan..."))
        
        sm.add_state_handler(State.REVIEWING, 'on_enter',
                            lambda: print(f"[{self.agent_id}] Reviewing results..."))
        
        sm.add_state_handler(State.COMPLETED, 'on_enter',
                            lambda: print(f"[{self.agent_id}] Task completed successfully"))
        
        sm.add_state_handler(State.ERROR, 'on_enter',
                            lambda: print(f"[{self.agent_id}] Error occurred during processing"))
    
    def receive_task(self, task: Dict[str, Any]):
        """接收任务"""
        self.task = task
        print(f"\n[{self.agent_id}] Received task: {task.get('name')}")
        self.state_machine.trigger_event("receive_task")
    
    def _on_receive_task(self):
        """处理接收任务"""
        print(f"  Task details: {self.task}")
    
    def analyze_task(self):
        """分析任务"""
        print(f"  Analyzing... ")
        self.analysis_result = {
            "complexity": "high",
            "dependencies": ["resource_a", "resource_b"],
            "estimated_time": 100
        }
        print(f"  Analysis result: {self.analysis_result}")
        self.state_machine.trigger_event("analysis_complete")
    
    def _on_analysis_complete(self):
        """分析完成后的处理"""
        print(f"  Analysis completed successfully")
    
    def _on_analysis_failed(self):
        """分析失败处理"""
        print(f"  Analysis failed")
    
    def plan_execution(self):
        """规划执行"""
        print(f"  Planning... ")
        self.plan = [
            "Step 1: Initialize resources",
            "Step 2: Process data",
            "Step 3: Generate results",
            "Step 4: Cleanup"
        ]
        print(f"  Plan created: {len(self.plan)} steps")
        self.state_machine.trigger_event("plan_ready")
    
    def _on_plan_ready(self):
        """计划就绪处理"""
        print(f"  Plan is ready for execution")
    
    def _on_planning_failed(self):
        """规划失败处理"""
        print(f"  Planning failed")
    
    def execute_plan(self):
        """执行计划"""
        print(f"  Executing plan...")
        self.execution_result = {
            "steps_completed": len(self.plan),
            "success": True,
            "duration": 95
        }
        print(f"  Execution result: {self.execution_result}")
        self.state_machine.trigger_event("execution_complete")
    
    def _on_execution_complete(self):
        """执行完成处理"""
        print(f"  Execution completed")
    
    def _on_execution_failed(self):
        """执行失败处理"""
        print(f"  Execution failed")
    
    def review_results(self):
        """审查结果"""
        print(f"  Reviewing results...")
        self.review_result = {
            "quality": "high",
            "meets_requirements": True,
            "issues": []
        }
        print(f"  Review result: Quality {self.review_result['quality']}")
        self.state_machine.trigger_event("review_complete")
    
    def _on_review_complete(self):
        """审查完成处理"""
        print(f"  Review completed successfully")
    
    def run_workflow(self, task: Dict[str, Any]):
        """运行完整工作流"""
        self.receive_task(task)
        self.analyze_task()
        self.plan_execution()
        self.execute_plan()
        self.review_results()
        
        print(f"\nState transitions history: {self.state_machine.get_history()}")
        print(f"Final state: {self.state_machine.get_current_state().value}")

# 使用示例
print("=" * 70)
print("State Machine Pattern Example")
print("=" * 70)

agent = TaskProcessingAgent("ProcessingAgent1")

task = {
    "name": "Data Processing Task",
    "description": "Process customer data",
    "priority": "high"
}

agent.run_workflow(task)

最佳实践

  1. 状态定义
    • 状态应该明确且互斥
    • 避免过多状态(通常不超过 10 个)
    • 使用枚举定义状态
  2. 转换管理
    • 明确定义所有可能的转换
    • 提供保护条件
    • 实现超时处理
  3. 错误处理
    • 添加 ERROR 状态
    • 实现错误恢复机制
    • 记录状态转换日志
  4. 监控和调试
    • 记录状态转换历史
    • 添加状态转换验证
    • 可视化状态图
  5. 性能优化
    • 缓存常用转换
    • 避免状态中的长时间操作
    • 使用异步转换

适用场景

  • 工作流管理
  • 用户交互管理
  • 设备控制
  • 游戏逻辑
  • 会话管理

总结与对比

模式 复杂度 适用场景 优点 缺点
Agentic Loop 实时系统 简单、直观 无法处理复杂任务
ReAct 问答系统 推理明确、可解释 需要 LLM 支持
Planner-Executor 项目管理 任务分解清晰 计划可能不适应变化
Hierarchical 大规模系统 可扩展、可维护 通信复杂
Multi-Agent 分布式系统 灵活、容错 同步困难
Tool Use 集成系统 扩展能力 工具管理复杂
Memory 长期交互 学习能力 内存管理复杂
State Machine 流程系统 清晰、易调试 扩展性有限

选择建议

  1. 简单任务:使用 Agentic Loop 或 State Machine
  2. 复杂推理:使用 ReAct
  3. 大规模任务:使用 Planner-Executor
  4. 分布式系统:使用 Hierarchical 或 Multi-Agent
  5. 需要学习:添加 Memory
  6. 需要集成:使用 Tool Use

通常实际应用会结合多个模式使用,形成混合架构。