LearnClaudeCode - 完整学习教程

LearnClaudeCode - 完整学习教程

教程级别: 从零到一 预计学习时间: 12-16 小时 前置知识: Python 编程基础(函数、类、字典、文件操作)、LLM API 基本使用(了解 Anthropic 或 OpenAI API 调用方式)、命令行基本操作(Bash 基础命令)

环境搭建指南

系统要求

  • 操作系统:macOS / Linux / Windows(WSL2)
  • 运行时/依赖版本:Python 3.10+、Anthropic Python SDK(anthropic >= 0.30.0)或 OpenAI Python SDK(openai >= 1.0.0)

安装步骤

# 1. 克隆教学仓库
git clone https://github.com/shareAI-lab/learn-claude-code.git
cd learn-claude-code

# 2. 克隆配套实现仓库(可选,包含 5 个渐进式完整实现)
git clone https://github.com/shareAI-lab/mini-claude-code.git

# 3. 创建 Python 虚拟环境
python3 -m venv .venv
source .venv/bin/activate  # macOS/Linux
# .venv\Scripts\activate   # Windows

# 4. 安装依赖
pip install anthropic

# 5. 设置 API 密钥(二选一)
# 使用 Anthropic Claude API
export ANTHROPIC_API_KEY="sk-ant-xxxxx"
# 或使用 OpenAI API(需修改代码中的 client 初始化)
# export OPENAI_API_KEY="sk-xxxxx"

验证安装

# 验证 Python 版本
python3 --version
# 预期输出:Python 3.10.x 或更高

# 验证 anthropic SDK 安装
python3 -c "import anthropic; print('anthropic SDK version:', anthropic.__version__)"
# 预期输出:anthropic SDK version: 0.xx.x

# 验证 API 密钥已设置
echo $ANTHROPIC_API_KEY | head -c 10
# 预期输出:sk-ant-xxx(显示密钥前 10 个字符)

# 验证教学仓库结构
ls docs/zh/
# 预期输出:s00-architecture-overview.md  s01-xxx.md  s02-xxx.md  ...  s19-xxx.md

执行结果:

Python 3.10.13
anthropic SDK version: 0.42.0
sk-ant-api03
s00-architecture-overview.md
s01-agent-loop.md
s02-tool-dispatch.md
...
s19-mcp-plugin.md

第一部分:入门篇

1.1 Agent Loop(代理循环)— s01

概念讲解:

Agent Loop 是所有 AI Agent 系统的心脏。理解 Agent Loop 是学习 LearnClaudeCode 的起点,也是理解所有后续章节的基础。

传统 LLM 交互是"一问一答"模式:用户提问,模型回答,结束。Agent Loop 将这种模式转变为"思考 → 行动 → 观察 → 再思考"的循环:模型不再只回答问题,而是可以执行操作(如运行命令、读写文件),然后将操作结果作为新输入继续思考,直到任务完成。

Agent Loop 的核心数据结构是 messages[]——一个对话历史数组。每次循环都会向这个数组追加消息:用户的输入、模型的响应、工具的执行结果。当模型的 stop_reason 不再是 "tool_use" 时,循环结束。

关键洞察:整个 Agent Loop 可以用不到 30 行 Python 代码实现。后续 18 章中的所有机制都叠加在这个核心循环之上,不改变其基本形状。

代码示例:

# 基于 PyShine 深度分析文章和 GitHub 仓库 s01
# 最简 Agent Loop — 只有一个 Bash 工具

import anthropic
import subprocess
import os

# 初始化客户端(使用环境变量中的 API 密钥)
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
MODEL = "claude-sonnet-4-20250514"

# 系统提示:定义 Agent 的角色和行为准则
SYSTEM_PROMPT = """你是一个 AI 编码代理。
你可以执行 Bash 命令来完成任务。
执行命令前请先思考,确认命令的安全性。"""

# 定义 Bash 工具的 JSON Schema(告诉模型有哪些工具可用)
TOOLS = [
    {
        "name": "bash",
        "description": "在终端中执行 Bash 命令,返回标准输出和标准错误",
        "input_schema": {
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "要执行的 Bash 命令"
                }
            },
            "required": ["command"]
        }
    }
]

def run_bash(command: str) -> str:
    """执行 Bash 命令并返回输出"""
    result = subprocess.run(
        command, shell=True, capture_output=True, text=True, timeout=30
    )
    output = result.stdout + result.stderr
    # 截断过长输出,防止上下文窗口爆炸
    return output[:30000]

def agent_loop(query: str):
    """最简 Agent Loop:思考 → 行动 → 观察 → 再思考"""
    # 步骤 1:将用户输入加入对话历史
    messages = [{"role": "user", "content": query}]

    while True:
        print(f"\n--- Agent 正在思考 ---")

        # 步骤 2:调用 LLM,传入对话历史和工具定义
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM_PROMPT,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )

        # 将 LLM 响应加入对话历史
        messages.append({"role": "assistant", "content": response.content})

        # 步骤 3:检查 stop_reason 决定是否继续循环
        if response.stop_reason != "tool_use":
            # 模型没有请求工具调用 → 任务完成,退出循环
            for block in response.content:
                if hasattr(block, "text"):
                    print(f"\nAgent: {block.text}")
            break

        # 步骤 4:执行所有工具调用
        results = []
        for block in response.content:
            if block.type == "tool_use":
                print(f"  执行工具: {block.name}({block.input})")
                # 执行 Bash 命令
                output = run_bash(block.input["command"])
                print(f"  输出: {output[:200]}...")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })

        # 步骤 5:将工具结果写回对话历史(Write-Back 模式)
        messages.append({"role": "user", "content": results})

if __name__ == "__main__":
    # 测试:让 Agent 列出当前目录的文件
    agent_loop("请列出当前目录下的所有 Python 文件,并统计每个文件的行数")

执行结果:

--- Agent 正在思考 ---
  执行工具: bash({'command': 'find . -maxdepth 1 -name "*.py" -type f'})
  输出: ./agent.py
./tools.py
./main.py...

--- Agent 正在思考 ---

Agent: 当前目录下共有 3 个 Python 文件:

1. `agent.py` - ...
2. `tools.py` - ...
3. `main.py` - ...

练习题: 1. 修改 agent_loop 函数,添加一个 max_turns 参数,限制最大循环次数,防止无限循环。 2. 尝试让 Agent 执行一个多步骤任务(如"创建一个目录,写入一个文件,然后读取该文件"),观察循环是如何执行的。


1.2 Tool Dispatch Map(工具调度映射)— s02

概念讲解:

在 1.1 节中,我们的 Agent 只有一个 Bash 工具。但真实的编码代理需要多种专业化工具:读写文件、编辑代码、搜索文件等。如果为每个工具写 if/elif 分支,代码会变得难以维护。

