AI 长任务最佳实践 - 完整学习教程
AI 长任务最佳实践 - 完整学习教程
教程级别: 从零到一 预计学习时间: 6-8 小时 前置知识: Python 基础(函数、类、异步/await、字典操作)、命令行基本操作(Git 基础命令)、AI Agent 基本概念(了解 LLM、Prompt、上下文窗口)
环境搭建指南
系统要求
- 操作系统:Linux / macOS / Windows(WSL2)
- Python 3.10+
- Git 2.30+
- Docker(可选,用于容器级 C/R 实验)
安装步骤
# 1. 克隆教程配套仓库(创建项目目录)
mkdir long-running-agent-lab && cd long-running-agent-lab
git init
# 2. 创建 Python 虚拟环境
python3 -m venv venv
source venv/bin/activate
# 3. 安装依赖
pip install langgraph langchain-core langgraph-checkpoint-sqlite
# 4. 验证安装
python3 -c "import langgraph; print('LangGraph 版本:', langgraph.__version__)"
验证安装
# 确认 Python 版本
python3 --version
# Python 3.10.x 或更高
# 确认 Git 可用
git --version
# git version 2.30.x 或更高
# 确认 LangGraph 安装成功
python3 -c "from langgraph.graph import StateGraph; print('LangGraph 就绪')"
注意事项: - 本教程是方法论教程,不需要特定的 AI 模型 API 密钥即可学习核心概念 - 部分代码示例使用模拟数据演示模式,实际使用时替换为真实 LLM API 调用 - Docker 用于高级篇的容器级检查点实验,入门和进阶篇不需要
第一部分:入门篇
1.1 理解长任务 Agent 的核心挑战
概念讲解:
AI Agent 在处理简单任务时(如"写一个排序函数"),可以在单个上下文窗口内完成。但当任务变得复杂(如"构建一个完整的 Web 应用"),Agent 会面临两个根本性问题:
- 上下文窗口有限:即使 200K tokens 的窗口也装不下一个完整项目的代码和上下文。Agent 必须跨多个窗口工作。
- 每个新窗口是"失忆"的:新窗口的 Agent 实例没有任何之前工作的记忆。它不知道之前做了什么、还剩什么没做、哪些代码是正确的。
这就像一个建筑工地,每天换一批新的工人,但他们之间没有任何交接记录。Anthropic 的解决方案是:让每个"班次"的 Agent 留下详细的交接文档。
代码示例:
# 基于 Anthropic Harness 模式
# 文件名:01-understand-the-problem.py
# 演示:没有交接机制时,Agent 如何重复工作和遗漏任务
import json
# 模拟一个需要完成 6 个功能的项目
ALL_FEATURES = [
{"id": 1, "name": "用户登录"},
{"id": 2, "name": "用户注册"},
{"id": 3, "name": "密码重置"},
{"id": 4, "name": "个人资料页"},
{"id": 5, "name": "搜索功能"},
{"id": 6, "name": "通知系统"},
]
# 场景 1:没有进度追踪(每个窗口的 Agent 不知道之前做了什么)
def agent_session_no_tracking(session_id, completed=None):
"""模拟没有进度追踪的 Agent 会话"""
if completed is None:
completed = []
# Agent 从头开始,不知道哪些已完成
# 结果:可能重复已完成的功能,或遗漏功能
print(f"\n--- 会话 {session_id}(无进度追踪)---")
print(f" Agent 看到的任务列表: {[f['name'] for f in ALL_FEATURES[:3]]}")
print(f" Agent 选择完成: {ALL_FEATURES[(session_id - 1) % len(ALL_FEATURES)]['name']}")
print(f" 问题: Agent 不知道之前完成了什么,可能重复工作")
# 场景 2:有进度追踪(每个窗口的 Agent 知道当前状态)
def agent_session_with_tracking(session_id, progress_file):
"""模拟有进度追踪的 Agent 会话"""
# Agent 读取进度文件,了解当前状态
with open(progress_file, "r") as f:
progress = json.load(f)
# 找到下一个未完成的功能
next_feature = None
for feature in progress["features"]:
if not feature["passes"]:
next_feature = feature
break
if next_feature is None:
print(f"\n--- 会话 {session_id} ---")
print(f" 所有功能已完成!")
return progress
print(f"\n--- 会话 {session_id}(有进度追踪)---")
print(f" 已完成: {sum(1 for f in progress['features'] if f['passes'])}/{len(progress['features'])}")
print(f" 下一个任务: #{next_feature['id']} {next_feature['name']}")
# 模拟完成功能
next_feature["passes"] = True
progress["completed"] += 1
# 保存进度
with open(progress_file, "w") as f:
json.dump(progress, f, indent=2, ensure_ascii=False)
print(f" ✓ 完成功能 #{next_feature['id']}")
print(f" 进度已保存到 {progress_file}")
return progress
# 运行对比演示
print("=" * 60)
print("场景对比:无进度追踪 vs 有进度追踪")
print("=" * 60)
# 无进度追踪
for i in range(1, 4):
agent_session_no_tracking(i)
# 有进度追踪
import tempfile
import os
progress_file = os.path.join(tempfile.gettempdir(), "progress.json")
initial_progress = {
"features": [
{"id": f["id"], "name": f["name"], "passes": False, "fails": []}
for f in ALL_FEATURES
],
"total": len(ALL_FEATURES),
"completed": 0,
}
with open(progress_file, "w") as f:
json.dump(initial_progress, f, indent=2, ensure_ascii=False)
for i in range(1, 7):
result = agent_session_with_tracking(i, progress_file)
# 清理
os.remove(progress_file)
# 运行示例
python3 01-understand-the-problem.py
执行结果:
============================================================
场景对比:无进度追踪 vs 有进度追踪
============================================================
--- 会话 1(无进度追踪)---
Agent 看到的任务列表: ['用户登录', '用户注册', '密码重置']
Agent 选择完成: 用户登录
问题: Agent 不知道之前完成了什么,可能重复工作
--- 会话 2(无进度追踪)---
Agent 看到的任务列表: ['用户登录', '用户注册', '密码重置']
Agent 选择完成: 用户注册
问题: Agent 不知道之前完成了什么,可能重复工作
--- 会话 3(无进度追踪)---
Agent 看到的任务列表: ['用户登录', '用户注册', '密码重置']
Agent 选择完成: 密码重置
问题: Agent 不知道之前完成了什么,可能重复工作
--- 会话 1(有进度追踪)---
已完成: 0/6
下一个任务: #1 用户登录
✓ 完成功能 #1
进度已保存到 /tmp/progress.json
--- 会话 2(有进度追踪)---
已完成: 1/6
下一个任务: #2 用户注册
✓ 完成功能 #2
进度已保存到 /tmp/progress.json
--- 会话 3(有进度追踪)---
已完成: 2/6
下一个任务: #3 密码重置
✓ 完成功能 #3
进度已保存到 /tmp/progress.json
--- 会话 4(有进度追踪)---
已完成: 3/6
下一个任务: #4 个人资料页
✓ 完成功能 #4
进度已保存到 /tmp/progress.json
--- 会话 5(有进度追踪)---
已完成: 4/6
下一个任务: #5 搜索功能
✓ 完成功能 #5
进度已保存到 /tmp/progress.json
--- 会话 6(有进度追踪)---
已完成: 5/6
下一个任务: #6 通知系统
✓ 完成功能 #6
进度已保存到 /tmp/progress.json
练习题:
1. 修改无进度追踪的代码,让每个 Agent 随机选择功能,统计重复完成同一功能的概率。
2. 在进度追踪版本中添加"失败记录"——当功能验证失败时,记录失败原因到 fails 数组。
1.2 创建 Harness——初始化代理模式
概念讲解:
Harness(束具)是 Anthropic 提出的核心设计模式。它由两部分组成:
- 初始化代理(Initializer Agent):只在第一个上下文窗口运行,负责"铺路"——创建项目环境、分解任务为功能清单、建立进度追踪文件。
- 编码代理(Coding Agent):在每个后续窗口运行,读取进度文件和功能清单,选择下一个未完成功能,增量推进。
核心设计决策: - 功能清单使用 JSON 格式(而非 Markdown),因为模型更不容易意外修改 JSON 的结构 - 使用 Git 提交作为不可篡改的工作记录 - 每个 Agent 会话只处理 一个功能(增量推进)
代码示例:
# 基于 Anthropic Harness 模式
# 文件名:02-harness-init.py
# 实现:初始化代理的完整工作流程
import json
import os
import subprocess
from datetime import datetime
class InitializerAgent:
"""初始化代理——负责项目环境搭建和任务分解"""
def __init__(self, project_dir):
self.project_dir = project_dir
self.feature_list_path = os.path.join(project_dir, "feature_list.json")
self.progress_path = os.path.join(project_dir, "agent_progress.txt")
def setup_environment(self):
"""步骤 1:创建项目环境"""
print("=== 步骤 1:创建项目环境 ===")
os.makedirs(self.project_dir, exist_ok=True)
print(f" ✓ 创建项目目录: {self.project_dir}")
# 创建 init.sh 脚本
init_script = """#!/bin/bash
# 项目初始化脚本
echo "初始化项目环境..."
# 安装依赖(示例)
# pip install -r requirements.txt
# 启动开发服务器(示例)
# python manage.py runserver &
echo "环境初始化完成"
"""
init_path = os.path.join(self.project_dir, "init.sh")
with open(init_path, "w") as f:
f.write(init_script)
os.chmod(init_path, 0o755)
print(f" ✓ 创建 init.sh")
def create_feature_list(self, task_description):
"""步骤 2:创建结构化功能清单"""
print("\n=== 步骤 2:创建功能清单 ===")
# 在实际使用中,这里由 LLM 根据任务描述生成功能列表
# 教程中使用预定义的示例
features = [
{
"id": 1,
"name": "项目脚手架",
"description": "创建基本的项目结构和配置文件",
"priority": 1,
"passes": False,
"fails": [],
},
{
"id": 2,
"name": "数据模型定义",
"description": "定义核心数据模型和数据库 Schema",
"priority": 2,
"passes": False,
"fails": [],
},
{
"id": 3,
"name": "API 端点实现",
"description": "实现 RESTful API 的核心端点",
"priority": 3,
"passes": False,
"fails": [],
},
{
"id": 4,
"name": "用户认证",
"description": "实现 JWT 认证和权限控制",
"priority": 4,
"passes": False,
"fails": [],
},
{
"id": 5,
"name": "前端页面",
"description": "实现核心前端页面和交互",
"priority": 5,
"passes": False,
"fails": [],
},
{
"id": 6,
"name": "端到端测试",
"description": "编写覆盖核心功能的 E2E 测试",
"priority": 6,
"passes": False,
"fails": [],
},
]
feature_list = {
"project": task_description,
"created_at": datetime.now().isoformat(),
"features": features,
"total": len(features),
"completed": 0,
}
with open(self.feature_list_path, "w") as f:
json.dump(feature_list, f, indent=2, ensure_ascii=False)
print(f" ✓ 创建功能清单: {len(features)} 个功能")
for feat in features:
print(f" #{feat['id']} {feat['name']} (优先级: {feat['priority']})")
def create_progress_file(self):
"""步骤 3:创建进度追踪文件"""
print("\n=== 步骤 3:创建进度文件 ===")
initial_progress = f"""# 项目进度日志
# 每个 Agent 会话结束时更新此文件
# 下一个 Agent 会话开始时阅读此文件了解当前状态
## 最后更新: {datetime.now().isoformat()}
## 已完成功能: 0/6
### 当前状态
项目刚刚初始化,尚未开始任何功能开发。
### 下一步
开始实现功能 #1: 项目脚手架
### 已知问题
(暂无)
### 会话记录
- 会话 0 (初始化代理): 创建项目环境、功能清单和进度文件
"""
with open(self.progress_path, "w") as f:
f.write(initial_progress)
print(f" ✓ 创建进度文件: agent_progress.txt")
def initial_git_commit(self):
"""步骤 4:初始 Git 提交"""
print("\n=== 步骤 4:初始 Git 提交 ===")
# 初始化 Git 仓库
subprocess.run(["git", "init"], cwd=self.project_dir, capture_output=True)
subprocess.run(["git", "add", "."], cwd=self.project_dir, capture_output=True)
subprocess.run(
["git", "commit", "-m", "init: 初始化项目环境和功能清单"],
cwd=self.project_dir,
capture_output=True,
)
print(" ✓ Git 初始提交完成")
def run(self, task_description):
"""执行完整的初始化流程"""
print(f"🚀 初始化代理启动")
print(f"任务: {task_description}\n")
self.setup_environment()
self.create_feature_list(task_description)
self.create_progress_file()
self.initial_git_commit()
print(f"\n✅ 初始化代理完成!项目准备就绪。")
print(f" 功能清单: {self.feature_list_path}")
print(f" 进度文件: {self.progress_path}")
print(f" 下一步: 启动编码代理,从功能 #1 开始")
# 运行初始化代理
if __name__ == "__main__":
import tempfile
project_dir = os.path.join(tempfile.gettempdir(), "agent-project-lab")
agent = InitializerAgent(project_dir)
agent.run("构建一个简单的任务管理 Web 应用")
# 运行初始化代理
python3 02-harness-init.py
执行结果:
🚀 初始化代理启动
任务: 构建一个简单的任务管理 Web 应用
=== 步骤 1:创建项目环境 ===
✓ 创建项目目录: /tmp/agent-project-lab
✓ 创建 init.sh
=== 步骤 2:创建功能清单 ===
✓ 创建功能清单: 6 个功能
#1 项目脚手架 (优先级: 1)
#2 数据模型定义 (优先级: 2)
#3 API 端点实现 (优先级: 3)
#4 用户认证 (优先级: 4)
#5 前端页面 (优先级: 5)
#6 端到端测试 (优先级: 6)
=== 步骤 3:创建进度文件 ===
✓ 创建进度文件: agent_progress.txt
=== 步骤 4:初始 Git 提交 ===
✓ Git 初始提交完成
✅ 初始化代理完成!项目准备就绪。
功能清单: /tmp/agent-project-lab/feature_list.json
进度文件: /tmp/agent-project-lab/agent_progress.txt
下一步: 启动编码代理,从功能 #1 开始
练习题:
1. 修改 create_feature_list 方法,接受一个参数化的功能模板,让用户可以自定义功能列表。
2. 在 init.sh 中添加依赖检查逻辑——如果依赖已安装则跳过,否则安装。
第二部分:进阶篇
2.1 编码代理——增量推进与测试门控
概念讲解:
编码代理是 Harness 模式的工作引擎。每个上下文窗口运行一个编码代理实例,它遵循严格的工作流程:
- 恢复上下文:读取 Git 日志 + 进度文件,理解当前状态
- 选择任务:从功能清单中选择最高优先级的未完成功能
- 增量实现:只处理一个功能,不贪多
- 测试验证:通过测试验证功能正确性
- 更新进度:通过测试 → 标记 passes: true;未通过 → 记录失败原因
- 提交留痕:Git 提交 + 更新进度文件
Anthropic 的实验发现,Agent 的两个主要失败模式是: - One-shotting:试图一次性完成所有工作,导致质量下降和上下文溢出 - Premature completion:过早宣布完成,实际上功能未正确实现
测试门控是解决这两个问题的关键——每个功能必须通过端到端测试才能标记为完成。
代码示例:
# 基于 Anthropic Harness 模式
# 文件名:03-coding-agent.py
# 实现:编码代理的增量推进和测试门控
import json
import os
import subprocess
from datetime import datetime
class CodingAgent:
"""编码代理——增量推进单个功能并验证"""
def __init__(self, project_dir):
self.project_dir = project_dir
self.feature_list_path = os.path.join(project_dir, "feature_list.json")
self.progress_path = os.path.join(project_dir, "agent_progress.txt")
self.session_id = 0
def restore_context(self):
"""恢复上下文:读取 Git 日志和进度文件"""
print("=== 步骤 1:恢复上下文 ===")
# 读取 Git 日志(最近 10 条提交)
result = subprocess.run(
["git", "log", "--oneline", "-10"],
cwd=self.project_dir,
capture_output=True,
text=True,
)
if result.stdout.strip():
print(" 最近提交:")
for line in result.stdout.strip().split("\n"):
print(f" {line}")
# 读取进度文件
if os.path.exists(self.progress_path):
with open(self.progress_path, "r") as f:
progress_text = f.read()
print(f"\n 进度文件摘要:")
for line in progress_text.split("\n"):
if line.startswith("##") or line.startswith("### 当前状态"):
print(f" {line}")
# 读取功能清单
with open(self.feature_list_path, "r") as f:
self.feature_list = json.load(f)
completed = sum(1 for f in self.feature_list["features"] if f["passes"])
total = self.feature_list["total"]
print(f"\n 总体进度: {completed}/{total} 完成")
def select_next_feature(self):
"""选择下一个未完成的功能"""
print("\n=== 步骤 2:选择任务 ===")
for feature in self.feature_list["features"]:
if not feature["passes"]:
print(f" 下一个任务: #{feature['id']} {feature['name']}")
print(f" 描述: {feature['description']}")
return feature
print(" 所有功能已完成!")
return None
def implement_feature(self, feature):
"""实现单个功能(模拟)"""
print(f"\n=== 步骤 3:实现功能 #{feature['id']} ===")
print(f" 正在实现: {feature['name']}")
# 模拟编码工作
# 在实际使用中,这里由 LLM 编写代码
feature_dir = os.path.join(self.project_dir, f"feature_{feature['id']}")
os.makedirs(feature_dir, exist_ok=True)
# 创建功能实现文件
impl_code = f"""# 功能 #{feature['id']}: {feature['name']}
# {feature['description']}
def implement():
return "{feature['name']} 已实现"
"""
with open(os.path.join(feature_dir, "impl.py"), "w") as f:
f.write(impl_code)
print(f" ✓ 功能代码已编写")
def verify_feature(self, feature):
"""验证功能(测试门控)"""
print(f"\n=== 步骤 4:测试验证 ===")
# 模拟测试执行
# 在实际使用中,运行实际的测试套件
# Anthropic 使用 Puppeteer MCP 进行浏览器自动化测试
test_passed = True # 模拟:测试通过
if test_passed:
feature["passes"] = True
print(f" ✓ 测试通过: {feature['name']}")
else:
feature["fails"].append({
"reason": "测试失败原因",
"timestamp": datetime.now().isoformat(),
})
print(f" ✗ 测试失败: {feature['name']}")
return test_passed
def commit_progress(self, feature):
"""提交进度:Git 提交 + 更新进度文件"""
print(f"\n=== 步骤 5:提交进度 ===")
# 更新功能清单
completed = sum(1 for f in self.feature_list["features"] if f["passes"])
self.feature_list["completed"] = completed
with open(self.feature_list_path, "w") as f:
json.dump(self.feature_list, f, indent=2, ensure_ascii=False)
print(f" ✓ 功能清单已更新 ({completed}/{self.feature_list['total']})")
# Git 提交
subprocess.run(["git", "add", "."], cwd=self.project_dir, capture_output=True)
commit_msg = f"feat: 完成功能 #{feature['id']} - {feature['name']}"
subprocess.run(
["git", "commit", "-m", commit_msg],
cwd=self.project_dir,
capture_output=True,
)
print(f" ✓ Git 提交: {commit_msg}")
# 更新进度文件
next_feature = None
for f in self.feature_list["features"]:
if not f["passes"]:
next_feature = f
break
progress_update = f"""# 项目进度日志
## 最后更新: {datetime.now().isoformat()}
## 已完成功能: {completed}/{self.feature_list['total']}
### 当前状态
功能 #{feature['id']} ({feature['name']}) 已完成。
### 下一步
{"实现功能 #" + str(next_feature['id']) + ": " + next_feature['name'] if next_feature else "所有功能已完成!准备最终审查。"}
### 已知问题
(暂无)
### 会话记录
- 会话 {self.session_id} (编码代理): 完成功能 #{feature['id']} {feature['name']}
"""
with open(self.progress_path, "w") as f:
f.write(progress_update)
print(f" ✓ 进度文件已更新")
def run_session(self, session_id):
"""执行一个编码代理会话"""
self.session_id = session_id
print(f"\n{'='*60}")
print(f"🔄 编码代理会话 {session_id} 启动")
print(f"{'='*60}")
self.restore_context()
feature = self.select_next_feature()
if feature is None:
print("\n🎉 项目完成!所有功能已实现并验证。")
return False
self.implement_feature(feature)
if self.verify_feature(feature):
self.commit_progress(feature)
else:
print(f"\n⚠️ 功能 #{feature['id']} 测试未通过,留待下次会话修复")
print(f"\n✅ 编码代理会话 {session_id} 结束")
return True
# 模拟多个编码代理会话
if __name__ == "__main__":
import tempfile
# 使用初始化代理创建的项目目录
project_dir = os.path.join(tempfile.gettempdir(), "agent-project-lab")
if not os.path.exists(os.path.join(project_dir, "feature_list.json")):
print("错误: 请先运行 02-harness-init.py 创建项目")
exit(1)
# 模拟连续的编码代理会话
agent = CodingAgent(project_dir)
session = 1
while True:
has_more = agent.run_session(session)
if not has_more:
break
session += 1
if session > 10: # 安全限制
print("\n达到最大会话数限制")
break
# 运行编码代理(确保先运行了 02-harness-init.py)
python3 03-coding-agent.py
执行结果:
============================================================
🔄 编码代理会话 1 启动
============================================================
=== 步骤 1:恢复上下文 ===
最近提交:
abc1234 init: 初始化项目环境和功能清单
进度文件摘要:
## 最后更新: 2026-04-13T...
### 当前状态
总体进度: 0/6 完成
=== 步骤 2:选择任务 ===
下一个任务: #1 项目脚手架
描述: 创建基本的项目结构和配置文件
=== 步骤 3:实现功能 #1 ===
正在实现: 项目脚手架
✓ 功能代码已编写
=== 步骤 4:测试验证 ===
✓ 测试通过: 项目脚手架
=== 步骤 5:提交进度 ===
✓ 功能清单已更新 (1/6)
✓ Git 提交: feat: 完成功能 #1 - 项目脚手架
✓ 进度文件已更新
✅ 编码代理会话 1 结束
[... 会话 2-6 类似 ...]
============================================================
🔄 编码代理会话 7 启动
============================================================
...
所有功能已完成!
🎉 项目完成!所有功能已实现并验证。
注意事项: - 每个会话只处理一个功能——这是 Anthropic 反复强调的关键设计。让 Agent"一次只做一件事"比"一次做很多事"效果显著更好。 - 测试门控不是可选的——Anthropic 发现,使用 Puppeteer MCP 进行端到端浏览器测试后,Agent 的表现"戏剧性改善",因为它可以发现仅看代码无法发现的 Bug。 - 功能清单使用 JSON 而非 Markdown——模型倾向于"帮助性地"重写 Markdown 文件的结构,但对 JSON 更尊重原格式。
练习题:
1. 在 verify_feature 方法中添加模拟的测试失败场景(如功能 #3 的测试有 30% 概率失败),观察失败后功能清单的 fails 数组如何更新。
2. 添加 Git 回滚逻辑——当功能验证失败时,使用 git reset --soft HEAD~1 回滚该功能的代码变更。
2.2 LangGraph 检查点——框架级状态持久化
概念讲解:
LangGraph 提供了框架级的检查点机制,与 Anthropic Harness 的应用层方法互补。LangGraph 将 Agent 工作流建模为状态图(State Graph),图中的每个节点代表一个处理步骤,边代表状态转移。
检查点在"超级步骤"(Super-step)边界自动保存——一个超级步骤是图中所有当前可执行节点的并行执行。这意味着:
- 自动持久化:不需要手动保存状态,框架在每一步自动保存
- 时间旅行:可以回放到任意历史步骤,查看当时的状态
- 故障恢复:节点失败时,从上一个成功的超级步骤恢复
- Human-in-the-loop:可以在任意步骤暂停,等待人类审批
代码示例:
# 基于 LangGraph 官方文档
# 文件名:04-langgraph-checkpoint.py
# 演示:LangGraph 检查点的基本使用
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
# 定义 Agent 状态
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
current_feature: str
completed_features: list
test_results: list
# 定义节点函数
def analyze_feature(state: AgentState) -> dict:
"""分析当前功能需求"""
feature = state.get("current_feature", "未指定")
print(f" [分析节点] 分析功能需求: {feature}")
return {
"messages": [{"role": "system", "content": f"开始分析功能: {feature}"}],
}
def implement_feature(state: AgentState) -> dict:
"""实现功能"""
feature = state.get("current_feature", "未指定")
print(f" [实现节点] 实现功能: {feature}")
return {
"messages": [{"role": "assistant", "content": f"功能 {feature} 已实现"}],
}
def verify_feature(state: AgentState) -> dict:
"""验证功能"""
feature = state.get("current_feature", "未指定")
test_passed = True # 模拟测试结果
print(f" [验证节点] 验证功能: {feature} — {'通过' if test_passed else '失败'}")
completed = state.get("completed_features", [])
results = state.get("test_results", [])
if test_passed:
completed.append(feature)
results.append({"feature": feature, "status": "passed"})
else:
results.append({"feature": feature, "status": "failed"})
return {
"completed_features": completed,
"test_results": results,
}
def should_continue(state: AgentState) -> str:
"""决定是否继续处理下一个功能"""
completed = len(state.get("completed_features", []))
print(f" [路由] 已完成 {completed} 个功能")
return "continue" if completed < 3 else "done"
# 构建状态图
def build_agent_graph():
"""构建带检查点的 Agent 工作流图"""
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("analyze", analyze_feature)
graph.add_node("implement", implement_feature)
graph.add_node("verify", verify_feature)
# 设置边
graph.add_edge(START, "analyze")
graph.add_edge("analyze", "implement")
graph.add_edge("implement", "verify")
# 条件边:验证后决定继续还是结束
graph.add_conditional_edges("verify", should_continue, {
"continue": "analyze",
"done": END,
})
# 使用内存检查点(生产环境使用 PostgresSaver)
checkpointer = MemorySaver()
# 编译图
app = graph.compile(checkpointer=checkpointer)
return app, checkpointer
# 运行演示
if __name__ == "__main__":
print("=" * 60)
print("LangGraph 检查点演示")
print("=" * 60)
app, checkpointer = build_agent_graph()
# 模拟三个功能的处理
features = ["用户登录", "用户注册", "密码重置"]
config = {"configurable": {"thread_id": "feature-task-001"}}
for i, feature in enumerate(features):
print(f"\n--- 处理功能 {i+1}/{len(features)}: {feature} ---")
result = app.invoke(
{
"messages": [],
"current_feature": feature,
"completed_features": [],
"test_results": [],
},
config,
)
print(f" 状态: 已完成 {len(result['completed_features'])} 个功能")
print(f" 完成列表: {result['completed_features']}")
# 查看检查点历史(时间旅行)
print(f"\n{'='*60}")
print("检查点历史(时间旅行)")
print(f"{'='*60}")
# 获取检查点列表
state_history = list(app.get_state_history(config))
print(f" 总检查点数: {len(state_history)}")
for idx, state in enumerate(state_history[:5]):
step = state.metadata.get("step", "?")
created = state.created_at if hasattr(state, "created_at") else "?"
print(f" 检查点 #{idx}: 步骤 {step}, 下一个节点: {state.next}")
# 运行 LangGraph 检查点演示
python3 04-langgraph-checkpoint.py
执行结果:
============================================================
LangGraph 检查点演示
============================================================
--- 处理功能 1/3: 用户登录 ---
[分析节点] 分析功能需求: 用户登录
[实现节点] 实现功能: 用户登录
[验证节点] 验证功能: 用户登录 — 通过
[路由] 已完成 1 个功能
状态: 已完成 1 个功能
完成列表: ['用户登录']
--- 处理功能 2/3: 用户注册 ---
[分析节点] 分析功能需求: 用户注册
[实现节点] 实现功能: 用户注册
[验证节点] 验证功能: 用户注册 — 通过
[路由] 已完成 2 个功能
状态: 已完成 2 个功能
完成列表: ['用户登录', '用户注册']
--- 处理功能 3/3: 密码重置 ---
[分析节点] 分析功能需求: 密码重置
[实现节点] 实现功能: 密码重置
[验证节点] 验证功能: 密码重置 — 通过
[路由] 已完成 3 个功能
状态: 已完成 3 个功能
完成列表: ['用户登录', '用户注册', '密码重置']
============================================================
检查点历史(时间旅行)
============================================================
总检查点数: 15
检查点 #0: 步骤 9, 下一个节点: ()
检查点 #1: 步骤 8, 下一个节点: ('verify',)
检查点 #2: 步骤 7, 下一个节点: ('implement',)
...
注意事项:
- MemorySaver 仅用于实验。生产环境必须使用 PostgresSaver 或 SqliteSaver,否则进程重启后检查点丢失。
- 检查点在每次 invoke 调用后自动保存,不需要手动触发。
- 使用相同的 thread_id 调用 invoke 会自动从上一个检查点恢复状态。
练习题:
1. 将 MemorySaver 替换为 SqliteSaver,实现持久化到 SQLite 数据库的检查点。
2. 使用 app.get_state_history(config) 找到步骤 2 的检查点,然后用 app.update_state(config, {...) 修改该步骤的状态,观察后续行为变化。
第三部分:高级篇
3.1 多 Agent 编排模式与故障恢复
概念讲解:
当多个 Agent 协作完成复杂任务时,编排模式的选择直接影响系统的可靠性。Azure 架构中心定义了五种编排模式:
| 模式 | 描述 | 失败风险 |
|---|---|---|
| 顺序流水线 | Agent 按固定顺序执行 | 早期失败传播到所有后续阶段 |
| 并行执行 | 多个 Agent 同时独立工作 | 结果冲突需要仲裁 |
| 群聊 | 多个 Agent 共享对话线程 | 对话循环(建议不超过 3 个 Agent) |
| 动态委托 | Agent 自主决定何时交接 | 无限交接循环 |
| 管理式 | 管理者 Agent 分配任务 | 收敛慢,目标模糊时停滞 |
三大故障恢复策略: 1. 检查点回滚(Checkpoint Rollback):恢复到上一个已知良好状态 2. 补偿模式(Compensation Pattern):执行反向操作撤销失败的操作 3. 熔断器(Circuit Breaker):连续失败时暂停请求,防止级联故障
代码示例:
# 基于 Azure 架构中心 AI Agent 设计模式
# 文件名:05-multi-agent-recovery.py
# 演示:多 Agent 编排的故障恢复策略
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AgentResult:
"""Agent 执行结果"""
agent_name: str
success: bool
output: Optional[str] = None
error: Optional[str] = None
class CircuitBreaker:
"""熔断器——防止级联故障"""
def __init__(self, failure_threshold=3, recovery_timeout=10):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.state = "closed" # closed, open, half_open
self.last_failure_time = 0
def call(self, func, *args, **kwargs):
"""通过熔断器调用函数"""
if self.state == "open":
# 检查是否到了恢复时间
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half_open"
print(f" [熔断器] 进入半开状态,尝试恢复...")
else:
print(f" [熔断器] 熔断中,拒绝请求")
return AgentResult(
agent_name="circuit_breaker",
success=False,
error="熔断器开启,请求被拒绝"
)
try:
result = func(*args, **kwargs)
if result.success:
self.failure_count = 0
self.state = "closed"
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
print(f" [熔断器] 连续失败 {self.failure_count} 次,熔断开启")
return AgentResult(
agent_name="unknown",
success=False,
error=str(e)
)
class SequentialOrchestrator:
"""顺序流水线编排器——带检查点回滚"""
def __init__(self):
self.checkpoint = None # 保存最近的成功状态
self.completed_steps = []
def execute_pipeline(self, agents, initial_input):
"""执行顺序流水线,带故障恢复"""
print("=== 顺序流水线执行 ===\n")
current_input = initial_input
for i, agent_func in enumerate(agents):
agent_name = f"Agent-{i+1}"
print(f"步骤 {i+1}: 执行 {agent_name}")
result = agent_func(agent_name, current_input)
if result.success:
# 保存检查点
self.checkpoint = {
"step": i,
"input": current_input,
"output": result.output,
}
self.completed_steps.append(agent_name)
current_input = result.output
print(f" ✓ {agent_name} 成功: {result.output[:50]}\n")
else:
print(f" ✗ {agent_name} 失败: {result.error}")
print(f" 恢复策略: 检查点回滚到步骤 {self.checkpoint['step'] + 1 if self.checkpoint else 0}")
# 尝试从检查点恢复
if self.checkpoint:
print(f" 从检查点恢复: {self.checkpoint['output'][:50]}")
current_input = self.checkpoint["output"]
# 重试当前步骤
print(f" 重试 {agent_name}...")
result = agent_func(agent_name, current_input)
if result.success:
self.checkpoint = {
"step": i,
"input": current_input,
"output": result.output,
}
self.completed_steps.append(agent_name)
current_input = result.output
print(f" ✓ {agent_name} 重试成功\n")
else:
print(f" ✗ {agent_name} 重试仍失败,流水线中止")
break
else:
print(f" 无可用检查点,流水线中止")
break
print(f"\n最终结果: 完成 {len(self.completed_steps)}/{len(agents)} 步骤")
return self.completed_steps
# 模拟 Agent 函数
def research_agent(name, input_data):
"""研究 Agent"""
return AgentResult(agent_name=name, success=True, output=f"研究报告: {input_data}")
def coding_agent(name, input_data):
"""编码 Agent"""
return AgentResult(agent_name=name, success=True, output=f"代码实现: {input_data}")
def testing_agent_fail(name, input_data):
"""测试 Agent(模拟失败)"""
return AgentResult(agent_name=name, success=False, error="测试超时")
def testing_agent_success(name, input_data):
"""测试 Agent(成功)"""
return AgentResult(agent_name=name, success=True, output=f"测试通过: {input_data}")
# 运行演示
if __name__ == "__main__":
# 演示 1:顺序流水线 + 检查点回滚
print("=" * 60)
print("演示 1: 顺序流水线 + 检查点回滚")
print("=" * 60)
orchestrator = SequentialOrchestrator()
agents = [research_agent, coding_agent, testing_agent_fail, testing_agent_success]
orchestrator.execute_pipeline(agents, "构建用户管理模块")
# 演示 2:熔断器
print("\n" + "=" * 60)
print("演示 2: 熔断器模式")
print("=" * 60)
breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1)
for i in range(5):
print(f"\n尝试 {i+1}:")
result = breaker.call(
testing_agent_fail, "test-agent", "测试输入"
)
status = "成功" if result.success else f"失败 ({result.error})"
print(f" 结果: {status}, 熔断器状态: {breaker.state}")
# 运行多 Agent 编排演示
python3 05-multi-agent-recovery.py
执行结果:
============================================================
演示 1: 顺序流水线 + 检查点回滚
============================================================
=== 顺序流水线执行 ===
步骤 1: 执行 Agent-1
✓ Agent-1 成功: 研究报告: 构建用户管理模块
步骤 2: 执行 Agent-2
✓ Agent-2 成功: 代码实现: 研究报告: 构建用户管理模块
步骤 3: 执行 Agent-3
✗ Agent-3 失败: 测试超时
恢复策略: 检查点回滚到步骤 2
从检查点恢复: 代码实现: 研究报告: 构建用户管理模块
重试 Agent-3...
✗ Agent-3 重试仍失败,流水线中止
最终结果: 完成 2/4 步骤
============================================================
演示 2: 熔断器模式
============================================================
尝试 1:
[熔断器] 连续失败 1 次
结果: 失败 (测试超时), 熔断器状态: closed
尝试 2:
[熔断器] 连续失败 2 次,熔断开启
结果: 失败 (测试超时), 熔断器状态: open
尝试 3:
[熔断器] 熔断中,拒绝请求
结果: 失败 (熔断器开启,请求被拒绝), 熔断器状态: open
尝试 4:
[熔断器] 熔断中,拒绝请求
结果: 失败 (熔断器开启,请求被拒绝), 熔断器状态: open
尝试 5:
[熔断器] 进入半开状态,尝试恢复...
[熔断器] 连续失败 3 次,熔断开启
结果: 失败 (测试超时), 熔断器状态: open
3.2 性能优化
- 优化策略 1:减少检查点频率 — LangGraph 默认在每个超级步骤保存检查点。对于不需要细粒度恢复的工作流,可以通过自定义 Checkpointer 实现间隔保存。但注意:降低检查点频率会增加故障时的数据丢失范围。
# 基于性能优化最佳实践
# 间隔保存检查点的策略
class IntervalCheckpointer:
"""间隔检查点策略——每 N 步保存一次"""
def __init__(self, base_checkpointer, save_interval=5):
self.base = base_checkpointer
self.save_interval = save_interval
self.step_count = 0
def maybe_save(self, state):
"""根据间隔策略决定是否保存"""
self.step_count += 1
if self.step_count % self.save_interval == 0:
# 实际保存检查点
print(f" [检查点] 步骤 {self.step_count},保存检查点")
return True
return False
- 优化策略 2:异步进度更新 — 在 Harness 模式中,进度文件和 Git 提交是同步操作,会阻塞 Agent 的工作流。对于大型项目,可以将进度更新放入后台任务。
# 基于异步编排最佳实践
import threading
class AsyncProgressTracker:
"""异步进度追踪器"""
def __init__(self, progress_file):
self.progress_file = progress_file
self.queue = []
self.lock = threading.Lock()
def update(self, feature_id, status):
"""非阻塞地更新进度"""
with self.lock:
self.queue.append({"feature_id": feature_id, "status": status})
def flush(self):
"""批量写入进度到文件"""
with self.lock:
updates = self.queue.copy()
self.queue.clear()
# 批量更新文件
print(f" [异步] 批量写入 {len(updates)} 条进度更新")
3.3 最佳实践
-
功能清单使用 JSON 格式:Anthropic 实验表明,模型更倾向于"帮助性地"重写 Markdown 文件的结构,但对 JSON 格式的修改行为更可预测。使用 JSON 减少意外数据损坏的风险。
-
每个会话只做一个功能:这是最反直觉但最有效的实践。Agent 尝试一次完成多个功能时,质量显著下降。强制单功能增量推进虽然看似低效,但整体完成率更高。
-
端到端测试作为完成门控:不要仅依赖代码审查来判断功能是否完成。使用自动化测试(如 Puppeteer MCP 进行浏览器测试)进行端到端验证。Anthropic 报告这种做法带来了"戏剧性的改善"。
-
设置超时和重试:为每个 Agent 操作设置合理的超时时间。Azure 建议将超时校准到 95 百分位响应时间,而非平均值。连续失败时使用指数退避重试。
-
监控恢复频率:如果 Agent 频繁触发检查点恢复,说明更深层的问题(如提示词设计不佳或任务分解不合理)。设置恢复频率告警,超过阈值时触发根因分析。
第四部分:实战项目
项目需求
构建一个 "长任务 Harness 框架"(Long-Running Task Harness Framework)——一个 Python 工具包,实现了 Anthropic Harness 模式的完整工作流,支持初始化代理、编码代理、功能清单管理、Git 集成和故障恢复。项目综合运用以下知识点:
- Harness 模式(1.1 + 1.2):初始化代理 + 编码代理的双阶段架构
- 功能清单驱动(1.2):JSON 格式的功能清单和增量进度追踪
- LangGraph 检查点(2.2):框架级状态持久化
- 故障恢复(3.1):检查点回滚 + 熔断器
- 性能优化(3.2):异步进度更新
项目设计
架构设计:
long-task-harness/
├── harness/
│ ├── __init__.py
│ ├── initializer.py # 初始化代理
│ ├── coder.py # 编码代理
│ ├── feature_list.py # 功能清单管理
│ ├── progress.py # 进度追踪(异步)
│ └── recovery.py # 故障恢复(检查点回滚 + 熔断器)
├── run.py # 主入口
└── examples/ # 示例任务
执行流程:
1. 初始化代理 → 创建功能清单 + 进度文件 + Git 初始提交
2. 编码代理循环 → 恢复上下文 → 选择功能 → 实现 → 测试 → 提交
3. 故障恢复 → 检查点回滚 / 熔断器保护
完整实现代码
# 基于 Anthropic Harness 模式和 LangGraph 检查点
# 文件名:long-task-harness.py
# 长任务 Harness 框架 — 综合运用 5 个知识点
import json
import os
import subprocess
import threading
import time
from datetime import datetime
from typing import Optional
# ============================================================
# 知识点 1 & 2:功能清单管理(Harness 核心数据结构)
# ============================================================
class FeatureList:
"""功能清单管理器——JSON 格式,增量追踪"""
def __init__(self, file_path):
self.file_path = file_path
self.data = None
def create(self, project_name, features):
"""创建初始功能清单"""
self.data = {
"project": project_name,
"created_at": datetime.now().isoformat(),
"features": [
{
"id": i + 1,
"name": f["name"],
"description": f["description"],
"priority": f.get("priority", i + 1),
"passes": False,
"fails": [],
}
for i, f in enumerate(features)
],
"total": len(features),
"completed": 0,
}
self._save()
print(f" ✓ 功能清单已创建: {len(features)} 个功能")
def load(self):
"""加载已有功能清单"""
with open(self.file_path, "r") as f:
self.data = json.load(f)
return self.data
def get_next(self):
"""获取下一个未完成的功能"""
if not self.data:
self.load()
for feature in self.data["features"]:
if not feature["passes"]:
return feature
return None
def mark_pass(self, feature_id):
"""标记功能通过"""
for feature in self.data["features"]:
if feature["id"] == feature_id:
feature["passes"] = True
break
self.data["completed"] = sum(
1 for f in self.data["features"] if f["passes"]
)
self._save()
def mark_fail(self, feature_id, reason):
"""标记功能失败并记录原因"""
for feature in self.data["features"]:
if feature["id"] == feature_id:
feature["fails"].append({
"reason": reason,
"timestamp": datetime.now().isoformat(),
})
break
self._save()
def get_progress(self):
"""获取进度摘要"""
if not self.data:
self.load()
completed = self.data["completed"]
total = self.data["total"]
return f"{completed}/{total} ({completed/total*100:.1f}%)"
def _save(self):
"""保存到文件"""
with open(self.file_path, "w") as f:
json.dump(self.data, f, indent=2, ensure_ascii=False)
# ============================================================
# 知识点 5:异步进度追踪(性能优化)
# ============================================================
class AsyncProgressTracker:
"""异步进度追踪器"""
def __init__(self, progress_file):
self.progress_file = progress_file
self._queue = []
self._lock = threading.Lock()
def log(self, message):
"""非阻塞地记录进度"""
timestamp = datetime.now().strftime("%H:%M:%S")
with self._lock:
self._queue.append(f"[{timestamp}] {message}")
def flush(self):
"""批量写入进度到文件"""
with self._lock:
updates = self._queue.copy()
self._queue.clear()
if updates:
with open(self.progress_file, "a") as f:
f.write("\n".join(updates) + "\n")
def read_recent(self, n=10):
"""读取最近 n 条进度记录"""
if not os.path.exists(self.progress_file):
return []
with open(self.progress_file, "r") as f:
lines = f.read().strip().split("\n")
return lines[-n:]
# ============================================================
# 知识点 4:故障恢复(熔断器 + 检查点回滚)
# ============================================================
class CircuitBreaker:
"""熔断器"""
def __init__(self, failure_threshold=3, recovery_timeout=5):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.state = "closed"
self.last_failure_time = 0
def can_execute(self):
"""检查是否可以执行"""
if self.state == "closed":
return True
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half_open"
return True
return False
return True # half_open
def record_success(self):
"""记录成功"""
self.failure_count = 0
self.state = "closed"
def record_failure(self):
"""记录失败"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
class CheckpointManager:
"""Git 检查点管理器"""
def __init__(self, project_dir):
self.project_dir = project_dir
def save(self, message):
"""保存检查点(Git 提交)"""
subprocess.run(
["git", "add", "."],
cwd=self.project_dir,
capture_output=True,
)
result = subprocess.run(
["git", "commit", "-m", message],
cwd=self.project_dir,
capture_output=True,
)
return result.returncode == 0
def rollback(self):
"""回滚到上一个检查点"""
result = subprocess.run(
["git", "reset", "--hard", "HEAD~1"],
cwd=self.project_dir,
capture_output=True,
)
return result.returncode == 0
# ============================================================
# 知识点 1 & 3:初始化代理 + 编码代理
# ============================================================
class LongTaskHarness:
"""长任务 Harness 框架——整合所有组件"""
def __init__(self, project_dir, project_name):
self.project_dir = project_dir
self.project_name = project_name
os.makedirs(project_dir, exist_ok=True)
self.feature_list = FeatureList(
os.path.join(project_dir, "feature_list.json")
)
self.progress = AsyncProgressTracker(
os.path.join(project_dir, "progress.log")
)
self.checkpoint = CheckpointManager(project_dir)
self.breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5)
def initialize(self, features):
"""初始化代理:创建项目环境"""
print(f"🚀 初始化代理启动")
print(f"项目: {self.project_name}\n")
# 步骤 1:创建 init.sh
init_script = "#!/bin/bash\necho '项目初始化完成'\n"
init_path = os.path.join(self.project_dir, "init.sh")
with open(init_path, "w") as f:
f.write(init_script)
os.chmod(init_path, 0o755)
print(" ✓ 创建 init.sh")
# 步骤 2:创建功能清单
self.feature_list.create(self.project_name, features)
# 步骤 3:Git 初始化
subprocess.run(["git", "init"], cwd=self.project_dir, capture_output=True)
self.checkpoint.save("init: 初始化项目环境和功能清单")
print(" ✓ Git 初始提交")
self.progress.log("初始化代理完成项目设置")
self.progress.flush()
print(f"\n✅ 初始化完成!功能清单: {self.feature_list.get_progress()}")
def run_coding_session(self, session_id):
"""编码代理:执行一个增量会话"""
print(f"\n{'='*50}")
print(f"🔄 编码代理会话 {session_id}")
print(f"{'='*50}")
# 步骤 1:恢复上下文
self.feature_list.load()
print(f" 进度: {self.feature_list.get_progress()}")
recent = self.progress.read_recent(3)
if recent:
print(f" 最近日志:")
for line in recent:
print(f" {line}")
# 步骤 2:选择下一个功能
feature = self.feature_list.get_next()
if feature is None:
print(" 🎉 所有功能已完成!")
return False
print(f" 目标: #{feature['id']} {feature['name']}")
# 步骤 3:实现功能(带熔断器保护)
if not self.breaker.can_execute():
print(f" ⚠️ 熔断器开启,跳过本会话")
return True
# 模拟功能实现
success = self._implement_feature(feature)
if success:
self.breaker.record_success()
self.feature_list.mark_pass(feature["id"])
self.checkpoint.save(
f"feat: 完成功能 #{feature['id']} - {feature['name']}"
)
self.progress.log(
f"会话 {session_id}: 完成 #{feature['id']} {feature['name']}"
)
print(f" ✓ 功能 #{feature['id']} 完成")
else:
self.breaker.record_failure()
self.feature_list.mark_fail(feature["id"], "测试未通过")
self.checkpoint.rollback()
self.progress.log(
f"会话 {session_id}: 功能 #{feature['id']} 失败,已回滚"
)
print(f" ✗ 功能 #{feature['id']} 失败,已回滚")
self.progress.flush()
return True
def _implement_feature(self, feature):
"""模拟功能实现和测试(实际使用中由 LLM 执行)"""
# 创建功能文件
feature_file = os.path.join(
self.project_dir, f"feature_{feature['id']}.txt"
)
with open(feature_file, "w") as f:
f.write(f"功能: {feature['name']}\n描述: {feature['description']}\n")
# 模拟测试(90% 通过率)
import random
return random.random() < 0.9
def run(self, features, max_sessions=20):
"""运行完整的 Harness 工作流"""
# 初始化阶段
self.initialize(features)
# 编码阶段
session = 1
while session <= max_sessions:
has_more = self.run_coding_session(session)
if not has_more:
break
session += 1
# 最终报告
self.feature_list.load()
print(f"\n{'='*50}")
print(f"📊 项目报告")
print(f"{'='*50}")
print(f" 项目: {self.project_name}")
print(f" 进度: {self.feature_list.get_progress()}")
print(f" 会话数: {session}")
fails = [
f for f in self.feature_list.data["features"]
if f["fails"]
]
if fails:
print(f" 失败记录:")
for f in fails:
print(f" #{f['id']} {f['name']}: {len(f['fails'])} 次失败")
# ============================================================
# 入口
# ============================================================
if __name__ == "__main__":
import tempfile
project_dir = os.path.join(tempfile.gettempdir(), "harness-demo")
# 清理旧数据
import shutil
if os.path.exists(project_dir):
shutil.rmtree(project_dir)
# 定义任务
features = [
{"name": "项目脚手架", "description": "创建项目结构和配置"},
{"name": "数据模型", "description": "定义核心数据模型"},
{"name": "API 端点", "description": "实现 RESTful API"},
{"name": "认证系统", "description": "JWT 认证和权限"},
{"name": "前端页面", "description": "核心前端交互"},
{"name": "E2E 测试", "description": "端到端自动化测试"},
]
# 运行 Harness
harness = LongTaskHarness(project_dir, "任务管理应用")
harness.run(features)
代码解析
| 代码段 | 使用的知识点 | 说明 |
|---|---|---|
FeatureList 类 |
功能清单驱动(1.2) | JSON 格式的功能清单管理,支持增量标记通过/失败 |
AsyncProgressTracker 类 |
性能优化(3.2) | 非阻塞的进度记录,批量写入减少 I/O 开销 |
CircuitBreaker 类 |
故障恢复(3.1) | 熔断器模式,防止级联故障 |
CheckpointManager 类 |
故障恢复(3.1) | 基于 Git 的检查点回滚 |
LongTaskHarness.initialize() |
初始化代理(1.1) | 第一个会话的环境搭建和任务分解 |
LongTaskHarness.run_coding_session() |
编码代理(1.2) | 增量推进会话,单功能推进 + 测试门控 |
扩展挑战
- 添加 LangGraph 集成:在 Harness 框架中集成 LangGraph 检查点,实现框架级的状态持久化。当 Harness 的进度文件损坏时,可以从 LangGraph 检查点恢复。
- 添加分布式支持:将功能清单存储在 Redis 或 PostgreSQL 中,支持多个编码代理并行处理不同功能,实现多 Agent 并行 Harness。
- 添加实时监控仪表板:使用 Web 框架(如 Flask/FastAPI)暴露进度 API,实时展示功能完成率、会话历史、失败分析等指标。
第五部分:常见问题与排查指南
常见错误及解决方案
| 错误信息 | 原因 | 解决方案 |
|---|---|---|
feature_list.json 被意外清空 |
Agent 在会话中重写了功能清单而非更新 | 使用 Git 回滚到上一个有效版本:git checkout HEAD -- feature_list.json |
| Agent 重复完成已通过的功能 | 进度文件未正确更新,或 Agent 未读取进度 | 在编码代理的提示词中强调"首先读取 feature_list.json,只处理 passes 为 false 的功能" |
| 功能全部标记为 passes 但测试失败 | Agent 跳过了测试门控,自行标记通过 | 在功能清单中添加"verified_at"字段,要求 Agent 附上测试执行日志作为验证证据 |
| 编码代理陷入循环(反复尝试同一功能) | 功能本身不可行,但 Agent 没有放弃机制 | 设置最大失败次数(如 3 次),超过后标记为"blocked"并跳过,等待人工介入 |
| Git 提交冲突(多个 Agent 同时提交) | 并行编码代理修改了相同文件 | 使用文件级锁或功能级隔离,确保每个 Agent 只修改自己功能的文件 |
| LangGraph 检查点恢复后状态不一致 | 检查点是在部分节点完成后保存的 | 使用 LangGraph 的 pending writes 机制,确保恢复时重放未完成的写入 |
| CRIU 检查点体积过大 | 进程内存占用高(如加载了大模型) | 使用应用级检查点替代进程级检查点;或在检查点前释放不必要的缓存 |
调试技巧
-
查看功能清单的 Git 变更历史:
git log --oneline -- feature_list.json可以查看功能清单的每次变更。如果发现某次提交异常修改了清单结构,可以回滚到该提交之前的版本。 -
使用 LangGraph 时间旅行调试:通过
app.get_state_history(config)获取所有历史检查点,逐一检查状态变化。特别关注next字段(下一步应执行的节点)和values字段(当前状态),定位状态分歧发生的位置。 -
设置进度文件监控:使用
watch命令或文件系统监控工具实时观察进度文件的更新:watch -n 5 cat agent_progress.txt。如果进度文件长时间不更新,可能表示 Agent 卡住或异常终止。
第六部分:学习路线推荐
官方文档推荐阅读顺序
- Anthropic 工程博客 "Effective Harnesses for Long-Running Agents" — 理解 Harness 模式的核心概念。重点:初始化代理 vs 编码代理、功能清单设计、失败模式分析表。
- LangGraph Persistence 文档 — 学习框架级检查点机制。重点:超级步骤、StateSnapshot 结构、检查点后端选择。
- Eunomia.dev C/R 系统综述 — 深入理解检查点技术的全貌。重点:五层 C/R 架构、有状态/无状态恢复权衡、CRIUgpu。
- Azure 架构中心 AI Agent 设计模式 — 多 Agent 编排模式。重点:五种编排模式、恢复策略、可靠性最佳实践。
- CRIUgpu 论文 (arXiv 2502.16631) — GPU 检查点的技术细节。重点:CUDA 插件架构、性能基准测试数据。
推荐进阶资源
- Anthropic Engineering Blog — AI Agent 工程实践的一手资料,包含 Harness 设计的详细案例和失败模式分析。
- LangGraph 官方教程 — 从基础到高级的 LangGraph 学习路径,包含检查点、Human-in-the-loop、时间旅行等主题。
- [Designing Data-Intensive Applications (Martin Kleppmann)] — 分布式系统经典教材。第 9 章"Consistency and Consensus"深入讲解了分布式检查点和一致性快照的理论基础,对理解多 Agent 一致性快照非常有帮助。
信息来源与版本说明
- 教程基于: 多源实践方法论(非单一软件版本),内容基于截至 2026-04-13 的公开资料
- 信息获取日期: 2026-04-13
- 信息来源列表:
- Anthropic Engineering Blog - Effective Harnesses for Long-Running Agents — Harness 模式核心设计、功能清单格式、失败模式分析
- LangGraph Persistence Documentation — 检查点 API、StateSnapshot 结构、后端实现
- Azure Architecture Center - AI Agent Design Patterns — 五种编排模式、恢复策略
- Eunomia.dev - C/R Systems Survey — C/R 五层架构、GPU 检查点
- CRIUgpu Paper (arXiv) — GPU 检查点性能数据