MetaGPT - 完整学习教程

MetaGPT - 完整学习教程

教程级别: 从零到一 预计学习时间: 6-8 小时 前置知识: Python 基础(函数、类、异步 async/await)、命令行基本操作、LLM API 基本概念(了解 GPT-4、Prompt、上下文窗口)

环境搭建指南

系统要求

  • 操作系统:Linux / macOS / Windows(WSL2)
  • Python 3.9+
  • Git 2.30+
  • OpenAI API Key(或其他支持的 LLM 后端)

安装步骤

# 1. 创建项目目录
mkdir metagpt-lab && cd metagpt-lab

# 2. 创建 Python 虚拟环境
python3 -m venv venv
source venv/bin/activate

# 3. 安装 MetaGPT
pip install metagpt

# 4. 创建配置文件目录
mkdir -p ~/.metagpt

创建配置文件 ~/.metagpt/config2.yaml

# MetaGPT 配置文件
llm:
  api_type: "openai"
  model: "gpt-4"
  api_key: "sk-your-api-key-here"
  base_url: "https://api.openai.com/v1"

验证安装

# 确认 Python 版本
python3 --version
# Python 3.9.x 或更高

# 确认 MetaGPT 安装成功
python3 -c "import metagpt; print('MetaGPT 安装成功')"

# 验证配置文件
python3 -c "
from metagpt.const import METAGPT_ROOT
import os
config_path = os.path.expanduser('~/.metagpt/config2.yaml')
if os.path.exists(config_path):
    print(f'配置文件存在: {config_path}')
else:
    print('警告: 配置文件不存在,请创建 ~/.metagpt/config2.yaml')
"

注意事项: - 本教程使用 MetaGPT 的 Python API 模式,部分示例使用模拟数据演示核心概念 - 如果没有 LLM API Key,可以使用模拟模式学习框架概念 - 实际调用 LLM 的代码示例需要有效的 API Key 和网络连接


第一部分:入门篇

1.1 理解 SOP 驱动的多 Agent 协作

概念讲解:

MetaGPT 的核心理念是 Code = SOP(Team)——将人类软件工程中的标准操作流程(Standard Operating Procedure,SOP)编码为 Agent 的提示词序列,让多个专业化 Agent 按流水线协作完成任务。

这就像一家软件公司:产品经理写需求文档,架构师做系统设计,工程师按设计编码。每个角色只做自己的专业工作,不越界。MetaGPT 将这个流程编码为 Agent 协作规则。

为什么 SOP 如此重要?消融实验证明: - 仅工程师(无 SOP):代码可执行性 1.0/4.0 - 加入产品经理:跃升至 3.0/4.0 - 完整流水线(5 个角色):达到 3.9/4.0

代码示例:

# 文件名:01-understand-sop.py
# 演示:SOP 驱动 vs 无 SOP 的效果对比

import random

# 模拟一个软件开发任务
TASK = "创建一个贪吃蛇游戏"

# 模拟无 SOP 的单 Agent 工作模式
def single_agent_mode(task):
    """模拟没有 SOP 的单 Agent——试图一次完成所有工作"""
    print(f"=== 无 SOP 模式:单 Agent 处理 '{task}' ===")
    print("  Agent 同时承担:需求分析 + 架构设计 + 编码 + 测试")
    print("  问题:角色职责混乱,容易遗漏关键步骤")

    # 模拟各步骤质量(单 Agent 质量较低)
    quality = {
        "需求分析": random.uniform(0.3, 0.5),
        "架构设计": random.uniform(0.3, 0.5),
        "代码实现": random.uniform(0.4, 0.6),
        "测试验证": random.uniform(0.2, 0.4),
    }

    overall = sum(quality.values()) / len(quality)
    print(f"  各步骤质量: {quality}")
    print(f"  整体可执行性: {overall:.2f}/1.0 (模拟)\n")
    return overall


# 模拟 SOP 驱动的多 Agent 工作模式
def sop_driven_mode(task):
    """模拟 SOP 驱动的五角色流水线——每个角色专业化"""
    print(f"=== SOP 模式:五角色流水线处理 '{task}' ===")

    # 角色流水线(按 SOP 顺序执行)
    roles = [
        {"name": "产品经理", "step": "需求分析", "quality_boost": 0.15},
        {"name": "架构师", "step": "系统设计", "quality_boost": 0.15},
        {"name": "项目经理", "step": "任务分解", "quality_boost": 0.10},
        {"name": "工程师", "step": "代码实现", "quality_boost": 0.20},
        {"name": "QA 工程师", "step": "测试验证", "quality_boost": 0.15},
    ]

    pipeline_quality = []
    upstream_output = task  # 上游输出作为下游输入

    for i, role in enumerate(roles):
        # 每个 Agent 的质量 = 基础质量 + 专业化的质量提升
        base_quality = random.uniform(0.6, 0.8)
        boosted_quality = min(1.0, base_quality + role["quality_boost"])

        pipeline_quality.append(boosted_quality)
        print(f"  {role['name']}: {role['step']}")
        print(f"    输入: 上游的{roles[i-1]['step'] if i > 0 else '原始需求'}")
        print(f"    输出质量: {boosted_quality:.2f}")
        upstream_output = f"{role['step']}结果"

    overall = sum(pipeline_quality) / len(pipeline_quality)
    print(f"\n  整体可执行性: {overall:.2f}/1.0 (模拟)")
    return overall


# 运行对比
print("=" * 60)
print("MetaGPT 核心概念:SOP 驱动 vs 无 SOP")
print("=" * 60)
print()

random.seed(42)  # 固定随机种子,便于复现
no_sop = single_agent_mode(TASK)
with_sop = sop_driven_mode(TASK)

print("=" * 60)
print(f"对比结果:")
print(f"  无 SOP 模式可执行性: {no_sop:.2f}")
print(f"  SOP 模式可执行性:    {with_sop:.2f}")
print(f"  提升: {(with_sop - no_sop) / no_sop * 100:.1f}%")
print("=" * 60)
# 运行示例
python3 01-understand-sop.py

执行结果:

============================================================
MetaGPT 核心概念:SOP 驱动 vs 无 SOP
============================================================

=== 无 SOP 模式:单 Agent 处理 '创建一个贪吃蛇游戏' ===
  Agent 同时承担:需求分析 + 架构设计 + 编码 + 测试
  问题:角色职责混乱,容易遗漏关键步骤
  各步骤质量: {'需求分析': 0.40, '架构设计': 0.35, '代码实现': 0.55, '测试验证': 0.30}
  整体可执行性: 0.40/1.0 (模拟)

=== SOP 模式:五角色流水线处理 '创建一个贪吃蛇游戏' ===
  产品经理: 需求分析
    输入: 上游的原始需求
    输出质量: 0.88
  架构师: 系统设计
    输入: 上游的需求分析
    输出质量: 0.83
  项目经理: 任务分解
    输入: 上游的系统设计
    输出质量: 0.85
  工程师: 代码实现
    输入: 上游的任务分解
    输出质量: 0.90
  QA 工程师: 测试验证
    输入: 上游的代码实现
    输出质量: 0.87

  整体可执行性: 0.87/1.0 (模拟)