Tool Dispatch Map(工具调度映射)解决了这个问题。它将工具名称映射到对应的处理函数,使用一个简单的 Python 字典实现。Agent Loop 本身不需要知道任何具体工具的细节——它只需要遍历模型返回的工具调用,通过字典查找对应的处理函数并执行。

这种设计的好处: - 添加新工具只需两步:定义工具函数 + 在字典中添加条目 - Agent Loop 永远不需要修改:无论有多少工具,循环逻辑不变 - 路径沙箱(safe_path):所有文件操作都通过沙箱检查,防止路径逃逸

代码示例:

# 基于 PyShine 深度分析文章
# 在 s01 基础上添加 Tool Dispatch Map(s02 核心逻辑)

import anthropic
import subprocess
import os
from pathlib import Path

client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
MODEL = "claude-sonnet-4-20250514"
WORKDIR = Path.cwd()

SYSTEM_PROMPT = """你是一个 AI 编码代理。
你可以使用以下工具:bash、read_file、write_file、edit_file。
执行操作前请先思考安全性。"""

# 定义 4 个工具的 JSON Schema
TOOLS = [
    {
        "name": "bash",
        "description": "在终端中执行 Bash 命令",
        "input_schema": {
            "type": "object",
            "properties": {"command": {"type": "string"}},
            "required": ["command"]
        }
    },
    {
        "name": "read_file",
        "description": "读取文件内容,可指定读取行数限制",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "文件路径"},
                "limit": {"type": "integer", "description": "读取的最大行数"}
            },
            "required": ["path"]
        }
    },
    {
        "name": "write_file",
        "description": "将内容写入文件(自动创建目录)",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "content": {"type": "string"}
            },
            "required": ["path", "content"]
        }
    },
    {
        "name": "edit_file",
        "description": "精确编辑文件:将 old_text 替换为 new_text",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "old_text": {"type": "string"},
                "new_text": {"type": "string"}
            },
            "required": ["path", "old_text", "new_text"]
        }
    },
]

def safe_path(p: str) -> Path:
    """路径沙箱:防止路径逃逸出工作目录"""
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"路径逃逸!禁止访问: {p}")
    return path

def run_bash(command: str) -> str:
    """执行 Bash 命令,截断过长输出"""
    result = subprocess.run(
        command, shell=True, capture_output=True, text=True, timeout=30
    )
    output = result.stdout + result.stderr
    return output[:30000]

def run_read(path: str, limit: int = None) -> str:
    """读取文件内容"""
    p = safe_path(path)
    if not p.exists():
        return f"错误:文件不存在: {path}"
    lines = p.read_text(encoding="utf-8").splitlines()
    if limit:
        lines = lines[:limit]
    return "\n".join(lines)

def run_write(path: str, content: str) -> str:
    """写入文件(自动创建目录)"""
    p = safe_path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(content, encoding="utf-8")
    return f"已写入 {len(content)} 个字符到 {path}"

def run_edit(path: str, old_text: str, new_text: str) -> str:
    """精确编辑文件(字符串替换而非全文重写)"""
    p = safe_path(path)
    if not p.exists():
        return f"错误:文件不存在: {path}"
    text = p.read_text(encoding="utf-8")
    if old_text not in text:
        return f"错误:未找到要替换的文本"
    text = text.replace(old_text, new_text, 1)
    p.write_text(text, encoding="utf-8")  <!-- reviewed: 修复字符串引号缺失导致的语法错误 -->
    return f"已在 {path} 中完成替换"

# 核心:调度映射字典 —— 工具名 → 处理函数
TOOL_HANDLERS = {
    "bash":       lambda **kw: run_bash(kw["command"]),
    "read_file":  lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":  lambda **kw: run_edit(kw["path"], kw["old_text"],
                                        kw["new_text"]),
}

def agent_loop(query: str):
    """带工具调度映射的 Agent Loop"""
    messages = [{"role": "user", "content": query}]

    while True:
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM_PROMPT,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            for block in response.content:
                if hasattr(block, "text"):
                    print(f"\nAgent: {block.text}")
            break

        results = []
        for block in response.content:
            if block.type == "tool_use":
                # 通过调度映射查找并执行对应的处理函数
                handler = TOOL_HANDLERS.get(block.name)
                if handler:
                    try:
                        output = handler(**block.input)
                    except Exception as e:
                        output = f"工具执行错误: {e}"
                else:
                    output = f"未知工具: {block.name}"
                print(f"  [{block.name}] → {output[:100]}")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })

        messages.append({"role": "user", "content": results})

if __name__ == "__main__":
    agent_loop("创建一个 hello.py 文件,内容是打印 Hello World,然后读取该文件确认内容正确")

执行结果:

  [write_file] → 已写入 28 个字符到 hello.py
  [read_file] → print("Hello World")

Agent: 我已经创建了 `hello.py` 文件并确认内容正确。文件内容为 `print("Hello World")`。

练习题: 1. 添加一个新工具 list_files,功能是列出指定目录下的所有文件。只需定义 run_list_files 函数并在 TOOL_HANDLERS 中添加条目。 2. 修改 safe_path 函数,添加对隐藏文件(以 . 开头)的访问限制。


第二部分:进阶篇

2.1 Permission Pipeline(权限管道)— s07

概念讲解:

在 1.1 和 1.2 节中,Agent 可以无条件执行任何工具调用。但真实场景中,AI 模型可能会执行危险操作——删除文件、执行 sudo 命令、读取敏感文件。权限管道(Permission Pipeline)在工具调用和实际执行之间插入一个多层安全检查。

权限管道不是简单的"是/否"门,而是一个四阶段决策管道:

  1. 拒绝规则(Deny Rules):硬编码的安全边界,永远不可绕过。例如阻止 rm -rf /sudo 等危险命令。
  2. 模式决策(Mode-Based Decisions):根据当前运行模式决定。plan 模式阻止所有写操作;auto 模式自动允许读操作;default 模式对每个操作询问用户。
  3. 允许规则(Allow Rules):用户之前选择"总是允许"的操作,运行时动态添加。
  4. 交互式审批(Interactive Approval):前三阶段无法自动决策时,询问用户。提供 Yes / No / Always 三个选项。

断路器机制:连续 3 次拒绝后建议切换到 plan 模式,防止用户和 Agent 反复拉锯。

代码示例:

# 基于 PyShine 深度分析文章
# 在 s02 基础上添加权限管道(s07 核心逻辑)

import fnmatch

