AI 长任务最佳实践调研报告
调研日期: 2026-03-16
核心目标: 构建可靠的长时运行 AI Agent 系统
AI 长任务最佳实践调研报告
调研日期: 2026-03-16
核心目标: 构建可靠的长时运行 AI Agent 系统
目录
- 核心挑战
- Anthropic 官方方案
- 状态管理策略
- 错误恢复与重试机制
- 任务编排模式
- 监控与可观测性
- 工具与框架选型
- 实战操作指南
- 最佳实践清单
一、核心挑战
1.1 长任务的困境
┌─────────────────────────────────────────────────────────────────────────┐
│ 长任务核心问题 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 🧠 Context Window 限制 │
│ └── 即使有 compaction,仍无法在一个窗口内完成复杂任务 │
│ │
│ 🔌 Session 断连 │
│ └── 每个 Session 开始时无记忆,需要重建上下文 │
│ │
│ 🎯 进度追踪困难 │
│ └── Agent 不知道已完成什么,剩余什么 │
│ │
│ 🐛 中间状态不一致 │
│ └── 任务中断后,代码/数据可能处于半完成状态 │
│ │
│ ⏰ 时间跨度大 │
│ └── 任务可能跨越数小时甚至数天 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
1.2 常见失败模式
| 失败模式 |
描述 |
后果 |
| One-Shot 陷阱 |
试图一次完成所有工作 |
Context 耗尽,任务半途而废 |
| 虚假完成 |
Agent 声称任务完成,实际未完成 |
功能缺失,质量不达标 |
| 状态遗忘 |
新 Session 忘记之前的决策和进度 |
重复工作,决策冲突 |
| 测试不足 |
标记完成但未充分验证 |
隐藏 Bug,功能不可用 |
| 环境混乱 |
中断后代码/环境处于不可用状态 |
下次启动需要大量修复工作 |
二、Anthropic 官方方案
来源: Effective harnesses for long-running agents
2.1 双 Agent 架构
Anthropic 提出的解决方案包含两个专门的 Agent:
┌─────────────────────────────────────────────────────────────────────────┐
│ 双 Agent 架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 🚀 Initializer Agent (初始化 Agent) │
│ ├── 只在第一个 Session 运行 │
│ ├── 创建项目基础结构 │
│ ├── 生成 Feature List (功能清单) │
│ ├── 创建 init.sh 启动脚本 │
│ ├── 创建 claude-progress.txt 进度文件 │
│ └── 提交初始 Git Commit │
│ │
│ 🔧 Coding Agent (编码 Agent) │
│ ├── 每个 Session 开始时运行 │
│ ├── 读取进度文件和 Git 历史了解状态 │
│ ├── 选择一个未完成的功能进行开发 │
│ ├── 实现功能并充分测试 │
│ ├── 更新进度文件和 Feature List │
│ └── 提交 Git Commit 记录变更 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
2.2 关键组件
Feature List (功能清单)
用 JSON 格式记录所有功能及其状态:
{
"category": "functional",
"description": "New chat button creates a fresh conversation",
"steps": [
"Navigate to main interface",
"Click the 'New Chat' button",
"Verify a new conversation is created",
"Check that chat area shows welcome state",
"Verify conversation appears in sidebar"
],
"passes": false
}
为什么用 JSON 而非 Markdown?
- 模型更不容易随意修改 JSON 文件
- 结构化数据更易于程序化处理
- 减少误操作删除内容的风险
Progress File (进度文件)
claude-progress.txt 记录每个 Session 的工作摘要:
# Progress Log
## Session 2026-03-16 10:30
- Implemented user authentication
- Added login/logout functionality
- Fixed session management bug
- Status: Auth module complete
## Session 2026-03-16 14:00
- Started work on chat interface
- Created message component
- Pending: WebSocket integration
Init Script (启动脚本)
init.sh 确保环境可重复启动:
#!/bin/bash
# init.sh - 开发环境启动脚本
# 安装依赖
npm install
# 设置环境变量
export NODE_ENV=development
export DATABASE_URL=postgresql://localhost:5432/dev
# 启动开发服务器
npm run dev
# 运行基础测试
npm run test:basic
2.3 Coding Agent Session 启动流程
每个 Session 的标准启动流程:
Step 1: pwd
└── 确认当前工作目录
Step 2: 读取进度文件
└── 了解最近完成的工作
Step 3: 读取 Git Log
└── git log --oneline -20
Step 4: 读取 Feature List
└── 找出下一个未完成的功能
Step 5: 运行 init.sh
└── 启动开发服务器
Step 6: 基础功能验证
└── 使用浏览器自动化测试核心功能
Step 7: 开始开发新功能
└── 一次只做一个功能
2.4 Agent 行为规范
| 问题 |
Initializer Agent 行为 |
Coding Agent 行为 |
| 过早声明完成 |
创建详细的 Feature List |
开始时读取列表,选择一个功能 |
| 环境状态混乱 |
初始化 Git 和进度文件 |
开始时读取进度和 Git Log,结束时提交 |
| 功能标记过早完成 |
设置 Feature List |
必须充分测试后才能标记 passes: true |
| 不知道如何运行 |
编写 init.sh |
开始时读取并执行 init.sh |
三、状态管理策略
3.1 状态持久化模式
┌─────────────────────────────────────────────────────────────────────────┐
│ 状态持久化模式 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 📸 Snapshot-Based (快照模式) │
│ ├── 定期保存完整状态快照 │
│ ├── 恢复时从最近的快照开始 │
│ └── 适合: 有明确检查点的任务 │
│ │
│ 📝 Log-Based (日志模式) │
│ ├── 记录所有状态变更事件 │
│ ├── 通过重放日志重建状态 │
│ └── 适合: 需要完整审计追踪的场景 │
│ │
│ 🔄 Hybrid (混合模式) │
│ ├── 定期快照 + 增量日志 │
│ ├── 快速恢复 + 完整历史 │
│ └── 适合: 大多数生产场景 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
3.2 状态数据结构
# 推荐的状态数据结构
class AgentState:
"""Agent 状态结构"""
# 1. 任务元信息
task_id: str # 任务唯一标识
task_goal: str # 任务目标描述
created_at: datetime # 创建时间
updated_at: datetime # 更新时间
# 2. 执行状态
status: str # pending | running | paused | completed | failed
current_step: int # 当前步骤编号
total_steps: int # 总步骤数
# 3. 进度追踪
completed_steps: list # 已完成步骤列表
pending_steps: list # 待完成步骤列表
failed_steps: list # 失败步骤列表
# 4. 上下文数据
context: dict # 任务上下文数据
decisions: list # 决策历史
artifacts: dict # 产出物引用
# 5. 错误处理
errors: list # 错误历史
retry_count: int # 重试次数
last_checkpoint: datetime # 最后检查点时间
3.3 状态存储方案
| 存储方案 |
适用场景 |
优点 |
缺点 |
| 文件系统 |
单机开发 |
简单、无依赖 |
不支持分布式 |
| SQLite |
中等规模 |
结构化查询 |
单机限制 |
| PostgreSQL |
生产环境 |
可靠、可扩展 |
需要运维 |
| Redis |
高频更新 |
快速、支持 TTL |
需要持久化配置 |
| 云存储 (S3) |
大文件/备份 |
无限容量 |
延迟较高 |
3.4 Checkpointing 最佳实践
class CheckpointManager:
"""检查点管理器"""
def __init__(self, checkpoint_dir: str, interval: int = 300):
self.checkpoint_dir = checkpoint_dir
self.interval = interval # 秒
self.last_checkpoint = None
def should_checkpoint(self) -> bool:
"""判断是否需要创建检查点"""
if self.last_checkpoint is None:
return True
elapsed = time.time() - self.last_checkpoint
return elapsed >= self.interval
def save_checkpoint(self, state: AgentState) -> str:
"""保存检查点"""
checkpoint_id = f"ckpt_{int(time.time())}"
checkpoint_path = f"{self.checkpoint_dir}/{checkpoint_id}.json"
with open(checkpoint_path, 'w') as f:
json.dump(state.to_dict(), f)
self.last_checkpoint = time.time()
return checkpoint_id
def load_latest_checkpoint(self) -> AgentState:
"""加载最新检查点"""
checkpoints = sorted(Path(self.checkpoint_dir).glob("ckpt_*.json"))
if not checkpoints:
return None
with open(checkpoints[-1], 'r') as f:
return AgentState.from_dict(json.load(f))
def cleanup_old_checkpoints(self, keep: int = 5):
"""清理旧检查点"""
checkpoints = sorted(Path(self.checkpoint_dir).glob("ckpt_*.json"))
for ckpt in checkpoints[:-keep]:
ckpt.unlink()
四、错误恢复与重试机制
4.1 重试策略
指数退避 (Exponential Backoff)
import random
import time
def exponential_backoff(
attempt: int,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True
) -> float:
"""
指数退避算法
Args:
attempt: 当前尝试次数 (从 0 开始)
base_delay: 基础延迟 (秒)
max_delay: 最大延迟 (秒)
jitter: 是否添加随机抖动
Returns:
等待时间 (秒)
"""
delay = min(base_delay * (2 ** attempt), max_delay)
if jitter:
# 添加 0.5x - 1.5x 的随机抖动
delay = delay * (0.5 + random.random())
return delay
# 使用示例
for attempt in range(5):
try:
result = api_call()
break
except Exception as e:
if attempt == 4:
raise
wait_time = exponential_backoff(attempt)
time.sleep(wait_time)
重试装饰器
from functools import wraps
import logging
logger = logging.getLogger(__name__)
def retry(
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exceptions: tuple = (Exception,)
):
"""
重试装饰器
Args:
max_attempts: 最大尝试次数
base_delay: 基础延迟
max_delay: 最大延迟
exceptions: 需要重试的异常类型
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_attempts - 1:
logger.error(f"所有重试失败: {func.__name__}")
raise
wait_time = exponential_backoff(attempt, base_delay, max_delay)
logger.warning(
f"{func.__name__} 失败 (尝试 {attempt + 1}/{max_attempts}), "
f"{wait_time:.1f}秒后重试: {e}"
)
time.sleep(wait_time)
raise last_exception
return wrapper
return decorator
# 使用示例
@retry(max_attempts=5, exceptions=(ConnectionError, TimeoutError))
def call_llm_api(prompt: str) -> str:
return api_client.generate(prompt)
4.2 故障分类与处理
from enum import Enum
from dataclasses import dataclass
class ErrorType(Enum):
"""错误类型分类"""
TRANSIENT = "transient" # 暂时性错误,可重试
PERMANENT = "permanent" # 永久性错误,需要人工干预
RESOURCE = "resource" # 资源限制,需要等待
LOGIC = "logic" # 逻辑错误,需要修改代码
@dataclass
class ErrorContext:
"""错误上下文"""
error_type: ErrorType
message: str
recoverable: bool
suggested_action: str
retry_after: float | None = None
def classify_error(error: Exception) -> ErrorContext:
"""错误分类器"""
error_str = str(error).lower()
# Rate Limit
if "rate limit" in error_str or "429" in error_str:
return ErrorContext(
error_type=ErrorType.RESOURCE,
message=str(error),
recoverable=True,
suggested_action="等待后重试",
retry_after=60.0
)
# Timeout
if "timeout" in error_str or "timed out" in error_str:
return ErrorContext(
error_type=ErrorType.TRANSIENT,
message=str(error),
recoverable=True,
suggested_action="重试请求"
)
# Auth Error
if "unauthorized" in error_str or "401" in error_str or "403" in error_str:
return ErrorContext(
error_type=ErrorType.PERMANENT,
message=str(error),
recoverable=False,
suggested_action="检查 API Key 或权限"
)
# Default
return ErrorContext(
error_type=ErrorType.TRANSIENT,
message=str(error),
recoverable=True,
suggested_action="重试"
)
4.3 Circuit Breaker (断路器)
from enum import Enum
import threading
import time
class CircuitState(Enum):
CLOSED = "closed" # 正常状态
OPEN = "open" # 断开状态 (拒绝请求)
HALF_OPEN = "half_open" # 半开状态 (试探性恢复)
class CircuitBreaker:
"""
断路器模式
防止级联故障,当错误率达到阈值时自动断开
"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
success_threshold: int = 3
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
self.lock = threading.Lock()
def can_execute(self) -> bool:
"""检查是否可以执行"""
with self.lock:
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
# 检查是否可以进入半开状态
if time.time() - self.last_failure_time >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
return True
return False
# HALF_OPEN
return True
def record_success(self):
"""记录成功"""
with self.lock:
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
def record_failure(self):
"""记录失败"""
with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# 使用示例
circuit_breaker = CircuitBreaker()
def call_with_circuit_breaker(func, *args, **kwargs):
if not circuit_breaker.can_execute():
raise Exception("Circuit breaker is open")
try:
result = func(*args, **kwargs)
circuit_breaker.record_success()
return result
except Exception as e:
circuit_breaker.record_failure()
raise
五、任务编排模式
5.1 工作流模式分类
参考: AI SDK Workflow Patterns
┌─────────────────────────────────────────────────────────────────────────┐
│ 工作流编排模式 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 📦 Sequential (顺序处理) │
│ Step A → Step B → Step C → Done │
│ 适合: 有明确依赖关系的任务 │
│ │
│ 🔀 Routing (路由分发) │
│ Input → [Classifier] → Agent A / Agent B / Agent C │
│ 适合: 需要根据输入选择不同处理逻辑 │
│ │
│ ⚡ Parallel (并行处理) │
│ Input → [Agent A, Agent B, Agent C] → [Aggregator] → Output │
│ 适合: 可独立执行的子任务 │
│ │
│ 🎭 Orchestrator-Worker (编排者-工作者) │
│ Orchestrator 分解任务 → Workers 执行 → Orchestrator 汇总 │
│ 适合: 大型复杂任务 │
│ │
│ 🔄 Evaluator-Optimizer (评估-优化) │
│ Worker 产出 → Evaluator 评估 → 反馈 → Worker 改进 │
│ 适合: 需要迭代改进的任务 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
5.2 状态机模式
from enum import Enum
from typing import Callable, Dict, Optional
import logging
logger = logging.getLogger(__name__)
class TaskState(Enum):
"""任务状态"""
PENDING = "pending"
INITIALIZING = "initializing"
RUNNING = "running"
PAUSED = "paused"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class StateMachine:
"""
状态机任务编排器
定义清晰的状态转换规则,确保任务按预期流程执行
"""
# 允许的状态转换
TRANSITIONS: Dict[TaskState, set] = {
TaskState.PENDING: {TaskState.INITIALIZING, TaskState.CANCELLED},
TaskState.INITIALIZING: {TaskState.RUNNING, TaskState.FAILED, TaskState.CANCELLED},
TaskState.RUNNING: {TaskState.PAUSED, TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELLED},
TaskState.PAUSED: {TaskState.RUNNING, TaskState.CANCELLED},
TaskState.COMPLETED: set(),
TaskState.FAILED: {TaskState.PENDING}, # 允许重试
TaskState.CANCELLED: set(),
}
def __init__(self):
self.state = TaskState.PENDING
self.handlers: Dict[TaskState, Callable] = {}
self.history: list = []
def register_handler(self, state: TaskState, handler: Callable):
"""注册状态处理器"""
self.handlers[state] = handler
def transition(self, new_state: TaskState) -> bool:
"""状态转换"""
if new_state not in self.TRANSITIONS[self.state]:
logger.error(f"无效的状态转换: {self.state} → {new_state}")
return False
old_state = self.state
self.state = new_state
self.history.append({
"from": old_state,
"to": new_state,
"timestamp": time.time()
})
logger.info(f"状态转换: {old_state} → {new_state}")
return True
def execute(self, context: dict) -> dict:
"""执行当前状态的处理逻辑"""
if self.state not in self.handlers:
raise ValueError(f"未注册处理器: {self.state}")
handler = self.handlers[self.state]
return handler(context)
def can_resume(self) -> bool:
"""是否可以恢复执行"""
return self.state in {TaskState.PAUSED, TaskState.FAILED}
def get_history(self) -> list:
"""获取状态历史"""
return self.history.copy()
5.3 步骤编排器
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional
import json
from pathlib import Path
@dataclass
class Step:
"""任务步骤"""
id: str
name: str
handler: Callable
dependencies: List[str] = field(default_factory=list)
retry_count: int = 0
max_retries: int = 3
timeout: float = 300.0
status: str = "pending" # pending | running | completed | failed | skipped
@dataclass
class StepResult:
"""步骤执行结果"""
step_id: str
success: bool
output: Any = None
error: Optional[str] = None
duration: float = 0.0
class StepOrchestrator:
"""
步骤编排器
管理多步骤任务的执行,支持依赖关系、重试和状态持久化
"""
def __init__(self, state_file: str = "task_state.json"):
self.steps: Dict[str, Step] = {}
self.results: Dict[str, StepResult] = {}
self.state_file = Path(state_file)
self.context: dict = {}
def add_step(self, step: Step):
"""添加步骤"""
self.steps[step.id] = step
def load_state(self):
"""加载状态"""
if self.state_file.exists():
with open(self.state_file, 'r') as f:
data = json.load(f)
# 恢复步骤状态
for step_id, status in data.get("step_status", {}).items():
if step_id in self.steps:
self.steps[step_id].status = status
# 恢复上下文
self.context = data.get("context", {})
logger.info(f"已加载状态: {len(self.steps)} 个步骤")
def save_state(self):
"""保存状态"""
data = {
"step_status": {
step_id: step.status
for step_id, step in self.steps.items()
},
"context": self.context,
"updated_at": time.time()
}
with open(self.state_file, 'w') as f:
json.dump(data, f, indent=2)
def get_ready_steps(self) -> List[Step]:
"""获取可执行的步骤"""
ready = []
for step in self.steps.values():
if step.status != "pending":
continue
# 检查依赖是否完成
deps_completed = all(
self.steps[dep_id].status == "completed"
for dep_id in step.dependencies
if dep_id in self.steps
)
if deps_completed:
ready.append(step)
return ready
def execute_step(self, step: Step) -> StepResult:
"""执行单个步骤"""
import time
start_time = time.time()
try:
step.status = "running"
self.save_state()
output = step.handler(self.context)
step.status = "completed"
result = StepResult(
step_id=step.id,
success=True,
output=output,
duration=time.time() - start_time
)
except Exception as e:
step.retry_count += 1
if step.retry_count >= step.max_retries:
step.status = "failed"
else:
step.status = "pending" # 允许重试
result = StepResult(
step_id=step.id,
success=False,
error=str(e),
duration=time.time() - start_time
)
self.results[step.id] = result
self.save_state()
return result
def run(self) -> bool:
"""运行整个工作流"""
self.load_state()
while True:
ready_steps = self.get_ready_steps()
if not ready_steps:
# 检查是否全部完成
all_done = all(
s.status in ("completed", "skipped")
for s in self.steps.values()
)
if all_done:
logger.info("所有步骤完成")
return True
# 检查是否有失败的步骤
has_failed = any(
s.status == "failed"
for s in self.steps.values()
)
if has_failed:
logger.error("存在失败的步骤")
return False
# 等待中的步骤
logger.info("等待步骤完成...")
time.sleep(1)
continue
# 执行就绪的步骤
for step in ready_steps:
logger.info(f"执行步骤: {step.name}")
result = self.execute_step(step)
if result.success:
logger.info(f"步骤完成: {step.name}")
else:
logger.error(f"步骤失败: {step.name} - {result.error}")
return False
六、监控与可观测性
6.1 可观测性三支柱
┌─────────────────────────────────────────────────────────────────────────┐
│ 可观测性三支柱 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 📊 Metrics (指标) │
│ ├── 任务完成率 │
│ ├── 平均执行时间 │
│ ├── 错误率 │
│ ├── Token 消耗 │
│ └── 资源使用率 │
│ │
│ 📝 Logs (日志) │
│ ├── 结构化日志 │
│ ├── 任务执行轨迹 │
│ ├── 错误堆栈 │
│ └── 决策记录 │
│ │
│ 🔍 Traces (追踪) │
│ ├── 跨步骤追踪 │
│ ├── API 调用链 │
│ ├── 依赖关系图 │
│ └── 性能瓶颈定位 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
6.2 结构化日志
import json
import logging
from datetime import datetime
from typing import Any
class StructuredLogger:
"""结构化日志记录器"""
def __init__(self, task_id: str, log_file: str = "task.log"):
self.task_id = task_id
self.log_file = log_file
self.handlers = []
def log(
self,
level: str,
message: str,
**kwargs: Any
):
"""记录结构化日志"""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"task_id": self.task_id,
"level": level,
"message": message,
**kwargs
}
# 写入文件
with open(self.log_file, 'a') as f:
f.write(json.dumps(entry) + '\n')
# 控制台输出
log_method = getattr(logging, level.lower(), logging.info)
log_method(f"[{self.task_id}] {message}")
def info(self, message: str, **kwargs):
self.log("INFO", message, **kwargs)
def error(self, message: str, **kwargs):
self.log("ERROR", message, **kwargs)
def warning(self, message: str, **kwargs):
self.log("WARNING", message, **kwargs)
def step_start(self, step_id: str, step_name: str):
"""记录步骤开始"""
self.log("INFO", f"步骤开始: {step_name}", step_id=step_id, event="step_start")
def step_end(self, step_id: str, step_name: str, success: bool, duration: float):
"""记录步骤结束"""
self.log(
"INFO" if success else "ERROR",
f"步骤{'完成' if success else '失败'}: {step_name}",
step_id=step_id,
event="step_end",
success=success,
duration_seconds=duration
)
def decision(self, decision: str, reason: str):
"""记录决策"""
self.log("INFO", f"决策: {decision}", event="decision", reason=reason)
def checkpoint(self, checkpoint_id: str, state: dict):
"""记录检查点"""
self.log("INFO", f"创建检查点", event="checkpoint", checkpoint_id=checkpoint_id)
6.3 进度仪表盘
from dataclasses import dataclass
from typing import Dict, List
import json
@dataclass
class TaskProgress:
"""任务进度"""
task_id: str
status: str
total_steps: int
completed_steps: int
failed_steps: int
current_step: str
started_at: str
estimated_completion: str | None
progress_percent: float
class ProgressDashboard:
"""
进度仪表盘
生成任务进度的可视化报告
"""
def __init__(self, state_file: str):
self.state_file = state_file
def get_progress(self) -> TaskProgress:
"""获取进度信息"""
with open(self.state_file, 'r') as f:
state = json.load(f)
step_status = state.get("step_status", {})
total = len(step_status)
completed = sum(1 for s in step_status.values() if s == "completed")
failed = sum(1 for s in step_status.values() if s == "failed")
# 找到当前步骤
current = next(
(k for k, v in step_status.items() if v == "running"),
"N/A"
)
progress = (completed / total * 100) if total > 0 else 0
return TaskProgress(
task_id=state.get("task_id", "unknown"),
status=state.get("status", "unknown"),
total_steps=total,
completed_steps=completed,
failed_steps=failed,
current_step=current,
started_at=state.get("started_at", "unknown"),
estimated_completion=None, # TODO: 实现预估
progress_percent=round(progress, 1)
)
def render(self) -> str:
"""渲染进度报告"""
progress = self.get_progress()
# 进度条
bar_length = 40
filled = int(bar_length * progress.progress_percent / 100)
bar = "█" * filled + "░" * (bar_length - filled)
return f"""
┌─────────────────────────────────────────────────────────────┐
│ 任务进度报告 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Task ID: {progress.task_id:<42} │
│ Status: {progress.status:<43} │
│ │
│ [{bar}] {progress.progress_percent:.1f}% │
│ │
│ Steps: {progress.completed_steps}/{progress.total_steps} completed, {progress.failed_steps} failed │
│ Current: {progress.current_step:<42} │
│ │
│ Started: {progress.started_at:<41} │
│ │
└─────────────────────────────────────────────────────────────┘
"""
七、工具与框架选型
7.1 长任务框架对比
| 框架 |
类型 |
特点 |
适用场景 |
| Temporal |
工作流引擎 |
持久化执行、自动重试、长运行支持 |
企业级生产环境 |
| LangGraph |
Agent 框架 |
状态图、检查点、PostgreSQL 持久化 |
LLM 工作流 |
| Inngest |
Serverless |
事件驱动、自动重试、Durable Functions |
Serverless 架构 |
| Restate |
Durable Execution |
轻量级、支持 LLM、故障恢复 |
现代 AI 应用 |
| Claude Agent SDK |
Agent SDK |
内置 Todo、Compaction、Hooks |
Claude 生态 |
| 自建方案 |
自定义 |
完全控制、轻量 |
学习/简单场景 |
7.2 Temporal 示例
# Temporal 工作流示例
from datetime import timedelta
from temporalio import workflow, activity
@workflow.defn
class LongRunningTaskWorkflow:
"""长任务工作流"""
@workflow.run
async def run(self, task_config: dict) -> dict:
# Step 1: 初始化
init_result = await workflow.execute_activity(
"initialize_task",
task_config,
start_to_close_timeout=timedelta(minutes=5)
)
# Step 2: 执行子任务 (可并行)
sub_results = await workflow.execute_activity(
"execute_subtasks",
init_result,
start_to_close_timeout=timedelta(hours=2)
)
# Step 3: 汇总结果
final_result = await workflow.execute_activity(
"aggregate_results",
sub_results,
start_to_close_timeout=timedelta(minutes=10)
)
return final_result
@activity.defn
async def initialize_task(config: dict) -> dict:
"""初始化活动"""
# 自动重试、超时处理
return {"status": "initialized", "config": config}
7.3 LangGraph 示例
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from typing import TypedDict
class AgentState(TypedDict):
messages: list
current_step: str
results: dict
# 定义节点
def research_node(state: AgentState) -> AgentState:
# 研究逻辑
return {**state, "current_step": "research_done"}
def analyze_node(state: AgentState) -> AgentState:
# 分析逻辑
return {**state, "current_step": "analysis_done"}
def write_node(state: AgentState) -> AgentState:
# 写作逻辑
return {**state, "current_step": "writing_done"}
# 构建图
workflow = StateGraph(AgentState)
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("write", write_node)
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "write")
workflow.add_edge("write", END)
workflow.set_entry_point("research")
# 添加检查点 (PostgreSQL 持久化)
checkpointer = PostgresSaver(connection_string)
app = workflow.compile(checkpointer=checkpointer)
# 运行 (支持恢复)
result = app.invoke(
{"messages": [], "current_step": "", "results": {}},
config={"configurable": {"thread_id": "task-123"}}
)
八、实战操作指南
8.1 长任务启动清单
在启动一个长任务前,确保以下准备工作已完成:
┌─────────────────────────────────────────────────────────────────────────┐
│ 长任务启动清单 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 📋 任务设计 │
│ □ 明确定义任务目标和成功标准 │
│ □ 将任务分解为独立、可验证的步骤 │
│ □ 识别步骤间的依赖关系 │
│ □ 为每个步骤设定超时和重试策略 │
│ │
│ 💾 状态管理 │
│ □ 选择状态存储方案 (文件/数据库) │
│ □ 设计状态数据结构 │
│ □ 实现检查点保存逻辑 │
│ □ 实现状态恢复逻辑 │
│ │
│ 🛡️ 错误处理 │
│ □ 分类可能出现的错误 │
│ □ 实现重试机制 (指数退避) │
│ □ 实现断路器 (可选) │
│ □ 定义失败恢复策略 │
│ │
│ 📊 可观测性 │
│ □ 配置结构化日志 │
│ □ 实现进度追踪 │
│ □ 设置告警规则 (可选) │
│ □ 准备进度查看方式 │
│ │
│ 🧪 测试 │
│ □ 测试正常执行流程 │
│ □ 测试中断恢复 │
│ □ 测试错误处理 │
│ □ 测试并发场景 (如适用) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
8.2 推荐的项目结构
long-running-task/
├── task_config.json # 任务配置
├── init.sh # 初始化脚本
├── state/
│ ├── task_state.json # 任务状态
│ ├── checkpoints/ # 检查点目录
│ │ ├── ckpt_1710560000.json
│ │ └── ckpt_1710560300.json
│ └── progress.json # 进度摘要
├── logs/
│ ├── task.log # 结构化日志
│ └── decisions.log # 决策记录
├── artifacts/ # 产出物
│ ├── report.md
│ └── data.json
├── src/
│ ├── orchestrator.py # 任务编排器
│ ├── steps/ # 步骤实现
│ │ ├── step_1_research.py
│ │ ├── step_2_analyze.py
│ │ └── step_3_report.py
│ ├── state_manager.py # 状态管理
│ └── logger.py # 日志工具
├── tests/
│ ├── test_orchestrator.py
│ └── test_steps.py
└── requirements.txt
8.3 完整启动脚本
#!/usr/bin/env python3
"""
长任务启动脚本
使用方式:
python run_task.py --config task_config.json
python run_task.py --config task_config.json --resume
"""
import argparse
import json
import sys
from pathlib import Path
from datetime import datetime
from orchestrator import StepOrchestrator, Step
from state_manager import CheckpointManager
from logger import StructuredLogger
from dashboard import ProgressDashboard
def load_config(config_path: str) -> dict:
"""加载配置"""
with open(config_path, 'r') as f:
return json.load(f)
def setup_directories(task_id: str) -> dict:
"""设置目录结构"""
base = Path("tasks") / task_id
dirs = {
"base": base,
"state": base / "state",
"checkpoints": base / "state" / "checkpoints",
"logs": base / "logs",
"artifacts": base / "artifacts"
}
for d in dirs.values():
d.mkdir(parents=True, exist_ok=True)
return dirs
def create_steps(config: dict) -> list[Step]:
"""创建步骤"""
from steps import (
step_1_research,
step_2_analyze,
step_3_report
)
steps = [
Step(
id="research",
name="信息收集",
handler=step_1_research.execute,
max_retries=3,
timeout=600.0
),
Step(
id="analyze",
name="数据分析",
handler=step_2_analyze.execute,
dependencies=["research"],
max_retries=2,
timeout=1200.0
),
Step(
id="report",
name="生成报告",
handler=step_3_report.execute,
dependencies=["analyze"],
max_retries=2,
timeout=300.0
)
]
return steps
def main():
parser = argparse.ArgumentParser(description="长任务执行器")
parser.add_argument("--config", required=True, help="配置文件路径")
parser.add_argument("--resume", action="store_true", help="恢复中断的任务")
args = parser.parse_args()
# 加载配置
config = load_config(args.config)
task_id = config.get("task_id", f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
# 设置目录
dirs = setup_directories(task_id)
# 初始化组件
logger = StructuredLogger(
task_id,
log_file=str(dirs["logs"] / "task.log")
)
checkpoint_mgr = CheckpointManager(
checkpoint_dir=str(dirs["checkpoints"])
)
orchestrator = StepOrchestrator(
state_file=str(dirs["state"] / "task_state.json")
)
# 注册步骤
for step in create_steps(config):
orchestrator.add_step(step)
logger.info("任务启动", config=config, resume=args.resume)
try:
# 执行任务
success = orchestrator.run()
if success:
logger.info("任务完成")
print("\n✅ 任务完成!")
# 显示进度
dashboard = ProgressDashboard(orchestrator.state_file)
print(dashboard.render())
else:
logger.error("任务失败")
print("\n❌ 任务失败,请检查日志")
sys.exit(1)
except KeyboardInterrupt:
logger.warning("用户中断任务")
orchestrator.save_state()
print("\n⏸️ 任务已暂停,可使用 --resume 恢复")
sys.exit(0)
except Exception as e:
logger.error(f"任务异常: {e}")
orchestrator.save_state()
print(f"\n💥 任务异常: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
8.4 监控脚本
#!/usr/bin/env python3
"""
任务监控脚本
使用方式:
python monitor.py --task task_20260316_143000
"""
import argparse
import time
from pathlib import Path
from dashboard import ProgressDashboard
import json
def monitor_task(task_dir: str, interval: float = 5.0):
"""监控任务进度"""
state_file = Path(task_dir) / "state" / "task_state.json"
log_file = Path(task_dir) / "logs" / "task.log"
if not state_file.exists():
print(f"❌ 任务状态文件不存在: {state_file}")
return
dashboard = ProgressDashboard(str(state_file))
print("🔍 开始监控任务...")
print("按 Ctrl+C 停止\n")
last_lines = 0
try:
while True:
# 显示进度
print("\033[2J\033[H") # 清屏
print(dashboard.render())
# 显示最近日志
if log_file.exists():
with open(log_file, 'r') as f:
lines = f.readlines()
if len(lines) > last_lines:
print("\n📜 最近日志:")
for line in lines[-5:]:
try:
entry = json.loads(line)
print(f" [{entry['level']}] {entry['message']}")
except:
pass
last_lines = len(lines)
time.sleep(interval)
except KeyboardInterrupt:
print("\n👋 监控已停止")
def main():
parser = argparse.ArgumentParser(description="任务监控器")
parser.add_argument("--task", required=True, help="任务目录")
parser.add_argument("--interval", type=float, default=5.0, help="刷新间隔(秒)")
args = parser.parse_args()
monitor_task(args.task, args.interval)
if __name__ == "__main__":
main()
九、最佳实践清单
9.1 设计原则
┌─────────────────────────────────────────────────────────────────────────┐
│ 长任务设计原则 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ✅ DO (推荐) │
│ │
│ 1. 小步迭代 │
│ └── 每个步骤应该是原子性的,可独立验证和恢复 │
│ │
│ 2. 频繁检查点 │
│ └── 在关键步骤后保存状态,减少恢复时的重复工作 │
│ │
│ 3. 幂等性设计 │
│ └── 同一步骤多次执行应该产生相同结果 │
│ │
│ 4. 明确的成功标准 │
│ └── 每个步骤都有可验证的完成条件 │
│ │
│ 5. 优雅降级 │
│ └── 部分失败不影响整体,允许跳过非关键步骤 │
│ │
│ ❌ DON'T (避免) │
│ │
│ 1. 一步到位 │
│ └── 避免试图在一个大步骤中完成所有工作 │
│ │
│ 2. 状态依赖内存 │
│ └── 所有状态必须持久化,不能依赖进程内存 │
│ │
│ 3. 无限重试 │
│ └── 设置最大重试次数,避免死循环 │
│ │
│ 4. 忽略错误 │
│ └── 记录所有错误,即使选择继续执行 │
│ │
│ 5. 假设网络稳定 │
│ └── 所有外部调用都可能失败,需要重试机制 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
9.2 检查点策略
| 场景 |
检查点频率 |
保留策略 |
| 快速任务 (< 10 分钟) |
每步完成后 |
保留最近 3 个 |
| 中等任务 (10-60 分钟) |
每步 + 每 5 分钟 |
保留最近 10 个 |
| 长时任务 (> 1 小时) |
每步 + 每 10 分钟 |
保留最近 20 个 |
| 关键任务 |
每步 + 每 2 分钟 |
保留所有 + 定期归档 |
9.3 错误处理策略
| 错误类型 |
处理方式 |
重试次数 |
退避策略 |
| 网络超时 |
自动重试 |
5 |
指数退避 (1s → 60s) |
| Rate Limit |
等待后重试 |
3 |
固定等待 (60s) |
| 认证失败 |
立即失败 |
0 |
N/A |
| 资源不足 |
等待后重试 |
3 |
线性退避 (30s) |
| 逻辑错误 |
记录并跳过 |
0 |
N/A |
9.4 Agent 提示词模板
# 长任务 Agent 提示词模板
## 角色定义
你是一个长任务执行 Agent,负责完成复杂的、跨多个 Session 的任务。
## 核心原则
1. **增量执行**: 每次只做一个步骤,不要试图一次完成所有工作
2. **状态持久化**: 每完成一个步骤,必须更新状态文件
3. **充分测试**: 标记完成前必须验证功能正常
4. **清晰记录**: 在进度文件中记录所有决策和变更
## Session 启动流程
每个 Session 开始时,按以下步骤操作:
1. 运行 `pwd` 确认工作目录
2. 读取进度文件 (`progress.md`)
3. 读取 Git 日志 (`git log --oneline -20`)
4. 读取功能清单 (`feature_list.json`)
5. 运行 `init.sh` 启动环境
6. 执行基础验证测试
7. 选择下一个未完成的功能
## Session 结束流程
每个 Session 结束前,必须:
1. 确保代码处于可运行状态
2. 提交 Git Commit (描述性消息)
3. 更新进度文件
4. 更新功能清单状态
## 错误处理
遇到错误时:
1. 记录错误详情到日志
2. 尝试恢复到上一个稳定状态
3. 如果无法恢复,保存当前进度并报告
4. 不要跳过错误,不要标记未完成的功能为完成
## 输出格式
所有输出使用 Markdown 格式,包含:
- 当前步骤说明
- 执行的操作
- 遇到的问题
- 下一步计划
参考资源
官方文档
框架与工具
社区资源
报告生成时间: 2026-03-16
基于 Anthropic 官方文档及社区最佳实践整理