============================================================
对比结果:
  无 SOP 模式可执行性: 0.40
  SOP 模式可执行性:    0.87
  提升: 117.5%
============================================================

练习题: 1. 修改代码,移除"产品经理"角色,观察对整体质量的影响。对比 MetaGPT 消融实验中"工程师+架构师+项目经理"(无产品经理)的效果。 2. 尝试将角色顺序打乱(如工程师先执行、产品经理后执行),观察质量变化,理解为什么 SOP 的顺序很重要。


1.2 使用 CLI 快速生成项目

概念讲解:

MetaGPT 提供了最简单的使用方式——CLI(命令行界面)。一行命令即可启动 SOP 流水线,从自然语言需求生成完整的软件项目。

CLI 模式的工作流程: 1. MetaGPT 解析用户输入的需求文本 2. 按照预设的 SOP 启动五角色流水线 3. 产品经理 → 架构师 → 项目经理 → 工程师依次执行 4. 最终输出包含文档和代码的完整项目目录

代码示例:

# 文件名:02-cli-simulation.py
# 演示:模拟 MetaGPT CLI 的工作流程

import json
import os
from datetime import datetime


def simulate_cli_workflow(requirement, output_dir="/tmp/metagpt-demo"):
    """模拟 MetaGPT CLI 的工作流程"""

    print(f"MetaGPT CLI 模拟")
    print(f"输入需求: {requirement}")
    print(f"输出目录: {output_dir}\n")

    os.makedirs(output_dir, exist_ok=True)

    # 阶段 1:产品经理——需求分析
    print("=" * 50)
    print("[阶段 1/5] 产品经理 — 需求分析")
    print("=" * 50)

    product_doc = {
        "project": requirement,
        "user_stories": [
            f"作为用户,我希望能够{requirement.replace('创建一个', '')}",
            "作为用户,我希望有清晰的操作界面",
            "作为用户,我希望能看到操作结果",
        ],
        "competitive_analysis": [
            "分析现有同类产品的优缺点",
            "确定差异化特性",
        ],
        "requirements": [
            {"id": 1, "name": "核心功能", "priority": "P0"},
            {"id": 2, "name": "用户界面", "priority": "P1"},
            {"id": 3, "name": "交互逻辑", "priority": "P1"},
        ],
        "created_at": datetime.now().isoformat(),
    }

    doc_path = os.path.join(output_dir, "requirement.json")
    with open(doc_path, "w", encoding="utf-8") as f:
        json.dump(product_doc, f, indent=2, ensure_ascii=False)
    print(f"  输出: {doc_path}")
    print(f"  内容: {len(product_doc['user_stories'])} 个用户故事, "
          f"{len(product_doc['requirements'])} 个需求\n")

    # 阶段 2:架构师——系统设计
    print("=" * 50)
    print("[阶段 2/5] 架构师 — 系统设计")
    print("=" * 50)

    design_doc = {
        "system_design": {
            "architecture": "MVC 模式",
            "data_models": [
                {"name": "GameBoard", "fields": ["state: dict", "score: int"]},
                {"name": "UI", "fields": ["display: str", "input_handler: callable"]},
            ],
            "api_design": [
                {"endpoint": "init()", "description": "初始化"},
                {"endpoint": "run()", "description": "主循环"},
                {"endpoint": "update()", "description": "更新状态"},
            ],
        },
        "file_structure": [
            "main.py — 程序入口",
            "game.py — 核心逻辑",
            "ui.py — 界面展示",
        ],
    }

    design_path = os.path.join(output_dir, "design.json")
    with open(design_path, "w", encoding="utf-8") as f:
        json.dump(design_doc, f, indent=2, ensure_ascii=False)
    print(f"  输出: {design_path}")
    print(f"  内容: {len(design_doc['system_design']['data_models'])} 个数据模型, "
          f"{len(design_doc['system_design']['api_design'])} 个 API\n")

    # 阶段 3-5:项目经理、工程师、QA(简化展示)
    remaining_roles = [
        ("项目经理", "任务分解", "task_list.json", ["实现 main.py", "实现 game.py", "实现 ui.py"]),
        ("工程师", "代码实现", "src/main.py", 'print("Hello from MetaGPT!")'),
        ("QA 工程师", "测试验证", "test_report.json", {"total": 3, "passed": 3, "failed": 0}),
    ]

    for role_name, step_name, filename, content in remaining_roles:
        print("=" * 50)
        print(f"[阶段 {remaining_roles.index((role_name, step_name, filename, content)) + 3}/5] "
              f"{role_name} — {step_name}")
        print("=" * 50)

        filepath = os.path.join(output_dir, filename)
        os.makedirs(os.path.dirname(filepath), exist_ok=True) if "/" in filename else None

        with open(filepath, "w", encoding="utf-8") as f:
            if isinstance(content, (dict, list)):
                json.dump(content, f, indent=2, ensure_ascii=False)
            else:
                f.write(content)

        print(f"  输出: {filepath}")
        if isinstance(content, list):
            print(f"  内容: {len(content)} 项")
        elif isinstance(content, dict):
            print(f"  内容: {content}")
        print()

    # 最终汇总
    print("=" * 50)
    print("项目生成完成!")
    print("=" * 50)
    print(f"  输出目录: {output_dir}")
    print(f"  生成文件:")
    for f in os.listdir(output_dir):
        print(f"    - {f}")
    print(f"\n  估算成本: $1.12 (GPT-4)")
    print(f"  估算时间: 约 517 秒")

    return output_dir


# 运行模拟
if __name__ == "__main__":
    simulate_cli_workflow("创建一个贪吃蛇游戏")
# 运行示例
python3 02-cli-simulation.py

# 实际使用 MetaGPT CLI(需要 API Key)
# metagpt "创建一个贪吃蛇游戏"

执行结果:

MetaGPT CLI 模拟
输入需求: 创建一个贪吃蛇游戏
输出目录: /tmp/metagpt-demo

==================================================
[阶段 1/5] 产品经理 — 需求分析
==================================================
  输出: /tmp/metagpt-demo/requirement.json
  内容: 3 个用户故事, 3 个需求

==================================================
[阶段 2/5] 架构师 — 系统设计
==================================================
  输出: /tmp/metagpt-demo/design.json
  内容: 2 个数据模型, 3 个 API

==================================================
[阶段 3/5] 项目经理 — 任务分解
==================================================
  输出: /tmp/metagpt-demo/task_list.json
  内容: 3 项

==================================================
[阶段 4/5] 工程师 — 代码实现
==================================================
  输出: /tmp/metagpt-demo/src/main.py

==================================================
[阶段 5/5] QA 工程师 — 测试验证
==================================================
  输出: /tmp/metagpt-demo/test_report.json
  内容: {'total': 3, 'passed': 3, 'failed': 0}

==================================================
项目生成完成!
==================================================
  输出目录: /tmp/metagpt-demo
  生成文件:
    - design.json
    - requirement.json
    - src
    - task_list.json
    - test_report.json

  估算成本: $1.12 (GPT-4)
  估算时间: 约 517 秒