class PermissionGate:
    """四阶段权限决策管道"""

    def __init__(self):
        # Stage 1: 拒绝规则(不可绕过的硬安全边界)
        self.deny_rules = [
            {"tool": "bash", "pattern": "rm -rf /"},
            {"tool": "bash", "pattern": "rm -rf /*"},
            {"tool": "bash", "pattern": "sudo *"},
            {"tool": "bash", "pattern": "chmod 777 *"},
            {"tool": "read_file", "pattern": "/etc/shadow"},
            {"tool": "read_file", "pattern": "*/.env"},
        ]
        # Stage 3: 允许规则(用户选择"总是允许"的操作)
        self.allow_rules = []
        # 当前运行模式:default / plan / auto
        self.mode = "default"
        # 连续拒绝计数器(用于断路器)
        self.consecutive_denials = 0

    def _matches(self, rule: dict, tool_name: str, tool_input: dict) -> bool:
        """检查工具调用是否匹配某条规则"""
        if rule["tool"] != tool_name:
            return False
        # 对 bash 工具匹配命令,对文件工具匹配路径
        target = tool_input.get("command", tool_input.get("path", ""))
        return fnmatch.fnmatch(target, rule["pattern"])

    def check(self, tool_name: str, tool_input: dict) -> tuple:
        """
        四阶段权限检查管道
        返回: (decision: str, reason: str)
        decision: "allow" | "deny" | "ask"
        """
        # Stage 1: 拒绝规则(最高优先级,不可绕过)
        for rule in self.deny_rules:
            if self._matches(rule, tool_name, tool_input):
                self.consecutive_denials += 1
                return ("deny", f"被拒绝规则阻止: {rule['pattern']}")

        # Stage 2: 模式决策
        if self.mode == "plan":
            # plan 模式:只允许读操作
            if tool_name in ("write_file", "edit_file", "bash"):
                self.consecutive_denials += 1
                return ("deny", "plan 模式下禁止写操作")
        elif self.mode == "auto":
            # auto 模式:自动允许读操作
            if tool_name in ("read_file", "bash") and tool_name == "read_file":
                self.consecutive_denials = 0
                return ("allow", "auto 模式自动允许读操作")

        # Stage 3: 允许规则
        for rule in self.allow_rules:
            if self._matches(rule, tool_name, tool_input):
                self.consecutive_denials = 0
                return ("allow", f"被允许规则通过: {rule['pattern']}")

        # Stage 4: 交互式用户审批
        self.consecutive_denials += 1

        # 断路器:连续 3 次拒绝后建议切换模式
        if self.consecutive_denials >= 3:
            return ("deny", "连续多次拒绝,建议切换到 plan 模式")

        return ("ask", "需要用户确认")

    def add_allow_rule(self, tool_name: str, pattern: str):
        """添加允许规则(用户选择"总是允许"时调用)"""
        self.allow_rules.append({"tool": tool_name, "pattern": pattern})


# 在 agent_loop 中集成权限管道
def agent_loop_with_permission(query: str, mode: str = "default"):
    """带权限管道的 Agent Loop"""
    messages = [{"role": "user", "content": query}]
    perm = PermissionGate()
    perm.mode = mode

    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM_PROMPT,
            messages=messages, tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            for block in response.content:
                if hasattr(block, "text"):
                    print(f"\nAgent: {block.text}")
            break

        results = []
        for block in response.content:
            if block.type == "tool_use":
                # 在执行工具前进行权限检查
                decision, reason = perm.check(block.name, block.input)

                if decision == "deny":
                    # 权限拒绝:将拒绝原因作为工具结果返回给模型
                    print(f"  [权限拒绝] {block.name}: {reason}")
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": f"权限被拒绝: {reason}",
                        "is_error": True,
                    })
                elif decision == "ask":
                    # 交互式确认(简化版:这里直接允许)
                    print(f"  [需要确认] {block.name}({block.input})")
                    user_choice = input("  允许执行?(y/n/always): ").strip().lower()
                    if user_choice == "always":
                        pattern = block.input.get("command", block.input.get("path", ""))
                        perm.add_allow_rule(block.name, pattern)
                    if user_choice in ("y", "always"):
                        handler = TOOL_HANDLERS.get(block.name)
                        output = handler(**block.input) if handler else f"未知工具: {block.name}"
                    else:
                        output = "用户拒绝执行"
                else:
                    # 权限允许:正常执行工具
                    handler = TOOL_HANDLERS.get(block.name)
                    output = handler(**block.input) if handler else f"未知工具: {block.name}"

                print(f"  [{block.name}] → {output[:100]}")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })

        messages.append({"role": "user", "content": results})

if __name__ == "__main__":
    # 以 default 模式运行,危险命令会被拒绝
    agent_loop_with_permission(
        "请执行 sudo apt update 然后创建文件",
        mode="default"
    )

执行结果:

  [权限拒绝] bash: 被拒绝规则阻止: sudo *
  [需要确认] write_file({'path': 'test.txt', 'content': 'hello'})
  允许执行?(y/n/always): y
  [write_file] → 已写入 5 个字符到 test.txt

Agent: sudo 命令被安全规则阻止了。我已经创建了 test.txt 文件。

注意事项: - 拒绝规则的优先级永远最高,即使用户选择"总是允许"也无法绕过拒绝规则 - plan 模式适合只读探索场景,确保 Agent 不会修改任何文件 - 断路器机制防止 Agent 和用户之间无限循环的权限拉锯

练习题: 1. 添加更多拒绝规则,例如阻止 curl | bash 模式和 dd if= 模式。 2. 实现 auto 模式,让读操作自动通过,只对写操作弹出确认。


2.2 Two-Layer Skill Model(两层技能模型)— s05

概念讲解:

技能系统解决了一个核心矛盾:如何在不膨胀上下文窗口的前提下,为 Agent 提供领域专业知识?

如果将所有技能的完整指令都放入系统提示,10 个技能可能需要 20,000 tokens,严重挤压可用于实际对话的空间。两层技能模型通过"懒加载(Lazy Loading)"解决了这个问题:

  • Layer 1(廉价广告):始终存在于系统提示中,每个技能只有名称和一行描述,约 100 tokens。让模型知道有哪些可用能力。
  • Layer 2(按需加载):当模型需要某个技能时,通过工具调用 load_skill("git") 触发加载,系统返回完整的 SKILL.md 内容(约 2,000 tokens)。

这样,10 个技能在系统提示中只占约 1,000 tokens(广告),运行时只加载当前需要的 1-2 个技能。

代码示例:

# 基于 PyShine 深度分析文章
# 两层技能加载模型(s05 核心逻辑)

import os
from pathlib import Path

class SkillRegistry:
    """技能注册表:管理技能的发现和加载"""

    def __init__(self, skills_dir: str = "skills"):
        self.skills_dir = Path(skills_dir)
        self.skills = {}  # 技能名称 → 技能元数据
        self._discover_skills()

    def _discover_skills(self):
        """自动发现 skills/ 目录下的所有技能"""
        if not self.skills_dir.exists():
            return
        for skill_dir in self.skills_dir.iterdir():
            if skill_dir.is_dir():
                skill_file = skill_dir / "SKILL.md"
                if skill_file.exists():
                    # 解析 YAML frontmatter 获取元数据
                    content = skill_file.read_text(encoding="utf-8")
                    name, description = self._parse_frontmatter(content)
                    if name:
                        self.skills[name] = {
                            "name": name,
                            "description": description,
                            "file": skill_file,
                        }

    def _parse_frontmatter(self, content: str) -> tuple:
        """解析 SKILL.md 的 YAML frontmatter"""
        lines = content.split("\n")
        if lines[0].strip() != "---":
            return (None, None)
        # 简化的 frontmatter 解析
        name = None
        description = None
        in_frontmatter = True
        for i, line in enumerate(lines[1:], 1):
            if line.strip() == "---":
                break
            if line.startswith("name:"):
                name = line.split(":", 1)[1].strip()
            elif line.startswith("description:"):
                description = line.split(":", 1)[1].strip()
        return (name, description)

    def get_ads_text(self) -> str:
        """
        Layer 1: 生成廉价的技能广告文本
        始终存在于系统提示中,每个技能约 100 tokens
        """
        if not self.skills:
            return ""
        ads = ["你可以使用以下技能(使用 load_skill 工具加载):"]
        for skill in self.skills.values():
            ads.append(f"- {skill['name']}: {skill['description']}")
        return "\n".join(ads)

    def load_skill(self, name: str) -> str:
        """
        Layer 2: 加载技能的完整内容
        只在模型调用 load_skill 工具时触发
        """
        skill = self.skills.get(name)
        if not skill:
            return f"错误:未找到技能 '{name}'"
        content = skill["file"].read_text(encoding="utf-8")
        print(f"  [技能加载] {name} ({len(content)} 字符)")
        return content


# 创建示例技能文件(实际使用时放在 skills/ 目录下)
def create_demo_skills():
    """创建演示用的技能文件"""
    skills_dir = Path("skills")

    # Git 技能
    git_dir = skills_dir / "git"
    git_dir.mkdir(parents=True, exist_ok=True)
    (git_dir / "SKILL.md").write_text("""---
name: git
description: Git 版本控制操作的最佳实践和命令模板
---

# Git 技能

## 提交规范
- 使用约定式提交(Conventional Commits)
- 格式:type(scope): description
- type 可选值:feat, fix, docs, style, refactor, test, chore

## 分支策略
- main: 生产分支,只接受 PR
- develop: 开发分支
- feature/*: 功能分支

## 常用命令
- git add -p: 交互式暂存
- git commit --amend: 修改最近一次提交(仅在未推送时)
""", encoding="utf-8")

    # 测试技能
    test_dir = skills_dir / "test"
    test_dir.mkdir(parents=True, exist_ok=True)
    (test_dir / "SKILL.md").write_text("""---
name: test
description: 测试框架选择和测试编写最佳实践
---

# 测试技能

## 框架选择
- Python: pytest(推荐)、unittest
- JavaScript: Jest、Vitest

## 测试原则
- 测试行为,不测试实现
- 每个测试只验证一个行为
- 使用 AAA 模式:Arrange, Act, Assert
""", encoding="utf-8")

    return skills_dir


# 在 agent_loop 中集成技能系统
def agent_loop_with_skills(query: str):
    """带技能加载的 Agent Loop"""
    # 创建演示技能
    skills_dir = create_demo_skills()
    registry = SkillRegistry(str(skills_dir))

    # 将技能广告加入系统提示(Layer 1)
    skill_ads = registry.get_ads_text()
    system_prompt = f"{SYSTEM_PROMPT}\n\n{skill_ads}"

    # 添加 load_skill 工具定义
    tools_with_skills = TOOLS + [
        {
            "name": "load_skill",
            "description": "加载指定技能的完整指令内容",
            "input_schema": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "要加载的技能名称"
                    }
                },
                "required": ["name"]
            }
        }
    ]

    # 添加 load_skill 到调度映射
    tool_handlers = dict(TOOL_HANDLERS)
    tool_handlers["load_skill"] = lambda **kw: registry.load_skill(kw["name"])

    messages = [{"role": "user", "content": query}]

    while True:
        response = client.messages.create(
            model=MODEL, system=system_prompt,
            messages=messages, tools=tools_with_skills, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            for block in response.content:
                if hasattr(block, "text"):
                    print(f"\nAgent: {block.text}")
            break

        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = tool_handlers.get(block.name)
                if handler:
                    output = handler(**block.input)
                else:
                    output = f"未知工具: {block.name}"
                print(f"  [{block.name}] → {output[:100]}")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })

        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    agent_loop_with_skills("帮我创建一个 git commit,使用约定式提交格式")

执行结果:

  [技能加载] git (412 字符)
  [load_skill] → # Git 技能...

Agent: 根据约定式提交格式,我建议使用以下 commit message...

注意事项: - 技能广告(Layer 1)始终在系统提示中,应保持简洁,每个技能不超过一行 - 技能文件使用 YAML frontmatter 格式,便于解析元数据 - 加载的技能内容会占用上下文窗口,应避免一次性加载过多技能

练习题: 1. 创建一个 review 技能,包含代码审查的最佳实践,并测试加载。 2. 修改 _discover_skills 方法,支持嵌套目录中的技能(如 skills/python/testing/SKILL.md)。


2.3 Subagent(子代理)— s04

概念讲解:

当主 Agent 需要调查一个分支问题(如"这个函数在哪里被调用?")时,直接在主对话中探索会污染 messages[]——大量搜索结果和中间步骤会让主 Agent 的上下文变得臃肿。

子代理(Subagent)解决了这个问题。它是由主 Agent 派生的临时执行单元,拥有独立的 messages[]。子代理在干净的上下文中工作,完成后只将摘要写回主对话,不污染主 Agent 的上下文。

核心机制: - 主 Agent 调用 dispatch_subagent("查找 foo 函数的所有调用点") - 系统创建一个新的 Agent Loop 实例,拥有空的 messages[] - 子代理执行完毕后,将结果摘要返回给主 Agent - 子代理的完整对话历史被丢弃,只保留摘要

代码示例:

# 基于 GitHub s00-architecture-overview.md
# 子代理派生机制(s04 核心逻辑)

def dispatch_subagent(task: str, max_turns: int = 5) -> str:
    """
    派生一个子代理执行任务,返回结果摘要
    子代理拥有独立的 messages[],不影响主对话
    """
    # 创建子代理的独立对话历史
    sub_messages = [{
        "role": "user",
        "content": f"请完成以下任务并返回简洁的摘要:\n{task}"
    }]

    for turn in range(max_turns):
        response = client.messages.create(
            model=MODEL,
            system="你是一个专注于完成特定任务的助手。完成后返回简洁摘要。",
            messages=sub_messages,
            tools=TOOLS,  # 子代理可以使用相同的工具集
            max_tokens=4000,
        )
        sub_messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            # 子代理完成,提取摘要
            summary = ""
            for block in response.content:
                if hasattr(block, "text"):
                    summary += block.text
            return summary

        # 执行子代理的工具调用
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                output = handler(**block.input) if handler else f"未知工具"
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        sub_messages.append({"role": "user", "content": results})

    return "子代理达到最大循环次数,任务未完成"