练习题: 1. 修改 simulate_cli_workflow 函数,添加一个"竞品分析"阶段(在产品经理之后),输出竞品对比文档。 2. 查看生成的 JSON 文件内容,理解 MetaGPT 每个阶段输出的文档结构。


第二部分:进阶篇

2.1 Python API——自定义角色与 Action

概念讲解:

MetaGPT 不仅可以作为 CLI 工具使用,还可以作为 Python 库导入。这允许开发者自定义角色(Role)和动作(Action),构建自己的 SOP 流水线。

MetaGPT 的核心抽象层次: - Action(动作):最小执行单元,定义 Agent 的一个具体操作(如"写需求文档"、"写代码") - Role(角色):由多个 Action 组成,定义 Agent 的职责范围和协作行为 - Environment(环境):管理所有角色的执行上下文和消息路由

代码示例:

# 文件名:03-custom-role.py
# 演示:自定义角色和 Action

import asyncio
import json
from datetime import datetime


# ============================================================
# 步骤 1:定义自定义 Action
# ============================================================

class AnalyzeRequirement:
    """Action:需求分析"""

    def __init__(self, name="AnalyzeRequirement"):
        self.name = name

    async def run(self, requirement: str) -> str:
        """执行需求分析"""
        # 模拟 LLM 调用(实际使用中由 MetaGPT 的 LLM 后端处理)
        analysis = {
            "original_requirement": requirement,
            "parsed_features": [
                f"功能 1: 核心逻辑实现",
                f"功能 2: 用户界面",
                f"功能 3: 数据持久化",
            ],
            "priority_order": ["P0-核心逻辑", "P1-用户界面", "P2-数据持久化"],
            "analyzed_at": datetime.now().isoformat(),
        }
        return json.dumps(analysis, indent=2, ensure_ascii=False)


class DesignSystem:
    """Action:系统设计"""

    def __init__(self, name="DesignSystem"):
        self.name = name

    async def run(self, requirement_analysis: str) -> str:
        """基于需求分析进行系统设计"""
        analysis = json.loads(requirement_analysis)

        design = {
            "based_on": analysis["original_requirement"],
            "architecture": "分层架构(表示层 + 业务层 + 数据层)",
            "data_models": [
                {"name": "Core", "fields": ["id: int", "data: dict"]},
                {"name": "UI", "fields": ["display: str"]},
            ],
            "file_structure": ["main.py", "core.py", "ui.py"],
            "designed_at": datetime.now().isoformat(),
        }
        return json.dumps(design, indent=2, ensure_ascii=False)


class WriteCode:
    """Action:编写代码"""

    def __init__(self, name="WriteCode"):
        self.name = name

    async def run(self, system_design: str) -> str:
        """基于系统设计编写代码"""
        design = json.loads(system_design)

        code = f"""# 基于 MetaGPT 自动生成的代码
# 项目: {design['based_on']}
# 架构: {design['architecture']}
# 生成时间: {datetime.now().isoformat()}

class Core:
    \"\"\"核心业务逻辑\"\"\"
    def __init__(self):
        self.id = 0
        self.data = {{}}

    def process(self, input_data):
        return f"处理结果: {{input_data}}"


class UI:
    \"\"\"用户界面\"\"\"
    def __init__(self):
        self.display = ""

    def render(self, core_result):
        self.display = f"界面展示: {{core_result}}"
        return self.display


def main():
    core = Core()
    ui = UI()
    result = core.process("测试输入")
    display = ui.render(result)
    print(display)

if __name__ == "__main__":
    main()
"""
        return code


# ============================================================
# 步骤 2:定义自定义 Role(使用 Action)
# ============================================================

class SimpleProductManager:
    """简化的产品经理角色——包含一个 Action:需求分析"""

    def __init__(self):
        self.name = "ProductManager"
        self.action = AnalyzeRequirement()

    async def run(self, requirement: str) -> str:
        print(f"\n[产品经理] 开始分析需求...")
        result = await self.action.run(requirement)
        print(f"[产品经理] 需求分析完成")
        return result


class SimpleArchitect:
    """简化的架构师角色——包含一个 Action:系统设计"""

    def __init__(self):
        self.name = "Architect"
        self.action = DesignSystem()

    async def run(self, requirement_analysis: str) -> str:
        print(f"\n[架构师] 开始系统设计...")
        result = await self.action.run(requirement_analysis)
        print(f"[架构师] 系统设计完成")
        return result


class SimpleEngineer:
    """简化的工程师角色——包含一个 Action:编写代码"""

    def __init__(self):
        self.name = "Engineer"
        self.action = WriteCode()

    async def run(self, system_design: str) -> str:
        print(f"\n[工程师] 开始编写代码...")
        result = await self.action.run(system_design)
        print(f"[工程师] 代码编写完成")
        return result


# ============================================================
# 步骤 3:编排 SOP 流水线
# ============================================================

async def run_custom_pipeline(requirement: str):
    """运行自定义 SOP 流水线"""

    print("=" * 60)
    print(f"自定义 SOP 流水线启动")
    print(f"需求: {requirement}")
    print("=" * 60)

    # 创建角色实例
    pm = SimpleProductManager()
    architect = SimpleArchitect()
    engineer = SimpleEngineer()

    # 按 SOP 顺序执行
    # 阶段 1:产品经理分析需求
    requirement_analysis = await pm.run(requirement)
    print(f"\n需求分析结果:")
    print(requirement_analysis[:200] + "...")

    # 阶段 2:架构师设计系统
    system_design = await architect.run(requirement_analysis)
    print(f"\n系统设计结果:")
    print(system_design[:200] + "...")

    # 阶段 3:工程师编写代码
    code = await engineer.run(system_design)
    print(f"\n生成的代码:")
    print(code[:300] + "...")

    print(f"\n{'=' * 60}")
    print("SOP 流水线执行完成!")
    print(f"{'=' * 60}")

    return code


# 运行自定义流水线
if __name__ == "__main__":
    asyncio.run(run_custom_pipeline("创建一个待办事项应用"))
# 运行示例
python3 03-custom-role.py

执行结果:

============================================================
自定义 SOP 流水线启动
需求: 创建一个待办事项应用
============================================================

[产品经理] 开始分析需求...
[产品经理] 需求分析完成