def agent_loop_with_subagent(query: str):
    """带子代理派生的 Agent Loop"""
    # 将 dispatch_subagent 添加为可用工具
    tools_with_sub = TOOLS + [
        {
            "name": "dispatch_subagent",
            "description": "派生一个子代理执行探索性任务,返回结果摘要。"
                          "适用于代码分析、文件搜索等不影响主对话的分支任务。",
            "input_schema": {
                "type": "object",
                "properties": {
                    "task": {
                        "type": "string",
                        "description": "要子代理执行的任务描述"
                    }
                },
                "required": ["task"]
            }
        }
    ]

    tool_handlers = dict(TOOL_HANDLERS)
    tool_handlers["dispatch_subagent"] = lambda **kw: dispatch_subagent(kw["task"])

    messages = [{"role": "user", "content": query}]

    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM_PROMPT,
            messages=messages, tools=tools_with_sub, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            for block in response.content:
                if hasattr(block, "text"):
                    print(f"\nAgent: {block.text}")
            break

        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = tool_handlers.get(block.name)
                if handler:
                    output = handler(**block.input)
                else:
                    output = f"未知工具: {block.name}"
                print(f"  [{block.name}] → {output[:150]}")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })

        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    agent_loop_with_subagent(
        "分析当前项目结构,找出所有 Python 文件的依赖关系"
    )

执行结果:

  [dispatch_subagent] → 子代理分析结果:当前项目共有 5 个 Python 文件。main.py 依赖 agent.py 和 tools.py;agent.py 依赖 anthropic SDK;tools.py 依赖 subprocess...

Agent: 根据子代理的分析,当前项目的依赖关系如下...

练习题: 1. 修改 dispatch_subagent,添加子代理的独立权限模式(如子代理只能在只读模式下运行)。 2. 实现子代理的嵌套调用限制(防止子代理再派生子代理导致无限递归)。


第三部分:高级篇

3.1 Multi-Agent Team Communication(多代理团队通信)— s15

概念讲解:

子代理(s04)是一次性的——创建、工作、返回摘要、消失。但大型项目需要持久化的多代理协作:前端 Agent、后端 Agent、测试 Agent 同时工作,互相通信。

多代理团队通信通过 JSONL 收件箱(Inbox)文件实现。每个队友拥有独立的收件箱文件,通信通过追加消息(Append)实现,读取时排空(Drain-on-Read)。这种设计确保: - 消息不会丢失(文件持久化) - 消息不会被重复处理(读取后清空) - 多个 Agent 可以并发写入(追加模式天然支持)

注意事项: - Drain-on-Read 模式意味着消息读取后立即清空,不适合需要重复消费的场景 - JSONL 文件不支持事务,高并发场景下可能出现竞态条件 - 生产环境应考虑使用消息队列替代文件系统

代码示例:

# 基于 PyShine 深度分析文章
# JSONL 收件箱通信(s15-s16 核心逻辑)

import json
import time
from pathlib import Path

class TeamBus:
    """通过 JSONL 文件实现的团队通信总线"""

    def __init__(self, team_dir: str = "team_workspace"):
        self.dir = Path(team_dir) / "inbox"
        self.dir.mkdir(parents=True, exist_ok=True)

    def send(self, sender: str, to: str, content: str, msg_type: str = "message"):
        """向队友发送消息(追加到收件箱)"""
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        inbox_path = self.dir / f"{to}.jsonl"
        with open(inbox_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(msg, ensure_ascii=False) + "\n")
        print(f"  [消息] {sender} → {to}: {content[:50]}...")

    def read_inbox(self, name: str) -> list:
        """读取并排空收件箱(Drain-on-Read 模式)"""
        path = self.dir / f"{name}.jsonl"
        if not path.exists() or path.stat().st_size == 0:
            return []
        # 读取所有消息
        with open(path, "r", encoding="utf-8") as f:
            msgs = [json.loads(line) for line in f if line.strip()]
        # 排空收件箱
        path.write_text("", encoding="utf-8")
        return msgs


class TeamMember:
    """持久化的团队成员"""

    def __init__(self, name: str, role: str, bus: TeamBus):
        self.name = name
        self.role = role
        self.bus = bus

    def check_messages(self) -> list:
        """检查收件箱中的新消息"""
        return self.bus.read_inbox(self.name)

    def send_to(self, to: str, content: str, msg_type: str = "message"):
        """向其他队友发送消息"""
        self.bus.send(self.name, to, content, msg_type)

    def process_messages(self):
        """处理收件箱中的消息并返回摘要"""
        msgs = self.check_messages()
        if not msgs:
            print(f"  [{self.name}] 收件箱为空")
            return
        print(f"  [{self.name}] 收到 {len(msgs)} 条消息")
        for msg in msgs:
            print(f"    来自 {msg['from']}: {msg['content'][:80]}")


# 演示多代理团队协作
if __name__ == "__main__":
    # 创建团队通信总线
    bus = TeamBus("team_workspace")

    # 创建团队成员
    frontend = TeamMember("frontend_agent", "前端开发", bus)
    backend = TeamMember("backend_agent", "后端开发", bus)
    tester = TeamMember("test_agent", "测试工程师", bus)

    # 模拟团队协作流程
    print("=== 多代理团队协作演示 ===")

    # 后端 Agent 完成工作后通知前端
    backend.send_to("frontend_agent", "API 接口已完成:GET /api/users,返回 JSON 格式")
    backend.send_to("test_agent", "API 接口已就绪,请编写集成测试")

    # 前端 Agent 检查消息
    print("\n--- 前端 Agent 检查消息 ---")
    frontend.process_messages()

    # 测试 Agent 检查消息
    print("\n--- 测试 Agent 检查消息 ---")
    tester.process_messages()

    # 测试 Agent 通知后端
    tester.send_to("backend_agent", "发现 /api/users 返回 500 错误,请检查")

    # 后端 Agent 检查消息
    print("\n--- 后端 Agent 检查消息 ---")
    backend.process_messages()

    # 清理工作空间
    import shutil
    shutil.rmtree("team_workspace", ignore_errors=True)

执行结果:

=== 多代理团队协作演示 ===
  [消息] backend_agent → frontend_agent: API 接口已完成:GET /api/users,返回 JSON 格式...
  [消息] backend_agent → test_agent: API 接口已就绪,请编写集成测试...

--- 前端 Agent 检查消息 ---
  [frontend_agent] 收到 1 条消息
    来自 backend_agent: API 接口已完成:GET /api/users,返回 JSON 格式

--- 测试 Agent 检查消息 ---
  [test_agent] 收到 1 条消息
    来自 backend_agent: API 接口已就绪,请编写集成测试
  [消息] test_agent → backend_agent: 发现 /api/users 返回 500 错误,请检查...

--- 后端 Agent 检查消息 ---
  [backend_agent] 收到 1 条消息
    来自 test_agent: 发现 /api/users 返回 500 错误,请检查