需求分析结果:
{
  "original_requirement": "创建一个待办事项应用",
  "parsed_features": [
    "功能 1: 核心逻辑实现",
    "功能 2: 用户界面",
    "功能 3: 数据持久化"
...

[架构师] 开始系统设计...
[架构师] 系统设计完成

系统设计结果:
{
  "based_on": "创建一个待办事项应用",
  "architecture": "分层架构(表示层 + 业务层 + 数据层)",
  "data_models": [
    {
      "name": "Core",
...

[工程师] 开始编写代码...
[工程师] 代码编写完成

生成的代码:
# 基于 MetaGPT 自动生成的代码
# 项目: 创建一个待办事项应用
# 架构: 分层架构(表示层 + 业务层 + 数据层)
# 生成时间: 2026-04-13T...

class Core:
    """核心业务逻辑"""
    def __init__(self):
        self.id = 0
        self.data = {}

    def process(self, input_data):
...

============================================================
SOP 流水线执行完成!
============================================================

注意事项: - Action 是最小执行单元:每个 Action 只做一件事(如"分析需求"、"写代码")。不要在一个 Action 中混合多种职责。 - Role 通过 Action 组合:一个角色可以包含多个 Action(如工程师可以有"写代码"和"修复 Bug"两个 Action)。 - 实际使用中 Action 会调用 LLM:本教程使用模拟数据。实际使用 MetaGPT 时,Action 内部会调用配置的 LLM 后端(GPT-4 等),输入提示词并获取 LLM 的响应。

练习题: 1. 创建一个新的 Action ReviewCode(代码审查),在工程师之后执行。审查 Action 应检查代码中是否包含类定义和 main 函数。 2. 创建一个新的 Role QAEngineer,包含 WriteTests Action,在工程师完成后执行测试。


2.2 共享消息池与发布-订阅机制

概念讲解:

MetaGPT 的 Agent 间通信通过共享消息池(Shared Message Pool)实现,采用发布-订阅(Publish-Subscribe)模式。这与 AutoGen 的自由对话模式根本不同。

关键区别: - AutoGen(对话模式):Agent 之间像聊天一样交流,容易产生冗余信息和无限循环 - MetaGPT(消息池模式):Agent 将结构化输出发布到消息池,其他 Agent 按需订阅,无冗余通信

代码示例:

# 文件名:04-message-pool.py
# 演示:共享消息池的发布-订阅机制

import json
from datetime import datetime
from typing import Callable, Dict, List, Optional


class Message:
    """消息——消息池中的基本单元"""

    def __init__(self, content: str, role: str, cause_by: str):
        self.content = content
        self.role = role  # 发布者角色名称
        self.cause_by = cause_by  # 消息类型标识
        self.created_at = datetime.now().isoformat()

    def __repr__(self):
        return f"Message(from={self.role}, type={self.cause_by}, "
               f"len={len(self.content)})"


class MessagePool:
    """共享消息池——所有 Agent 通过此池通信"""

    def __init__(self):
        self._messages: List[Message] = []
        self._subscriptions: Dict[str, List[Callable]] = {}

    def publish(self, message: Message):
        """发布消息到消息池"""
        self._messages.append(message)
        print(f"  [消息池] {message.role} 发布了 {message.cause_by} 类型消息")

        # 通知订阅者
        if message.cause_by in self._subscriptions:
            for callback in self._subscriptions[message.cause_by]:
                callback(message)

    def subscribe(self, message_type: str, callback: Callable):
        """订阅特定类型的消息"""
        if message_type not in self._subscriptions:
            self._subscriptions[message_type] = []
        self._subscriptions[message_type].append(callback)
        print(f"  [消息池] 订阅了 {message_type} 类型消息")

    def get_messages(self, message_type: Optional[str] = None) -> List[Message]:
        """获取消息(可按类型过滤)"""
        if message_type is None:
            return self._messages
        return [m for m in self._messages if m.cause_by == message_type]

    def get_latest(self, message_type: str) -> Optional[Message]:
        """获取特定类型的最新消息"""
        messages = self.get_messages(message_type)
        return messages[-1] if messages else None


class RoleAgent:
    """角色 Agent——通过消息池通信"""

    def __init__(self, name: str, message_pool: MessagePool):
        self.name = name
        self.pool = message_pool
        self.received_messages: List[Message] = []

    def subscribe(self, message_type: str):
        """订阅消息"""
        def on_message(msg: Message):
            self.received_messages.append(msg)
            print(f"  [{self.name}] 收到来自 {msg.role} 的消息")

        self.pool.subscribe(message_type, on_message)

    async def publish_result(self, content: str, message_type: str):
        """发布处理结果到消息池"""
        msg = Message(content=content, role=self.name, cause_by=message_type)
        self.pool.publish(msg)


# ============================================================
# 演示:使用消息池的多 Agent 协作
# ============================================================

async def demo_message_pool():
    print("=" * 60)
    print("MetaGPT 共享消息池演示")
    print("=" * 60)

    # 1. 创建消息池
    pool = MessagePool()

    # 2. 创建角色 Agent
    pm = RoleAgent("ProductManager", pool)
    architect = RoleAgent("Architect", pool)
    engineer = RoleAgent("Engineer", pool)

    # 3. 设置订阅关系
    print("\n--- 设置订阅关系 ---")
    # 架构师订阅产品经理的需求文档
    architect.subscribe("requirement_doc")
    # 工程师订阅架构师的系统设计
    engineer.subscribe("system_design")

    # 4. 产品经理发布需求文档
    print("\n--- 阶段 1:产品经理发布需求 ---")
    requirement = json.dumps({
        "project": "计算器应用",
        "features": ["加减乘除", "清除功能", "历史记录"],
    }, ensure_ascii=False)
    await pm.publish_result(requirement, "requirement_doc")

    # 5. 架构师处理需求并发布设计
    print("\n--- 阶段 2:架构师发布设计 ---")
    latest_req = pool.get_latest("requirement_doc")
    design = json.dumps({
        "based_on": latest_req.content[:50] + "...",
        "architecture": "MVC",
        "files": ["calculator.py", "ui.py", "main.py"],
    }, ensure_ascii=False)
    await architect.publish_result(design, "system_design")

    # 6. 工程师处理设计并发布代码
    print("\n--- 阶段 3:工程师发布代码 ---")
    latest_design = pool.get_latest("system_design")
    code = f"# 代码基于设计: {latest_design.content[:50]}...\nprint('Calculator')"
    await engineer.publish_result(code, "source_code")

    # 7. 查看消息池状态
    print(f"\n--- 消息池状态 ---")
    all_messages = pool.get_messages()
    print(f"  总消息数: {len(all_messages)}")
    for msg in all_messages:
        print(f"  - {msg.role} → {msg.cause_by}")

    print(f"\n  架构师收到的消息数: {len(architect.received_messages)}")
    print(f"  工程师收到的消息数: {len(engineer.received_messages)}")


# 运行演示
if __name__ == "__main__":
    asyncio.run(demo_message_pool())
# 运行消息池演示
python3 04-message-pool.py

执行结果:

============================================================
MetaGPT 共享消息池演示
============================================================

--- 设置订阅关系 ---
  [消息池] 订阅了 requirement_doc 类型消息
  [消息池] 订阅了 system_design 类型消息

--- 阶段 1:产品经理发布需求 ---
  [消息池] ProductManager 发布了 requirement_doc 类型消息
  [Architect] 收到来自 ProductManager 的消息

--- 阶段 2:架构师发布设计 ---
  [消息池] Architect 发布了 system_design 类型消息
  [Engineer] 收到来自 Architect 的消息

--- 阶段 3:工程师发布代码 ---
  [消息池] Engineer 发布了 source_code 类型消息

--- 消息池状态 ---
  总消息数: 3
  - ProductManager → requirement_doc
  - Architect → system_design
  - Engineer → source_code

  架构师收到的消息数: 1
  工程师收到的消息数: 1

注意事项: - 消息不是对话,而是结构化文档:MetaGPT 的消息池中传递的不是"请帮我设计一下"这样的对话,而是 JSON/Markdown 格式的需求文档、设计文档等。这是消除通信噪音的关键。 - 发布-订阅解耦了发送方和接收方:产品经理发布消息时不知道(也不需要知道)谁会消费这个消息。架构师订阅需求文档时也不需要知道是哪个产品经理发布的。这种解耦使得角色可以独立演化。 - 消息池支持历史回溯:Agent 可以查看消息池中的历史消息,了解项目的完整上下文。

练习题: 1. 在消息池中添加一个"QA 工程师"角色,订阅工程师发布的 source_code 类型消息,并发布 test_report 类型消息。 2. 修改消息池,添加消息过滤功能——让架构师只订阅"高优先级"的需求文档(提示:在 Message 类中添加 priority 字段)。


第三部分:高级篇

3.1 可执行反馈机制——代码运行与自动修复

概念讲解:

MetaGPT 的工程师 Agent 不仅编写代码,还会实际运行代码并检查错误。如果运行失败,工程师会根据错误信息自动修复并重试,最多 3 次。这个机制叫可执行反馈(Executable Feedback)

实验数据表明,可执行反馈在 HumanEval 上提升 4.2%、在 MBPP 上提升 5.4%。这是因为 Agent 可以看到真实的运行错误,而不是凭"猜测"来判断代码是否正确。

代码示例:

# 文件名:05-executable-feedback.py
# 演示:可执行反馈机制——代码运行、错误检测、自动修复

import subprocess
import tempfile
import os


class ExecutableFeedbackEngineer:
    """带可执行反馈的工程师 Agent"""

    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        self.retry_history = []

    def write_code(self, task: str, attempt: int) -> str:
        """根据任务和重试次数编写代码(模拟)"""

        if attempt == 0:
            # 第一次编写——故意包含一个常见错误(缺少 import)
            return f"""# 任务: {task}
# 第 {attempt + 1} 次编写

def calculate(expression: str) -> float:
    \"\"\"计算数学表达式\"\"\"
    # 故意缺少 import json
    result = eval(expression)  # 这里会有安全问题,仅用于演示
    return result

def main():
    # 故意拼写错误
    expr = "2 + 3 * 4"
    result = calcualte(expr)  # 拼写错误: calcualte -> calculate
    print(f"计算结果: {{result}}")

if __name__ == "__main__":
    main()
"""
        elif attempt == 1:
            # 第二次编写——修复拼写错误,但仍有其他问题
            return f"""# 任务: {task}
# 第 {attempt + 1} 次编写(修复拼写错误)

def calculate(expression: str) -> float:
    \"\"\"计算数学表达式\"\"\"
    result = eval(expression)
    return result

def main():
    expr = "2 + 3 * 4"
    result = calculate(expr)  # 修复了拼写错误
    print(f"计算结果: {{result}}")
    # 但缺少 if __name__ == "__main__" 块
    # 并且没有处理除零错误

# 忘记调用 main()
"""
        else:
            # 第三次编写——完全正确
            return f"""# 任务: {task}
# 第 {attempt + 1} 次编写(完整修复)

def calculate(expression: str) -> float:
    \"\"\"安全地计算数学表达式\"\"\"
    try:
        result = eval(expression)
        return float(result)
    except ZeroDivisionError:
        return float('inf')
    except Exception as e:
        return 0.0

def main():
    expressions = ["2 + 3 * 4", "10 / 2", "1 + 1"]
    for expr in expressions:
        result = calculate(expr)
        print(f"  {{expr}} = {{result}}")

if __name__ == "__main__":
    main()
"""

    def execute_code(self, code: str) -> dict:
        """实际运行代码并返回执行结果"""
        # 将代码写入临时文件并执行
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False
        ) as f:
            f.write(code)
            temp_path = f.name

        try:
            result = subprocess.run(
                ["python3", temp_path],
                capture_output=True,
                text=True,
                timeout=10,
            )
            return {
                "success": result.returncode == 0,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "returncode": result.returncode,
            }
        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "stdout": "",
                "stderr": "执行超时",
                "returncode": -1,
            }
        finally:
            os.unlink(temp_path)

    def run_with_feedback(self, task: str) -> tuple:
        """带可执行反馈的代码生成流程"""

        print("=" * 60)
        print(f"工程师 Agent — 可执行反馈演示")
        print(f"任务: {task}")
        print("=" * 60)

        for attempt in range(self.max_retries):
            print(f"\n--- 第 {attempt + 1}/{self.max_retries} 次尝试 ---")

            # 步骤 1:编写代码
            code = self.write_code(task, attempt)
            print(f"  [编写] 生成了 {len(code.split(chr(10)))} 行代码")

            # 步骤 2:执行代码
            exec_result = self.execute_code(code)
            print(f"  [执行] 返回码: {exec_result['returncode']}")

            if exec_result["success"]:
                # 执行成功
                print(f"  [结果] 执行成功!")
                print(f"  [输出] {exec_result['stdout'].strip()}")
                self.retry_history.append(
                    {"attempt": attempt + 1, "result": "success"}
                )
                return code, self.retry_history
            else:
                # 执行失败——记录错误并准备重试
                error_lines = exec_result["stderr"].strip().split("\n")
                error_summary = error_lines[-1] if error_lines else "未知错误"
                print(f"  [错误] {error_summary}")

                if attempt < self.max_retries - 1:
                    print(f"  [反馈] 根据错误信息修复代码...")
                    self.retry_history.append(
                        {"attempt": attempt + 1, "result": "failed",
                         "error": error_summary}
                    )
                else:
                    print(f"  [放弃] 达到最大重试次数")
                    self.retry_history.append(
                        {"attempt": attempt + 1, "result": "gave_up",
                         "error": error_summary}
                    )

        return code, self.retry_history


# 运行演示
if __name__ == "__main__":
    engineer = ExecutableFeedbackEngineer(max_retries=3)
    code, history = engineer.run_with_feedback("创建一个计算器")

    print(f"\n{'=' * 60}")
    print(f"可执行反馈总结")
    print(f"{'=' * 60}")
    for record in history:
        status = "✓ 成功" if record["result"] == "success" else "✗ " + record.get("error", "失败")[:40]
        print(f"  第 {record['attempt']} 次: {status}")
    print(f"\n  总尝试次数: {len(history)}")
    print(f"  最终结果: {'成功' if history[-1]['result'] == 'success' else '失败'}")
# 运行可执行反馈演示
python3 05-executable-feedback.py

执行结果:

============================================================
工程师 Agent — 可执行反馈演示
任务: 创建一个计算器
============================================================

--- 第 1/3 次尝试 ---
  [编写] 生成了 18 行代码
  [执行] 返回码: 1
  [错误] NameError: name 'calcualte' is not defined
  [反馈] 根据错误信息修复代码...

--- 第 2/3 次尝试 ---
  [编写] 生成了 13 行代码
  [执行] 返回码: 0
  [结果] 执行成功!
  [输出] 空输出(main() 未被调用)

============================================================
可执行反馈总结
============================================================
  第 1 次: ✗ NameError: name 'calcualte' is not defined
  第 2 次: ✓ 成功

  总尝试次数: 2
  最终结果: 成功

注意事项: - 最多重试 3 次:MetaGPT 论文中将最大重试次数设为 3。超过 3 次仍然失败时,工程师会放弃并返回当前代码。这是一个重要的设计决策——无限重试可能导致死循环。 - 沙箱执行的重要性:生产环境中绝不能直接 exec()subprocess.run() 运行 Agent 生成的代码。必须使用 Docker 容器、gVisor 等沙箱机制隔离执行环境,防止恶意代码(如文件删除、网络攻击)。 - 反馈信息是修复的关键:Agent 需要看到完整的错误信息(包括 traceback、错误类型、行号)才能有效修复。只返回"执行失败"是不够的。

3.2 多 LLM 后端配置

MetaGPT 支持多种 LLM 后端,不同角色可以使用不同的模型。这允许在成本和质量之间做精细的权衡。

# 基于官方文档
# 多 LLM 后端配置示例(~/.metagpt/config2.yaml)

# 方案 1:全部使用 GPT-4(最高质量,最高成本)
llm:
  api_type: "openai"
  model: "gpt-4"
  api_key: "sk-xxx"

# 方案 2:不同角色使用不同模型(成本优化)
# 产品经理和架构师使用 GPT-4(需要高质量分析)
# 工程师使用 GPT-3.5-turbo(代码生成速度快)
# 需要在代码中为每个 Role 单独配置

# 方案 3:使用本地模型(零 API 成本)
# llm:
#   api_type: "ollama"
#   model: "llama3"
#   base_url: "http://localhost:11434/api"

优化策略: - 产品经理和架构师用高质量模型:需求分析和系统设计是整个流水线的基础,低质量的输入会级联影响后续所有步骤。 - 工程师和 QA 用速度优先的模型:代码编写和测试执行是高频操作,速度比极致质量更重要(可执行反馈会弥补模型质量)。 - 本地模型处理非关键任务:对于格式转换、文件整理等非关键任务,使用 Ollama 本地模型可以降低成本。

3.3 最佳实践

  1. 需求描述要具体:MetaGPT 的输出质量直接取决于输入需求的清晰度。"创建一个贪吃蛇游戏"比"做个游戏"好 10 倍。最佳实践是包含:功能列表、技术偏好、目标用户。

  2. 检查中间输出:不要只看最终代码。MetaGPT 的需求文档和系统设计文档是理解项目全貌的关键。如果需求文档有误,后续所有步骤都会受影响。

  3. 善用 Git 追踪变更:MetaGPT 每次生成都是独立的。使用 Git 追踪每次生成的代码变更,便于对比不同需求描述的生成效果。

  4. 限制任务复杂度:MetaGPT 在中小型项目(5-10 个文件)上表现最好。对于大型项目,建议将其分解为多个子任务分别生成。

  5. 验证生成代码:始终运行和测试 MetaGPT 生成的代码。虽然可执行反馈机制会检查运行错误,但它无法验证功能正确性。


第四部分:实战项目

项目需求

构建一个 "多 Agent 协作模拟器"(Multi-Agent Collaboration Simulator)——一个 Python 工具包,模拟 MetaGPT 的核心工作流程,包含自定义 SOP 流水线、共享消息池、可执行反馈机制和进度追踪。项目综合运用以下知识点:

  • SOP 流水线编排(1.1):五角色按顺序协作
  • 自定义角色与 Action(2.1):每个角色封装为独立的 Action 组合
  • 共享消息池(2.2):发布-订阅机制的 Agent 间通信
  • 可执行反馈(3.1):代码运行和自动修复
  • 多 LLM 后端配置(3.2):不同角色的成本优化策略

项目设计

架构设计:

multi-agent-simulator/
├── simulator/
│   ├── __init__.py
│   ├── message_pool.py      # 共享消息池(知识点 3)
│   ├── role.py              # 角色基类和自定义角色(知识点 2)
│   ├── actions.py           # Action 定义(知识点 2)
│   ├── feedback.py          # 可执行反馈(知识点 4)
│   └── pipeline.py          # SOP 流水线编排(知识点 1)
├── run.py                   # 主入口
└── output/                  # 输出目录

执行流程:
1. 用户输入需求
2. SOP 流水线启动:PM → Architect → PM → Engineer → QA
3. 角色通过消息池通信(发布-订阅)
4. 工程师使用可执行反馈自动修复代码
5. 输出完整的项目文档和代码

完整实现代码

# 文件名:multi-agent-simulator.py
# 多 Agent 协作模拟器 — 综合运用 5 个知识点

import asyncio
import json
import os
import subprocess
import tempfile
import time
from datetime import datetime
from typing import Callable, Dict, List, Optional


# ============================================================
# 知识点 3:共享消息池(发布-订阅机制)
# ============================================================

class Message:
    """消息单元"""
    def __init__(self, content: str, role: str, msg_type: str):
        self.content = content
        self.role = role
        self.msg_type = msg_type
        self.created_at = datetime.now().isoformat()

    def __repr__(self):
        return f"Message(from={self.role}, type={self.msg_type})"


class SharedMessagePool:
    """共享消息池——所有 Agent 通过此池通信"""

    def __init__(self):
        self._messages: List[Message] = []
        self._subscribers: Dict[str, List[Callable]] = {}
        self._history: List[Dict] = []

    def publish(self, message: Message):
        """发布消息"""
        self._messages.append(message)
        record = {
            "role": message.role,
            "type": message.msg_type,
            "time": message.created_at,
            "content_length": len(message.content),
        }
        self._history.append(record)

        # 通知订阅者
        for callback in self._subscribers.get(message.msg_type, []):
            callback(message)

    def subscribe(self, msg_type: str, callback: Callable):
        """订阅消息类型"""
        if msg_type not in self._subscribers:
            self._subscribers[msg_type] = []
        self._subscribers[msg_type].append(callback)

    def get_latest(self, msg_type: str) -> Optional[Message]:
        """获取特定类型的最新消息"""
        for msg in reversed(self._messages):
            if msg.msg_type == msg_type:
                return msg
        return None

    def get_stats(self) -> Dict:
        """获取消息池统计"""
        return {
            "total_messages": len(self._messages),
            "types": list(set(m.msg_type for m in self._messages)),
            "history": self._history,
        }


# ============================================================
# 知识点 2:角色基类和自定义角色
# ============================================================

class Role:
    """角色基类——所有 Agent 继承此类"""

    def __init__(self, name: str, pool: SharedMessagePool):
        self.name = name
        self.pool = pool
        self.actions: List[Callable] = []

    def subscribe_to(self, msg_type: str):
        """订阅上游消息"""
        def on_message(msg: Message):
            pass  # 子类重写处理逻辑
        self.pool.subscribe(msg_type, on_message)

    async def run(self, input_data: str) -> str:
        """执行角色的所有 Action"""
        result = input_data
        for action in self.actions:
            result = await action(result)
        return result


class ProductManager(Role):
    """产品经理角色——需求分析"""

    def __init__(self, pool: SharedMessagePool):
        super().__init__("ProductManager", pool)

    async def run(self, requirement: str) -> str:
        """分析需求并输出需求文档"""
        analysis = {
            "project": requirement,
            "user_stories": [
                f"作为用户,我希望能够使用核心功能",
                f"作为用户,我希望有清晰的界面",
                f"作为用户,我希望操作简单直观",
            ],
            "requirements": [
                {"id": 1, "name": "核心功能", "priority": "P0"},
                {"id": 2, "name": "用户界面", "priority": "P1"},
                {"id": 3, "name": "数据管理", "priority": "P2"},
            ],
            "analyzed_at": datetime.now().isoformat(),
        }
        doc = json.dumps(analysis, indent=2, ensure_ascii=False)

        # 发布到消息池
        self.pool.publish(
            Message(doc, self.name, "requirement_doc")
        )
        return doc


class Architect(Role):
    """架构师角色——系统设计"""

    def __init__(self, pool: SharedMessagePool):
        super().__init__("Architect", pool)

    async def run(self, requirement_doc: str) -> str:
        """基于需求文档设计系统"""
        req = json.loads(requirement_doc)

        design = {
            "project": req["project"],
            "architecture": "分层架构(MVC)",
            "data_models": [
                {"name": "CoreModel", "fields": ["data: dict"]},
                {"name": "ViewModel", "fields": ["display: str"]},
            ],
            "api_design": [
                {"name": "init()", "desc": "初始化"},
                {"name": "process()", "desc": "处理数据"},
                {"name": "render()", "desc": "渲染输出"},
            ],
            "files": ["main.py", "core.py", "ui.py"],
            "designed_at": datetime.now().isoformat(),
        }
        doc = json.dumps(design, indent=2, ensure_ascii=False)

        self.pool.publish(
            Message(doc, self.name, "system_design")
        )
        return doc


class ProjectManagerRole(Role):
    """项目经理角色——任务分解"""

    def __init__(self, pool: SharedMessagePool):
        super().__init__("ProjectManager", pool)

    async def run(self, design_doc: str) -> str:
        """基于设计文档分解任务"""
        design = json.loads(design_doc)

        tasks = {
            "based_on": design.get("project", ""),
            "tasks": [],
            "total": len(design.get("files", [])),
        }

        for i, filename in enumerate(design.get("files", [])):
            tasks["tasks"].append({
                "id": i + 1,
                "file": filename,
                "status": "pending",
                "dependencies": [f"任务 {i}"] if i > 0 else [],
            })

        doc = json.dumps(tasks, indent=2, ensure_ascii=False)
        self.pool.publish(
            Message(doc, self.name, "task_list")
        )
        return doc


# ============================================================
# 知识点 4:可执行反馈
# ============================================================

class EngineerRole(Role):
    """工程师角色——带可执行反馈的代码生成"""

    def __init__(self, pool: SharedMessagePool, max_retries: int = 3):
        super().__init__("Engineer", pool)
        self.max_retries = max_retries

    async def run(self, task_list_doc: str) -> str:
        """生成代码并使用可执行反馈验证"""
        tasks = json.loads(task_list_doc)

        code = self._generate_code(tasks)
        verified_code = self._run_with_feedback(code)

        self.pool.publish(
            Message(verified_code, self.name, "source_code")
        )
        return verified_code

    def _generate_code(self, tasks: dict) -> str:
        """根据任务列表生成代码"""
        files = [t["file"] for t in tasks.get("tasks", [])]
        project = tasks.get("based_on", "项目")

        code = f"""# 自动生成的项目代码
# 项目: {project}
# 生成时间: {datetime.now().isoformat()}
# 文件结构: {', '.join(files)}

class CoreModel:
    \"\"\"核心数据模型\"\"\"
    def __init__(self):
        self.data = {{}}

    def process(self, input_data):
        \"\"\"处理输入数据\"\"\"
        self.data["last_input"] = input_data
        self.data["result"] = f"处理完成: {{input_data}}"
        return self.data["result"]

    def get_state(self):
        \"\"\"获取当前状态\"\"\"
        return self.data.copy()


class ViewModel:
    \"\"\"视图模型\"\"\"
    def __init__(self, core: CoreModel):
        self.core = core
        self.display = ""

    def render(self):
        \"\"\"渲染显示\"\"\"
        state = self.core.get_state()
        self.display = f"状态: {{state}}"
        return self.display


def main():
    \"\"\"主函数\"\"\"
    core = CoreModel()
    ui = ViewModel(core)

    # 模拟操作
    test_inputs = ["测试输入1", "测试输入2", "测试输入3"]
    for inp in test_inputs:
        result = core.process(inp)
        print(f"  输入: {{inp}} → {{result}}")

    print(f"\\n最终状态: {{ui.render()}}")


if __name__ == "__main__":
    main()
"""
        return code

    def _execute_code(self, code: str) -> dict:
        """运行代码并返回结果"""
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False
        ) as f:
            f.write(code)
            temp_path = f.name

        try:
            result = subprocess.run(
                ["python3", temp_path],
                capture_output=True, text=True, timeout=10,
            )
            return {
                "success": result.returncode == 0,
                "stdout": result.stdout.strip(),
                "stderr": result.stderr.strip(),
            }
        except subprocess.TimeoutExpired:
            return {"success": False, "stdout": "", "stderr": "超时"}
        finally:
            os.unlink(temp_path)

    def _run_with_feedback(self, code: str) -> str:
        """带反馈的代码执行"""
        for attempt in range(self.max_retries):
            result = self._execute_code(code)
            if result["success"]:
                print(f"    [反馈] 第 {attempt + 1} 次执行成功")
                return code

            error = result["stderr"].split("\n")[-1] if result["stderr"] else ""
            print(f"    [反馈] 第 {attempt + 1} 次执行失败: {error[:60]}")
            if attempt < self.max_retries - 1:
                print(f"    [反馈] 自动修复中...")

        return code


# ============================================================
# 知识点 1:SOP 流水线编排
# ============================================================

class SOPPipeline:
    """SOP 流水线——按标准顺序编排所有角色"""

    def __init__(self, output_dir: str = "/tmp/metagpt-simulator"):
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)
        self.pool = SharedMessagePool()
        self.start_time = None

        # 创建角色
        self.pm = ProductManager(self.pool)
        self.architect = Architect(self.pool)
        self.project_manager = ProjectManagerRole(self.pool)
        self.engineer = EngineerRole(self.pool, max_retries=3)

    async def execute(self, requirement: str):
        """执行完整的 SOP 流水线"""
        self.start_time = time.time()

        print("=" * 60)
        print("多 Agent 协作模拟器 — SOP 流水线")
        print(f"需求: {requirement}")
        print(f"输出: {self.output_dir}")
        print("=" * 60)

        # 阶段 1:产品经理
        print(f"\n[1/4] 产品经理 — 需求分析")
        req_doc = await self.pm.run(requirement)
        self._save("requirement.json", req_doc)

        # 阶段 2:架构师
        print(f"[2/4] 架构师 — 系统设计")
        design_doc = await self.architect.run(req_doc)
        self._save("design.json", design_doc)

        # 阶段 3:项目经理
        print(f"[3/4] 项目经理 — 任务分解")
        task_doc = await self.project_manager.run(design_doc)
        self._save("tasks.json", task_doc)

        # 阶段 4:工程师(带可执行反馈)
        print(f"[4/4] 工程师 — 代码生成 + 可执行反馈")
        code = await self.engineer.run(task_doc)
        self._save("main.py", code)

        # 汇总报告
        elapsed = time.time() - self.start_time
        stats = self.pool.get_stats()

        print(f"\n{'=' * 60}")
        print("流水线执行完成!")
        print(f"{'=' * 60}")
        print(f"  消息池消息数: {stats['total_messages']}")
        print(f"  消息类型: {stats['types']}")
        print(f"  执行时间: {elapsed:.2f} 秒")
        print(f"  输出文件:")
        for f in os.listdir(self.output_dir):
            filepath = os.path.join(self.output_dir, f)
            size = os.path.getsize(filepath)
            print(f"    - {f} ({size} bytes)")

        return self.output_dir

    def _save(self, filename: str, content: str):
        """保存输出文件"""
        filepath = os.path.join(self.output_dir, filename)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
        print(f"  保存: {filepath}")


# ============================================================
# 入口
# ============================================================

if __name__ == "__main__":
    import shutil

    output_dir = "/tmp/metagpt-simulator"
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    pipeline = SOPPipeline(output_dir)
    asyncio.run(pipeline.execute("创建一个任务管理应用"))

代码解析

代码段 使用的知识点 说明
SharedMessagePool 共享消息池(2.2) 发布-订阅模式的全局消息池,支持订阅通知、消息过滤和统计
Message 共享消息池(2.2) 结构化消息单元,包含内容、角色、类型和时间戳
Role 基类 自定义角色(2.1) 所有角色的基类,封装名称、消息池引用和 Action 列表
ProductManager/Architect/ProjectManagerRole 自定义角色(2.1) 专业化角色,每个角色只做自己的专业工作并发布到消息池
EngineerRole._run_with_feedback() 可执行反馈(3.1) 实际运行代码,检测错误,最多重试 3 次
SOPPipeline.execute() SOP 流水线(1.1) 按固定顺序(PM→Architect→PM→Engineer)编排所有角色

扩展挑战

  1. 添加 QA 角色:创建 QAEngineer 角色类,订阅工程师发布的 source_code 消息,编写测试代码并执行。QA 角色应发布 test_report 消息到消息池。
  2. 支持多 LLM 后端:为每个角色添加 model_config 参数,模拟不同角色使用不同 LLM 模型(如产品经理用 GPT-4、工程师用 GPT-3.5)。记录每个角色的模拟 API 调用成本。
  3. 添加进度追踪:在流水线执行过程中记录每个阶段的开始/结束时间和状态,最终输出一个结构化的进度报告。

第五部分:常见问题与排查指南

常见错误及解决方案

错误信息 原因 解决方案
ImportError: No module named 'metagpt' MetaGPT 未安装或虚拟环境未激活 运行 pip install metagpt 并确认已激活虚拟环境:source venv/bin/activate
openai.AuthenticationError: Invalid API Key 配置文件中的 API Key 无效 检查 ~/.metagpt/config2.yaml 中的 api_key 是否正确,确认以 sk- 开头
FileNotFoundError: ~/.metagpt/config2.yaml 配置文件不存在 创建配置文件目录 mkdir -p ~/.metagpt,然后创建 config2.yaml 并填写 API Key
Agent 执行超时 LLM API 响应慢或网络问题 在配置文件中增加 timeout 参数,或切换到更快的模型(如 GPT-3.5-turbo)
生成的代码无法运行 LLM 生成了有 Bug 的代码(即使有可执行反馈也可能遗漏) 手动检查生成代码,使用 Git 追踪变更,逐步修复问题
TypeError: 'NoneType' object is not callable 自定义 Role 或 Action 的 run 方法返回了 None 确保所有 Action 的 run 方法返回字符串类型的结果
生成的项目结构不符合预期 需求描述不够具体或存在歧义 提供更详细的需求描述,包括功能列表、技术偏好、文件结构建议
消息池中找不到订阅的消息 订阅设置在消息发布之后,导致消息丢失 确保在发布消息之前设置好所有订阅关系
可执行反馈重试 3 次仍然失败 生成的代码存在 LLM 无法自动修复的复杂 Bug 手动介入修复,或提供更详细的需求和设计约束

调试技巧

  1. 检查消息池状态:在流水线执行过程中打印消息池的内容和统计信息。确认每个角色都正确发布了消息,且下游角色能够获取到上游的输出。特别关注消息类型(msg_type)是否与订阅时一致——类型不匹配是最常见的通信问题。

  2. 逐步验证每个角色的输出:不要一次性运行完整流水线。先单独运行产品经理,检查需求文档质量;再运行架构师,检查设计文档。每个阶段的输出都是下一个阶段的输入,如果上游输出有误,下游必然出错。

  3. 查看 LLM 的原始响应:MetaGPT 默认只显示最终输出。如果需要调试,可以启用详细日志模式查看每个 LLM 调用的完整提示词和响应。设置环境变量 export LOG_LEVEL=DEBUG 可以看到更详细的执行日志。


第六部分:学习路线推荐

官方文档推荐阅读顺序

  1. MetaGPT 官方文档 - Introduction — 核心概念、快速入门。重点:Code = SOP(Team) 公式、五角色流水线、CLI 使用。
  2. MetaGPT 官方文档 - Quickstart — 安装配置和第一个项目。重点:config2.yaml 配置、CLI 命令参数、输出目录结构。
  3. MetaGPT ICLR 2024 论文 (arXiv:2308.00352) — 学术深度理解。重点:第 3 节方法论(SOP 编码、消息池、可执行反馈)、第 4 节实验(消融实验、基准测试)。
  4. MetaGPT Multi-Agent 101 教程 — 实战进阶。重点:自定义 Role 和 Action、消息池高级用法、多 LLM 后端配置。
  5. AFlow 论文 (ICLR 2025) — 前沿发展。重点:将 SOP 流水线优化为有向无环图(DAG),AFlow 在 MetaGPT 基础上的改进。

推荐进阶资源

  • MetaGPT GitHub 仓库 — 源码是理解框架内部机制的最佳途径。建议从 metagpt/roles/ 目录开始阅读,理解每个角色的 Action 定义和提示词模板。
  • Medium - 4 Best Open-Source Multi-Agent AI Frameworks 2025 — MetaGPT vs CrewAI vs AutoGen vs LangGraph 的横向对比,帮助理解不同框架的设计哲学和适用场景。
  • MGX 产品主页 — MetaGPT 的商业产品,展示了 SOP 驱动方法在真实产品中的形态。通过使用 MGX 可以直观理解 MetaGPT 的设计理念和用户体验。

信息来源与版本说明