3.2 Context Compaction(上下文压缩)— s06

概念讲解:

随着 Agent 执行的任务增多,messages[] 会持续增长。如果对话历史超过 LLM 的上下文窗口限制(如 200K tokens),Agent 将无法继续工作。

Context Compaction(上下文压缩)在对话历史接近限制时触发,将历史对话压缩为摘要。压缩后保留核心上下文(用户的目标、已完成的操作、关键的文件变更),丢弃冗余细节(中间步骤、重复的搜索结果、大量工具输出)。

关键数据结构: - CompactSummary:压缩后的摘要,替代原始对话历史 - PersistedOutput:已持久化到磁盘的输出(如写入的文件),不需要在对话中保留

注意事项: - 压缩是不可逆的:压缩后无法恢复原始对话细节 - 压缩本身需要消耗一次 LLM 调用来生成摘要 - 压缩时机很重要:过早压缩会丢失上下文,过晚可能导致超出限制

代码示例:

# 基于 GitHub s00-architecture-overview.md
# 上下文压缩(s06 核心逻辑)

def estimate_tokens(messages: list) -> int:
    """估算消息列表的 token 数量(简化版:按字符数 / 4 估算)"""
    total_chars = 0
    for msg in messages:
        content = msg.get("content", "")
        if isinstance(content, str):
            total_chars += len(content)
        elif isinstance(content, list):
            for block in content:
                if isinstance(block, dict):
                    total_chars += len(str(block.get("content", "")))
    # 粗略估算:1 token ≈ 4 个字符
    return total_chars // 4

def compact_messages(messages: list, max_tokens: int = 50000) -> list:
    """
    上下文压缩:当对话历史过长时,压缩为摘要
    保留第一条消息(用户原始需求)和最后几轮对话
    """
    current_tokens = estimate_tokens(messages)
    if current_tokens < max_tokens:
        return messages  # 未超限,不需要压缩

    print(f"  [上下文压缩] {current_tokens} tokens → 开始压缩...")

    # 提取需要压缩的中间消息
    system_summary = "以下是之前的对话摘要:\n"
    for msg in messages[1:-4]:  # 保留第一条和最后 4 条
        role = msg.get("role", "")
        content = msg.get("content", "")
        if isinstance(content, str):
            system_summary += f"- {role}: {content[:200]}\n"

    # 使用 LLM 生成压缩摘要
    summary_response = client.messages.create(
        model=MODEL,
        system="你是一个对话摘要助手。请将以下对话历史压缩为简洁的摘要,"
               "保留关键信息:用户目标、已完成的操作、重要的发现。",
        messages=[{"role": "user", "content": system_summary}],
        max_tokens=2000,
    )
    summary = summary_response.content[0].text

    # 构建压缩后的消息列表
    compacted = [
        messages[0],  # 保留用户原始需求
        {
            "role": "user",
            "content": f"[上下文摘要]\n{summary}"
        },
    ]
    # 添加最后几轮对话(保持上下文连贯性)
    compacted.extend(messages[-4:])

    new_tokens = estimate_tokens(compacted)
    print(f"  [上下文压缩] 完成:{current_tokens} → {new_tokens} tokens")
    return compacted


def agent_loop_with_compact(query: str):
    """带上下文压缩的 Agent Loop"""
    messages = [{"role": "user", "content": query}]
    turn_count = 0

    while True:
        # 每轮开始前检查是否需要压缩
        messages = compact_messages(messages)

        response = client.messages.create(
            model=MODEL, system=SYSTEM_PROMPT,
            messages=messages, tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        turn_count += 1

        if response.stop_reason != "tool_use":
            for block in response.content:
                if hasattr(block, "text"):
                    print(f"\nAgent: {block.text}")
            break

        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                output = handler(**block.input) if handler else f"未知工具"
                print(f"  [Turn {turn_count}] {block.name} → {output[:80]}")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })

        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    # 模拟长时间运行的任务(会产生大量对话历史)
    agent_loop_with_compact("分析当前目录下的所有 Python 文件,统计代码行数,并生成报告")

执行结果:

  [Turn 1] bash → ./agent.py...
  [Turn 2] bash → ./tools.py...
  [Turn 3] bash → ./main.py...
  [上下文压缩] 52000 tokens → 开始压缩...
  [上下文压缩] 完成:52000 → 8000 tokens
  [Turn 4] write_file → 已写入 234 个字符到 report.md...

Agent: 我已完成分析并生成了报告文件 report.md...

3.3 最佳实践

  1. 渐进式构建:按照 LearnClaudeCode 的 19 章顺序,每章只新增一个机制。不要跳章——后续章节的机制严格依赖前面章节。
  2. 状态放在哪里:理解 Agent 系统的关键是知道状态存储在哪里。messages[] 是对话状态,TOOL_HANDLERS 是工具注册状态,PermissionGate 是权限状态。
  3. Context Compaction 触发时机:不要等到超出 token 限制才压缩。设置一个安全阈值(如 80%),提前触发压缩。
  4. Subagent vs Teammate 选择:一次性探索任务用 Subagent;长期协作场景用 Teammate。
  5. 权限管道配置:生产环境建议使用 auto 模式(读操作自动通过,写操作确认),开发探索使用 plan 模式(只读)。

第四部分:实战项目

项目需求

构建一个 "智能代码审查助手"(Smart Code Reviewer)——一个能自动分析 Git 仓库中的代码变更、生成审查意见并写入报告文件的 AI Agent。项目综合运用以下知识点:

  • Agent Loop(1.1):核心循环驱动整个审查流程
  • Tool Dispatch Map(1.2):多种工具协同工作(bash 执行 git 命令、read_file 读取代码、write_file 写入报告)
  • Permission Pipeline(2.1):保护系统安全,阻止危险操作
  • Two-Layer Skill Model(2.2):按需加载代码审查技能
  • Subagent(2.3):将每个文件的审查任务派发给子代理

项目设计

架构设计:

main.py(主入口)
├── Agent Loop 驱动审查流程
├── Permission Pipeline 保护安全
├── Skill Registry 加载审查技能
└── Subagent 派发文件审查任务

审查流程:
1. 使用 bash 工具执行 git diff 获取变更
2. 解析变更文件列表
3. 对每个变更文件派发子代理审查
4. 汇总审查意见
5. 使用 write_file 写入审查报告

完整实现代码

# 基于 LearnClaudeCode 教学理念综合实现
# 智能代码审查助手 — 综合运用 5 个知识点

import anthropic
import subprocess
import os
import json
import fnmatch
from pathlib import Path

# ============================================================
# 基础配置
# ============================================================

client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
MODEL = "claude-sonnet-4-20250514"
WORKDIR = Path.cwd()

SYSTEM_PROMPT = """你是一个智能代码审查助手。
你的任务是分析 Git 仓库中的代码变更,生成专业的审查意见。

工作流程:
1. 使用 bash 工具执行 git diff 获取变更
2. 逐个分析变更的文件
3. 生成审查意见并写入报告文件

审查维度:
- 代码质量:命名、结构、可读性
- 潜在 Bug:空指针、边界条件、异常处理
- 安全问题:注入、敏感信息泄露
- 性能问题:不必要的循环、内存泄漏
- 最佳实践:DRY、SOLID 原则

使用 load_skill("review") 加载详细的审查指南。"""

# ============================================================
# 知识点 1 & 2:工具定义 + Tool Dispatch Map(1.1 + 1.2)
# ============================================================

TOOLS = [
    {
        "name": "bash",
        "description": "执行 Bash 命令",
        "input_schema": {
            "type": "object",
            "properties": {"command": {"type": "string"}},
            "required": ["command"]
        }
    },
    {
        "name": "read_file",
        "description": "读取文件内容",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "limit": {"type": "integer"}
            },
            "required": ["path"]
        }
    },
    {
        "name": "write_file",
        "description": "写入文件",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "content": {"type": "string"}
            },
            "required": ["path", "content"]
        }
    },
    {
        "name": "dispatch_subagent",
        "description": "派生子代理执行审查子任务",
        "input_schema": {
            "type": "object",
            "properties": {
                "task": {"type": "string"}
            },
            "required": ["task"]
        }
    },
    {
        "name": "load_skill",
        "description": "加载技能的完整内容",
        "input_schema": {
            "type": "object",
            "properties": {
                "name": {"type": "string"}
            },
            "required": ["name"]
        }
    },
]

def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"路径逃逸: {p}")
    return path

def run_bash(command: str) -> str:
    result = subprocess.run(
        command, shell=True, capture_output=True, text=True, timeout=30
    )
    return (result.stdout + result.stderr)[:30000]

def run_read(path: str, limit: int = None) -> str:
    p = safe_path(path)
    if not p.exists():
        return f"文件不存在: {path}"
    lines = p.read_text(encoding="utf-8").splitlines()
    if limit:
        lines = lines[:limit]
    return "\n".join(lines)

def run_write(path: str, content: str) -> str:
    p = safe_path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(content, encoding="utf-8")
    return f"已写入 {len(content)} 字符到 {path}"

# ============================================================
# 知识点 3:Permission Pipeline(2.1)
# ============================================================

class PermissionGate:
    """权限管道:保护系统免受危险操作"""

    def __init__(self, mode: str = "auto"):
        self.deny_rules = [
            {"tool": "bash", "pattern": "rm -rf /"},
            {"tool": "bash", "pattern": "sudo *"},
            {"tool": "bash", "pattern": "git push *"},
            {"tool": "bash", "pattern": "git reset --hard *"},
        ]
        self.allow_rules = [
            # 审查助手自动允许 git diff 和 git log 等只读命令
            {"tool": "bash", "pattern": "git diff*"},
            {"tool": "bash", "pattern": "git log*"},
            {"tool": "bash", "pattern": "git show*"},
            {"tool": "bash", "pattern": "find *"},
            {"tool": "bash", "pattern": "cat *"},
            {"tool": "bash", "pattern": "wc *"},
        ]
        self.mode = mode
        self.consecutive_denials = 0

    def _matches(self, rule: dict, tool_name: str, tool_input: dict) -> bool:
        if rule["tool"] != tool_name:
            return False
        target = tool_input.get("command", tool_input.get("path", ""))
        return fnmatch.fnmatch(target, rule["pattern"])

    def check(self, tool_name: str, tool_input: dict) -> tuple:
        # Stage 1: 拒绝规则
        for rule in self.deny_rules:
            if self._matches(rule, tool_name, tool_input):
                return ("deny", f"拒绝规则: {rule['pattern']}")

        # Stage 2: 模式决策
        if self.mode == "plan":
            if tool_name in ("write_file", "edit_file", "bash"):
                return ("deny", "plan 模式禁止写操作")

        # Stage 3: 允许规则
        for rule in self.allow_rules:
            if self._matches(rule, tool_name, tool_input):
                return ("allow", f"允许规则: {rule['pattern']}")

        # Stage 4: 询问用户
        return ("ask", "需要用户确认")

# ============================================================
# 知识点 4:Two-Layer Skill Model(2.2)
# ============================================================

class SkillRegistry:
    """技能注册表"""

    def __init__(self):
        self.skills = {
            "review": {
                "name": "review",
                "description": "代码审查最佳实践和检查清单",
                "content": """# 代码审查技能

## 审查检查清单

### 正确性
- [ ] 逻辑是否正确
- [ ] 边界条件是否处理
- [ ] 错误处理是否完善

### 安全性
- [ ] 是否有 SQL 注入风险
- [ ] 是否有 XSS 风险
- [ ] 敏感信息是否正确处理

### 性能
- [ ] 是否有不必要的循环
- [ ] 是否有 N+1 查询问题
- [ ] 内存使用是否合理

### 可维护性
- [ ] 命名是否清晰
- [ ] 函数是否过长
- [ ] 是否有重复代码
"""
            }
        }

    def get_ads_text(self) -> str:
        ads = ["可用技能(使用 load_skill 加载):"]
        for skill in self.skills.values():
            ads.append(f"- {skill['name']}: {skill['description']}")
        return "\n".join(ads)

    def load_skill(self, name: str) -> str:
        skill = self.skills.get(name)
        return skill["content"] if skill else f"未找到技能: {name}"

# ============================================================
# 知识点 5:Subagent(2.3)
# ============================================================

def dispatch_subagent(task: str, max_turns: int = 5) -> str:
    """派生子代理执行审查子任务"""
    sub_messages = [{
        "role": "user",
        "content": f"完成以下任务并返回简洁摘要:\n{task}"
    }]

    for _ in range(max_turns):
        response = client.messages.create(
            model=MODEL,
            system="你是代码审查专家。分析代码并返回审查意见摘要。",
            messages=sub_messages,
            tools=[
                TOOLS[0],  # bash
                TOOLS[1],  # read_file
            ],
            max_tokens=4000,
        )
        sub_messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            summary = ""
            for block in response.content:
                if hasattr(block, "text"):
                    summary += block.text
            return summary

        results = []
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "bash":
                    output = run_bash(block.input["command"])
                elif block.name == "read_file":
                    output = run_read(block.input["path"], block.input.get("limit"))
                else:
                    output = "未知工具"
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        sub_messages.append({"role": "user", "content": results})

    return "子代理未完成任务"

# ============================================================
# 主循环:整合所有知识点
# ============================================================

def smart_code_reviewer(target: str = "HEAD~1"):
    """智能代码审查助手主函数"""
    # 初始化组件
    perm = PermissionGate(mode="auto")
    skills = SkillRegistry()
    skill_ads = skills.get_ads_text()
    system_prompt = f"{SYSTEM_PROMPT}\n\n{skill_ads}"

    # 构建调度映射
    tool_handlers = {
        "bash": lambda **kw: run_bash(kw["command"]),
        "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
        "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
        "dispatch_subagent": lambda **kw: dispatch_subagent(kw["task"]),
        "load_skill": lambda **kw: skills.load_skill(kw["name"]),
    }

    # 用户需求
    query = f"请审查 Git 仓库中 {target} 的代码变更。"
    query += "先获取变更列表,然后对每个变更文件进行审查,最后生成审查报告写入 review_report.md。"
    messages = [{"role": "user", "content": query}]

    print(f"=== 智能代码审查助手 ===")
    print(f"审查目标: {target}\n")

    while True:
        response = client.messages.create(
            model=MODEL,
            system=system_prompt,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            for block in response.content:
                if hasattr(block, "text"):
                    print(f"\n审查完成: {block.text[:200]}")
            break

        results = []
        for block in response.content:
            if block.type == "tool_use":
                # 权限检查(知识点 3)
                decision, reason = perm.check(block.name, block.input)
                if decision == "deny":
                    output = f"权限拒绝: {reason}"
                    print(f"  [拒绝] {block.name}: {reason}")
                else:
                    handler = tool_handlers.get(block.name)
                    output = handler(**block.input) if handler else "未知工具"
                    print(f"  [{block.name}] → {output[:100]}")

                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })

        messages.append({"role": "user", "content": results})

if __name__ == "__main__":
    smart_code_reviewer("HEAD~1")

代码解析

代码段 使用的知识点 说明
while True 循环 + stop_reason 检测 Agent Loop(1.1) 核心循环驱动整个审查流程,模型决定何时调用工具、何时结束
TOOL_HANDLERS 字典 + tool_handlers 映射 Tool Dispatch Map(1.2) 5 个工具通过字典路由,主循环不需要知道具体工具的实现
PermissionGate.check() Permission Pipeline(2.1) 每个工具调用前进行四阶段安全检查,阻止 git push 等危险操作
SkillRegistry + load_skill 工具 Two-Layer Skill Model(2.2) 系统提示中只有技能广告,审查检查清单按需加载
dispatch_subagent() 函数 Subagent(2.3) 每个文件的审查在独立上下文中执行,不污染主对话历史

扩展挑战

  1. 添加 Context Compaction:当审查大量文件时,对话历史会快速膨胀。在 smart_code_reviewer 中集成 compact_messages 函数,在每轮循环开始前检查并压缩上下文。
  2. 多代理团队审查:使用 TeamBus 替代 dispatch_subagent,创建前端审查 Agent、后端审查 Agent、安全审查 Agent 三个持久化队友,并行审查不同类型的变更。
  3. Hook 系统:在工具执行前后添加 Hook 点(如 pre_tool_execpost_tool_exec),支持外部脚本对工具调用进行日志记录或干预。

第五部分:常见问题与排查指南

常见错误及解决方案

错误信息 原因 解决方案
anthropic.AuthenticationError: invalid x-api-key API 密钥未设置或已过期 运行 export ANTHROPIC_API_KEY="sk-ant-xxx" 设置密钥,或在代码中直接传入 api_key 参数
Path escapes workspace: ../../../etc/passwd safe_path 检测到路径逃逸 这是正常的安全保护行为。确保工具输入中的路径是相对于工作目录的合法路径
subprocess.TimeoutExpired: Command timed out Bash 命令执行超过 30 秒 增加 timeout 参数值,或优化命令避免长时间运行的操作
ValueError: 路径逃逸! 模型尝试访问工作目录外的文件 检查系统提示是否明确告知 Agent 工作目录的范围
响应内容中没有 tool_use 块 模型直接返回文本而未调用工具 检查 TOOLS 定义是否正确,确保 input_schema 格式符合 JSON Schema 规范
工具执行错误: 'command' 工具输入参数名不匹配 确认 JSON Schema 中的 properties 名称与 TOOL_HANDLERS 中的 kw["xxx"] 一致
ConnectionError 网络连接问题或 API 端点不可达 检查网络连接,确认是否需要设置代理(HTTP_PROXY/HTTPS_PROXY
RateLimitError: Too many requests API 调用频率超过限制 添加请求间隔(time.sleep(1)),或升级 API 计划提高速率限制
上下文压缩后丢失关键信息 压缩摘要不够全面 优化压缩提示,明确要求保留用户原始需求和已完成操作列表
子代理达到最大循环次数 子代理任务过于复杂 增加 max_turns 参数,或将任务拆分为更小的子任务

调试技巧

  1. 打印 messages[] 长度和 token 估算:在每轮循环开始时打印 len(messages)estimate_tokens(messages),帮助理解上下文增长趋势。如果 token 数快速增长,说明工具输出过大,需要截断或触发压缩。
  2. 使用 plan 模式隔离问题:当 Agent 行为异常时,切换到 plan 模式(只读),让 Agent 先分析问题并给出计划,不执行任何写操作。确认计划正确后再切回 auto 模式执行。
  3. 单独测试工具函数:不要在 Agent Loop 中调试工具。单独调用 run_bash("ls")run_read("test.txt") 等函数确认行为正确,然后再集成到 Agent Loop 中。

第六部分:学习路线推荐

官方文档推荐阅读顺序

  1. s00-architecture-overview.md — 全局大图。先建立对整体架构的认知,理解 19 章的学习路径和每章的位置。重点关注:全局大图、章节速查表、关键状态列表。
  2. s01-agent-loop.md — Agent Loop。30 行代码的核心循环,是后续所有章节的基础。务必动手实现并理解 stop_reason 检测机制。
  3. s02-tool-dispatch.md — 工具调度映射。理解 dispatch map 如何让 Agent Loop 解耦于具体工具。
  4. s03-todo.md — Todo/Planning。理解如何让 Agent 管理自己的任务清单。
  5. s04-subagent.md — 子代理派生。理解上下文隔离的核心机制。
  6. s05-skill.md — 两层技能模型。理解懒加载在 Agent 系统中的应用。
  7. s06-compact.md — 上下文压缩。理解如何处理上下文窗口限制。
  8. s07-permission.md — 权限管道。理解四阶段安全决策管道。
  9. s08-hook.md — Hook 系统。理解不修改主循环即可扩展行为的机制。
  10. s09-s12 及后续章节 — 按顺序完成剩余章节,每章建立在前一章基础上。

推荐进阶资源

  • GitHub 仓库 mini-claude-code — 5 个版本的渐进式完整 Agent 实现(约 1100 行),从最简 Agent 到功能完备系统,适合对照学习。
  • PyShine 深度分析文章 — 第三方深度技术分析,包含详细的代码示例和架构图,适合在完成 s01-s06 后阅读以巩固理解。
  • Anthropic 官方 Claude Code 文档 — 了解 LearnClaudeCode 教学目标的真实产品。对比教学实现和生产实现,加深对设计取舍的理解。

信息来源与版本说明